Bläddra i källkod

0808 mq的简单模式和工作队列

Qing 8 månader sedan
förälder
incheckning
a182268c4d

+ 38 - 0
mq-demo/.gitignore

@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store

+ 24 - 0
mq-demo/pom.xml

@@ -0,0 +1,24 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.sf</groupId>
+    <artifactId>mq-demo</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>mq-demo</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>5.20.0</version>
+        </dependency>
+    </dependencies>
+</project>

+ 11 - 0
mq-demo/src/main/java/com/sf/App.java

@@ -0,0 +1,11 @@
+package com.sf;
+
+/**
+ * Hello world!
+ */
+public class App {
+
+    public static void main(String[] args) {
+        System.out.println("Hello World!");
+    }
+}

+ 53 - 0
mq-demo/src/main/java/com/sf/helloworld/Consumer.java

@@ -0,0 +1,53 @@
+package com.sf.helloworld;
+
+import com.rabbitmq.client.*;
+
+import java.io.IOException;
+
+// 是消费者
+public class Consumer {
+
+    public static void main(String[] args) throws Exception {
+        // 先连接mq 然后生产消息
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        // 浏览器控制台的端口号是15672 mq服务本身的端口号是5672
+        factory.setPort(5672);
+        factory.setUsername("guest");
+        factory.setPassword("guest");
+        // 通过连接工厂创建连接
+        Connection connection = factory.newConnection();
+        // 通过连接 创建通道
+        Channel channel = connection.createChannel();
+
+        // 从mq中取消息
+        // 匿名内部类
+//        DeliverCallback deliverCallback = new DeliverCallback() {
+//            @Override
+//            public void handle(String consumerTag, Delivery message) throws IOException {
+//
+//            }
+//        };
+        // 声明一个投递回调的接口类
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            byte[] body = delivery.getBody();
+            String message = new String(body);
+            System.out.println(message);
+            // 投递的封皮的标记
+            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+            System.out.println(deliveryTag);
+        };
+        // 声明一个取消回调的接口类
+        CancelCallback cancelCallback = (consumerTag) -> {
+            System.out.println(consumerTag);
+        };
+
+        // 将两个处理逻辑给到队列
+        // 对应参数为 队列名 是否自动应答 如何消费消息 如何取消消费
+        channel.basicConsume("hello", true, deliverCallback, cancelCallback);
+
+        // 关闭资源
+        channel.close();
+        connection.close();
+    }
+}

+ 36 - 0
mq-demo/src/main/java/com/sf/helloworld/Producer.java

@@ -0,0 +1,36 @@
+package com.sf.helloworld;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+// 简单模式:一个生产者、一个消费者、一个队列
+// 当前是生产者
+public class Producer {
+
+    public static void main(String[] args) throws Exception{
+        // 先连接mq 然后生产消息
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        // 浏览器控制台的端口号是15672 mq服务本身的端口号是5672
+        factory.setPort(5672);
+        factory.setUsername("guest");
+        factory.setPassword("guest");
+        // 通过连接工厂创建连接
+        Connection connection = factory.newConnection();
+        // 通过连接 创建通道
+        Channel channel = connection.createChannel();
+        // 声明一个队列
+        // 对应参数为 队列名称 是否持久化 是否独自消费 是否自动删除 其他参数
+        channel.queueDeclare("hello", false, false, false, null);
+        // 准备消息
+        String message = "Hello World123";
+        // 通过通道的方法 发布  将消息发布到对应的队列上 使用消息的字节数组
+        channel.basicPublish("", "hello", null, message.getBytes());
+
+        // 要关闭资源
+        channel.close();
+        connection.close();
+
+    }
+}

+ 26 - 0
mq-demo/src/main/java/com/sf/helloworld/official/Recv.java

@@ -0,0 +1,26 @@
+package com.sf.helloworld.official;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.DeliverCallback;
+
+public class Recv {
+
+    private final static String QUEUE_NAME = "hello";
+
+    public static void main(String[] argv) throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        Connection connection = factory.newConnection();
+        Channel channel = connection.createChannel();
+
+        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(" [x] Received '" + message + "'");
+        };
+        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
+    }
+}

+ 24 - 0
mq-demo/src/main/java/com/sf/helloworld/official/Send.java

@@ -0,0 +1,24 @@
+package com.sf.helloworld.official;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public class Send {
+    private final static String QUEUE_NAME = "hello";
+
+    public static void main(String[] argv) throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        // 在try中声明资源 当try执行后 会自动的关闭资源
+        // 是jdk8后对 try的使用优化
+        try (Connection connection = factory.newConnection();
+             Channel channel = connection.createChannel()) {
+
+            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+            String message = "Hello World!";
+            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
+            System.out.println(" [x] Sent '" + message + "'");
+        }
+    }
+}

+ 33 - 0
mq-demo/src/main/java/com/sf/message/ack/Consumer.java

@@ -0,0 +1,33 @@
+package com.sf.message.ack;
+
+import com.rabbitmq.client.CancelCallback;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.sf.util.RabbitMqUtils;
+
+public class Consumer {
+
+    private static String queueName = "hello1";
+
+    public static void main(String[] args) throws  Exception{
+        Channel channel = RabbitMqUtils.getChannel();
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(message);
+            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+            System.out.println(deliveryTag);
+            // 当手动应答时 消费者可能成功消费消息 也可能拒绝消息 给其他消费者来消费
+            // 如果消费成功 使用basicAck 需要的参数 投递标记 是否应答多个
+            channel.basicAck(deliveryTag,false);
+
+            // 拒绝消息 分为消息本身是好的完整的 需要重新入队列
+            //            消息是坏的 其他消费者也消费不了 直接丢弃(不入队列)
+//            channel.basicReject(deliveryTag,true);
+        };
+        CancelCallback cancelCallback = (consumerTag) -> {};
+        // 消息的应答 分为自动应答和手动应答
+        boolean autoAck = false;
+        channel.basicConsume(queueName, autoAck, deliverCallback, cancelCallback);
+    }
+
+}

+ 24 - 0
mq-demo/src/main/java/com/sf/message/ack/Producer.java

@@ -0,0 +1,24 @@
+package com.sf.message.ack;
+
+import com.rabbitmq.client.Channel;
+import com.sf.util.RabbitMqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private static String queueName = "hello1";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+        // 声明队列
+        channel.queueDeclare(queueName, false, false, false, null);
+        System.out.println("输入消息:");
+        Scanner scanner = new Scanner(System.in);
+        while (scanner.hasNext()) {
+            String message = scanner.next();
+            channel.basicPublish("", queueName, null, message.getBytes());
+            System.out.println("Producer send message : " + message);
+        }
+    }
+}

+ 24 - 0
mq-demo/src/main/java/com/sf/message/publish/Consumer.java

@@ -0,0 +1,24 @@
+package com.sf.message.publish;
+
+import com.rabbitmq.client.CancelCallback;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.sf.util.RabbitMqUtils;
+
+public class Consumer {
+
+    private static String queueName = "hello1";
+
+    public static void main(String[] args) throws  Exception{
+        Channel channel = RabbitMqUtils.getChannel();
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(message);
+//            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+//            System.out.println(deliveryTag);
+        };
+        CancelCallback cancelCallback = (consumerTag) -> {};
+        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
+    }
+
+}

+ 44 - 0
mq-demo/src/main/java/com/sf/message/publish/Producer.java

@@ -0,0 +1,44 @@
+package com.sf.message.publish;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.sf.util.RabbitMqUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+
+public class Producer {
+
+    private static String queueName = "hello1";
+
+    public static void main(String[] args) throws Exception{
+        Channel channel = RabbitMqUtils.getChannel();
+        channel.queueDeclare(queueName, false, false, false, null);
+
+        System.out.println("输入消息:");
+        Scanner scanner = new Scanner(System.in);
+        while (scanner.hasNext()){
+            String message = scanner.next();
+            // 发送一个持久化的消息
+//            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
+//                    .contentType("text/plain").deliveryMode(2).priority(1).build();
+//            // void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
+//            channel.basicPublish("", queueName, basicProperties, message.getBytes());
+
+            // 发送一个带有header的消息
+//            Map<String,Object> headers = new HashMap<>();
+//            headers.put("location","here");
+//            headers.put("time",System.currentTimeMillis());
+//            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().headers(headers).build();
+//            channel.basicPublish("",queueName,basicProperties,message.getBytes());
+
+            // 发送一个带有过期时间的消息
+            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
+                    .expiration("10000").build();
+            channel.basicPublish("", queueName, basicProperties, message.getBytes());
+
+        }
+
+    }
+}

+ 19 - 0
mq-demo/src/main/java/com/sf/util/RabbitMqUtils.java

@@ -0,0 +1,19 @@
+package com.sf.util;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public class RabbitMqUtils {
+
+    public static Channel getChannel() throws Exception{
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("localhost");
+        factory.setPort(5672);
+        factory.setUsername("guest");
+        factory.setPassword("guest");
+        Connection connection = factory.newConnection();
+        Channel channel = connection.createChannel();
+        return channel;
+    }
+}

+ 39 - 0
mq-demo/src/main/java/com/sf/workqueue/Consumer.java

@@ -0,0 +1,39 @@
+package com.sf.workqueue;
+
+import com.rabbitmq.client.CancelCallback;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.sf.util.RabbitMqUtils;
+
+public class Consumer {
+
+    private static String queueName = "hello";
+
+    public static void main(String[] args) throws  Exception{
+        Channel channel = RabbitMqUtils.getChannel();
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(message);
+            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+            System.out.println(deliveryTag);
+
+            // 官网示例的主要区别是  增加了一个消费者的等待处理
+            // https://www.rabbitmq.com/tutorials/tutorial-two-java
+            try {
+                doWork(message);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            } finally {
+                System.out.println(" [x] Done");
+            }
+        };
+        CancelCallback cancelCallback = (consumerTag) -> {};
+        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
+    }
+
+    private static void doWork(String task) throws InterruptedException {
+        for (char ch: task.toCharArray()) {
+            if (ch == '.') Thread.sleep(1000);
+        }
+    }
+}

+ 24 - 0
mq-demo/src/main/java/com/sf/workqueue/Producer.java

@@ -0,0 +1,24 @@
+package com.sf.workqueue;
+
+import com.rabbitmq.client.Channel;
+import com.sf.util.RabbitMqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private static String queueName = "hello";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+        // 声明队列
+        channel.queueDeclare(queueName, false, false, false, null);
+        System.out.println("输入消息:");
+        Scanner scanner = new Scanner(System.in);
+        while (scanner.hasNext()) {
+            String message = scanner.next();
+            channel.basicPublish("", queueName, null, message.getBytes());
+            System.out.println("Producer send message : " + message);
+        }
+    }
+}