wuheng 2 жил өмнө
parent
commit
e3adb4b1d8

+ 3 - 2
rebbitmq/consumer/src/main/java/com/lovecoding/rabbitmq/consumer/ConsumerDemo.java

@@ -31,9 +31,10 @@ public class ConsumerDemo {
             public void handleDelivery(String consumerTag,
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        AMQP.BasicProperties properties,
-                                       byte[] body)
-                    throws IOException
+                                       byte[] body) throws IOException
             {
             {
+                //应答当前消息, 队列可以删除消息了
+                channel.basicAck(envelope.getDeliveryTag(), false);
                 //我们收到消息后打印消息
                 //我们收到消息后打印消息
                 System.out.println( new String( body ) );
                 System.out.println( new String( body ) );
             }
             }

+ 42 - 0
rebbitmq/consumer/src/main/java/com/lovecoding/rabbitmq/consumer/ConsumerTemplate.java

@@ -6,14 +6,56 @@ import org.springframework.stereotype.Component;
 @Component
 @Component
 public class ConsumerTemplate {
 public class ConsumerTemplate {
 
 
+    //自动应答消息, 自动处理消息
     @RabbitListener(queues = "testQueue")
     @RabbitListener(queues = "testQueue")
     public void t1( String msg ){
     public void t1( String msg ){
         System.out.println( "T1 消费了消息: " + msg );
         System.out.println( "T1 消费了消息: " + msg );
     }
     }
 
 
+    //自动应答消息, 自动处理消息
     @RabbitListener(queues = "testQueue")
     @RabbitListener(queues = "testQueue")
     public void t2( String msg ){
     public void t2( String msg ){
         System.out.println( "T2 消费了消息: " + msg );
         System.out.println( "T2 消费了消息: " + msg );
     }
     }
 
 
+    /**
+     * 广播模式 能接收所有消息
+     * 只要 队列和交换机绑定了 就能收消息
+     * @param msg
+     */
+    @RabbitListener(queues = "fanoutDemo")
+    public void t3( String msg ){
+        System.out.println( "T3 消费了消息 :" + msg );
+    }
+
+    /**
+     * 广播模式 能接收所有消息
+     * 只要 队列和交换机绑定了 就能收消息
+     * @param msg
+     */
+    @RabbitListener(queues = "fanoutDemoT4")
+    public void t4( String msg ){
+        System.out.println( "T4 消费了消息 :" + msg );
+    }
+
+    /**
+     * Direct 消息模式 routeKey 是 北京
+     * 只接收 key 是北京的消息
+     * @param msg
+     */
+    @RabbitListener(queues = "directQueue")
+    public void t5( String msg ) {
+        System.out.println( "北京天气:" + msg );
+    }
+
+    /**
+     * Direct 消息模式 routeKey 是 哈尔滨
+     * 只接收 key 是哈尔滨的消息
+     * @param msg
+     */
+    @RabbitListener(queues = "directQueueT6")
+    public void t6( String msg){
+        System.out.println( "哈尔滨天气:" + msg );
+    }
+
 }
 }

+ 37 - 0
rebbitmq/consumer/src/main/java/com/lovecoding/rabbitmq/consumer/DirectConfig.java

@@ -0,0 +1,37 @@
+package com.lovecoding.rabbitmq.consumer;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class DirectConfig {
+
+    @Bean
+    public DirectExchange directExchange(){
+        return new DirectExchange("com.lc.direct.demo");
+    }
+
+    @Bean
+    public Queue directQueue(){
+        return new Queue("directQueue");
+    }
+
+    @Bean
+    public Queue directQueueT6(){
+        return new Queue("directQueueT6");
+    }
+
+    @Bean
+    public Binding getBindingDirectQueue( Queue directQueue, DirectExchange directExchange ){
+        return BindingBuilder.bind(directQueue).to(directExchange).with( "北京" );
+    }
+
+    @Bean
+    public Binding getBindingDirectQueueT6(Queue directQueueT6, DirectExchange directExchange){
+        return BindingBuilder.bind(directQueueT6).to(directExchange).with("哈尔滨");
+    }
+}

+ 34 - 0
rebbitmq/consumer/src/main/java/com/lovecoding/rabbitmq/consumer/FanoutConfig.java

@@ -0,0 +1,34 @@
+package com.lovecoding.rabbitmq.consumer;
+
+import org.springframework.amqp.core.*;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class FanoutConfig {
+
+    @Bean
+    public FanoutExchange fanoutExchangeDemo(){
+        return new FanoutExchange("com.lc.fanout.demo");
+    }
+
+    @Bean
+    public Queue queueDemo(){
+        return new Queue("fanoutDemo");
+    }
+
+    @Bean
+    public Queue queueDemoT4(){
+        return new Queue("fanoutDemoT4");
+    }
+
+    @Bean
+    public Binding getBindingFanout( FanoutExchange fanoutExchangeDemo, Queue queueDemo ){
+        return BindingBuilder.bind(queueDemo).to(fanoutExchangeDemo);
+    }
+
+    @Bean
+    public Binding getBindingFanoutT4( FanoutExchange fanoutExchangeDemo, Queue queueDemoT4 ){
+        return BindingBuilder.bind(queueDemoT4).to(fanoutExchangeDemo);
+    }
+}

+ 15 - 3
rebbitmq/producer/src/test/java/com/lovecoding/rabbitmq/producer/ProducerApplicationTests.java

@@ -23,18 +23,30 @@ class ProducerApplicationTests {
 
 
     @Test
     @Test
     void t1() throws InterruptedException {
     void t1() throws InterruptedException {
-
         for (int i = 0; i < 100; i++) {
         for (int i = 0; i < 100; i++) {
             rabbitTemplate.convertAndSend(
             rabbitTemplate.convertAndSend(
                     "testQueue", "测试消息ID :" + i );
                     "testQueue", "测试消息ID :" + i );
             Thread.sleep(100);
             Thread.sleep(100);
         }
         }
-
     }
     }
 
 
+    @Test
+    void t2(){
+        rabbitTemplate.convertAndSend(
+                "com.lc.fanout.demo", "", "广播模式的消息发送" );
+    }
 
 
+    @Test
+    void t3(){
+        rabbitTemplate.convertAndSend(
+                "com.lc.direct.demo", "北京", "晴转多云" );
+    }
 
 
-
+    @Test
+    void t4(){
+        rabbitTemplate.convertAndSend(
+                "com.lc.direct.demo", "哈尔滨", "多云有雷阵雨" );
+    }
 
 
     @Test
     @Test
     void sendMsg() throws IOException, TimeoutException {
     void sendMsg() throws IOException, TimeoutException {