Explorar o código

0808 mq的消息处理和fanout/direct

Qing hai 8 meses
pai
achega
1316c9da4f

+ 27 - 0
mq-demo/src/main/java/com/sf/exchange/direct/Consumer.java

@@ -0,0 +1,27 @@
+package com.sf.exchange.direct;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.sf.util.RabbitMqUtils;
+
+public class Consumer {
+
+    private static final String ExchangeName = "directExchange";
+    private static final String RoutingKey = "harbin";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+        // 因为要使用多个队列  这里使用临时队列 名字是随机的
+        // 所谓临时 是在使用结束后自动删除的队列
+        String queueName = channel.queueDeclare().getQueue();
+        System.out.println(queueName);
+
+        // 将队列名字和交换机名字绑定  路由的key不为空
+        channel.queueBind(queueName, ExchangeName, RoutingKey);
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(message);
+        };
+        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
+    }
+}

+ 27 - 0
mq-demo/src/main/java/com/sf/exchange/direct/ConsumerBJ.java

@@ -0,0 +1,27 @@
+package com.sf.exchange.direct;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.sf.util.RabbitMqUtils;
+
+public class ConsumerBJ {
+
+    private static final String ExchangeName = "directExchange";
+    private static final String RoutingKey = "beijing";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+        // 因为要使用多个队列  这里使用临时队列 名字是随机的
+        // 所谓临时 是在使用结束后自动删除的队列
+        String queueName = channel.queueDeclare().getQueue();
+        System.out.println(queueName);
+
+        // 将队列名字和交换机名字绑定  路由的key不为空
+        channel.queueBind(queueName, ExchangeName, RoutingKey);
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(message);
+        };
+        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
+    }
+}

+ 28 - 0
mq-demo/src/main/java/com/sf/exchange/direct/Producer.java

@@ -0,0 +1,28 @@
+package com.sf.exchange.direct;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.sf.util.RabbitMqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private static final String ExchangeName = "directExchange";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+        channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.DIRECT);
+        System.out.println("输入消息:");
+        Scanner scanner = new Scanner(System.in);
+        while (scanner.hasNext()) {
+            String input = scanner.nextLine();
+            String[] split = input.split(" ");
+            String message = split[0];
+            String routingKey = split[1];
+            // 在发布消息时 需要指定路由的key 意思是找到和这个key匹配的队列
+            channel.basicPublish(ExchangeName, routingKey, null, message.getBytes());
+            System.out.println("message: " + message + ", routingKey: " + routingKey);
+        }
+    }
+}

+ 26 - 0
mq-demo/src/main/java/com/sf/exchange/fanout/Consumer.java

@@ -0,0 +1,26 @@
+package com.sf.exchange.fanout;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.sf.util.RabbitMqUtils;
+
+public class Consumer {
+
+    private static final String ExchangeName = "fanoutExchange";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+        // 因为要使用多个队列  这里使用临时队列 名字是随机的
+        // 所谓临时 是在使用结束后自动删除的队列
+        String queueName = channel.queueDeclare().getQueue();
+        System.out.println(queueName);
+
+        // 将队列名字和交换机名字绑定  路由的key为空
+        channel.queueBind(queueName, ExchangeName, "");
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(message);
+        };
+        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
+    }
+}

+ 26 - 0
mq-demo/src/main/java/com/sf/exchange/fanout/Producer.java

@@ -0,0 +1,26 @@
+package com.sf.exchange.fanout;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.sf.util.RabbitMqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private static final String ExchangeName = "fanoutExchange";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+//        channel.exchangeDeclare(ExchangeName, "fanout");
+        channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.FANOUT);
+        System.out.println("输入消息:");
+        Scanner scanner = new Scanner(System.in);
+        while (scanner.hasNext()) {
+            String message = scanner.next();
+            // 在发布消息时 不指定路由的key 意思是将消息发送给所有绑定这个交换机的队列
+            channel.basicPublish(ExchangeName, "", null, message.getBytes());
+            System.out.println("message: " + message);
+        }
+    }
+}

+ 33 - 0
mq-demo/src/main/java/com/sf/helloworld/plus/Consumer.java

@@ -0,0 +1,33 @@
+package com.sf.helloworld.plus;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import com.sf.util.RabbitMqUtils;
+
+import java.io.IOException;
+
+public class Consumer {
+
+    private static final String QUEUE_NAME = "helloworld_queue";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+        // 使用rabbitmq 提供的 默认的消费者类
+        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
+            // ctrl + N  调出生成的菜单 选择重写的方法 选择handleDelivery
+            @Override
+            public void handleDelivery(
+                    String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+//                super.handleDelivery(consumerTag, envelope, properties, body);
+                // 处理消息的逻辑
+                System.out.println("message: " + new String(body));
+                // 手动应答
+                channel.basicAck(envelope.getDeliveryTag(), false);
+            }
+        };
+        // 将准备好的消费逻辑 传给队列
+        channel.basicConsume(QUEUE_NAME, defaultConsumer);
+    }
+}

+ 37 - 0
mq-demo/src/main/java/com/sf/helloworld/plus/Producer.java

@@ -0,0 +1,37 @@
+package com.sf.helloworld.plus;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.sf.util.RabbitMqUtils;
+
+public class Producer {
+    // 声明交换机、队列和绑定关系的名字
+    private static final String EXCHANGE_NAME = "helloworld_exchange";
+    private static final String QUEUE_NAME = "helloworld_queue";
+    private static final String ROUTING_KEY = "helloworld";
+
+    public static void main(String[] args) throws Exception{
+//        ConnectionFactory factory = new ConnectionFactory();
+//        // 类似于mysql中的一个具体数据库
+////        factory.setVirtualHost("test");
+//        factory.setUri("amqp://guest:guest@127.0.0.1/test");
+//        Connection connection = factory.newConnection();
+//        Channel channel = connection.createChannel();
+
+        Channel channel = RabbitMqUtils.getChannel();
+
+        // 声明交换机 -> 声明队列 -> 声明交换机和队列的绑定关系
+        // 对应参数为 交换机名字 交换机类型 是否序列化 是否自动删除 其他参数
+        channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,false,null);
+        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
+        // 将队列名 交换机名 和 路由的key 绑定在一起
+        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
+
+        String message = "Hello World!";
+        // 发送消息时 是通过 交换机名字 路由的key + 消息的字节数组
+        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null,message.getBytes());
+
+        channel.close();
+    }
+}

+ 24 - 0
mq-demo/src/main/java/com/sf/message/durable/Consumer.java

@@ -0,0 +1,24 @@
+package com.sf.message.durable;
+
+import com.rabbitmq.client.CancelCallback;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.sf.util.RabbitMqUtils;
+
+public class Consumer {
+
+    private static String queueName = "hello2";
+
+    public static void main(String[] args) throws  Exception{
+        Channel channel = RabbitMqUtils.getChannel();
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(message);
+//            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+//            System.out.println(deliveryTag);
+        };
+        CancelCallback cancelCallback = (consumerTag) -> {};
+        channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
+    }
+
+}

+ 36 - 0
mq-demo/src/main/java/com/sf/message/durable/Producer.java

@@ -0,0 +1,36 @@
+package com.sf.message.durable;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.MessageProperties;
+import com.sf.util.RabbitMqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private static String queueName = "hello2";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+        // 当创建过一个队列后 不能修改参数重新创建
+        // 声明队列是持久化的
+        channel.queueDeclare(queueName, true, false, false, null);
+        System.out.println("输入消息:");
+        Scanner scanner = new Scanner(System.in);
+        while (scanner.hasNextLine()) {
+            String message = scanner.nextLine();
+            // 发送的格式是 消息类型和消息自身  0,123 代表发送非持久化的消息 123 如果类型是1 代表持久化的消息
+            // 将消息本身设置为持久化的
+            String[] split = message.split(",");
+            String msg = split[1];
+            switch (split[0]) {
+                case "0":
+                    channel.basicPublish("", queueName, null, msg.getBytes());
+                    break;
+                case "1":
+                    channel.basicPublish("", queueName, MessageProperties.PERSISTENT_BASIC, msg.getBytes());
+            }
+            System.out.println("Producer send message : " + msg);
+        }
+    }
+}

+ 22 - 0
mq-demo/src/main/java/com/sf/message/pull/Consumer.java

@@ -0,0 +1,22 @@
+package com.sf.message.pull;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.GetResponse;
+import com.sf.util.RabbitMqUtils;
+
+public class Consumer {
+
+    private static String queueName = "hello1";
+
+    public static void main(String[] args) throws Exception{
+        Channel channel = RabbitMqUtils.getChannel();
+        // 消息的处理有两种模式 推和拉
+        //  basicAck 推送,是由mq主动推送的方式
+        //  basicGet 拉取,消费者主动获取消息
+        GetResponse response = channel.basicGet(queueName, true);
+        byte[] body = response.getBody();
+        String message = new String(body, "UTF-8");
+        System.out.println(message);
+        channel.close();
+    }
+}

+ 24 - 0
mq-demo/src/main/java/com/sf/message/pull/Producer.java

@@ -0,0 +1,24 @@
+package com.sf.message.pull;
+
+import com.rabbitmq.client.Channel;
+import com.sf.util.RabbitMqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private static String queueName = "hello1";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+        // 声明队列
+        channel.queueDeclare(queueName, false, false, false, null);
+        System.out.println("输入消息:");
+        Scanner scanner = new Scanner(System.in);
+        while (scanner.hasNext()) {
+            String message = scanner.next();
+            channel.basicPublish("", queueName, null, message.getBytes());
+            System.out.println("Producer send message : " + message);
+        }
+    }
+}

+ 33 - 0
mq-demo/src/main/java/com/sf/message/qos/Consumer.java

@@ -0,0 +1,33 @@
+package com.sf.message.qos;
+
+import com.rabbitmq.client.CancelCallback;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.sf.util.RabbitMqUtils;
+
+public class Consumer {
+
+    private static String queueName = "hello1";
+
+    public static void main(String[] args) throws Exception{
+        Channel channel = RabbitMqUtils.getChannel();
+        // 预取值
+        int prefetchCount = 5;
+        channel.basicQos(prefetchCount);
+
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(message);
+            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+            System.out.println(deliveryTag);
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            channel.basicAck(deliveryTag, false);
+        };
+        CancelCallback cancelCallback = (consumerTag) -> {};
+        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
+    }
+}

+ 33 - 0
mq-demo/src/main/java/com/sf/message/qos/ConsumerFast.java

@@ -0,0 +1,33 @@
+package com.sf.message.qos;
+
+import com.rabbitmq.client.CancelCallback;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.sf.util.RabbitMqUtils;
+
+public class ConsumerFast {
+
+    private static String queueName = "hello1";
+
+    public static void main(String[] args) throws Exception{
+        Channel channel = RabbitMqUtils.getChannel();
+        // 预取值
+        int prefetchCount = 5;
+        channel.basicQos(prefetchCount);
+
+        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
+            String message = new String(delivery.getBody(), "UTF-8");
+            System.out.println(message);
+            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
+            System.out.println(deliveryTag);
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            channel.basicAck(deliveryTag, false);
+        };
+        CancelCallback cancelCallback = (consumerTag) -> {};
+        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
+    }
+}

+ 24 - 0
mq-demo/src/main/java/com/sf/message/qos/Producer.java

@@ -0,0 +1,24 @@
+package com.sf.message.qos;
+
+import com.rabbitmq.client.Channel;
+import com.sf.util.RabbitMqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private static String queueName = "hello1";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = RabbitMqUtils.getChannel();
+        // 声明队列
+        channel.queueDeclare(queueName, false, false, false, null);
+        System.out.println("输入消息:");
+        Scanner scanner = new Scanner(System.in);
+        while (scanner.hasNext()) {
+            String message = scanner.next();
+            channel.basicPublish("", queueName, null, message.getBytes());
+            System.out.println("Producer send message : " + message);
+        }
+    }
+}

+ 3 - 0
mq-demo/src/main/java/com/sf/workqueue/Consumer.java

@@ -9,6 +9,9 @@ public class Consumer {
 
     private static String queueName = "hello";
 
+    // 默认是轮询的分发策略
+    // 如果有n个消费者 如果有m条消息  此时的分发方式为 将第m条消息分发给第 m%n对应的消费者
+    // 但往往不是所有消费者的处理速度相同 可以进一步根据消费者的处理能力进行分发
     public static void main(String[] args) throws  Exception{
         Channel channel = RabbitMqUtils.getChannel();
         DeliverCallback deliverCallback = (consumerTag, delivery) -> {