فهرست منبع

0809 amqp的fanout

Qing 8 ماه پیش
والد
کامیت
221ac4d831

+ 17 - 0
mq-demo/pom.xml

@@ -37,4 +37,21 @@
         </dependencies>
     </dependencyManagement>
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.11.0</version>
+                <configuration>
+                    <!-- 在controller直接接收参数生效 -->
+                    <parameters>true</parameters>
+                    <!-- 指定编译的jdk版本 -->
+                    <source>17</source>
+                    <target>17</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>

+ 2 - 2
mq-demo/springboot-demo/src/main/java/com/sf/SpringbootDemoApplication.java → mq-demo/springboot-demo/src/main/java/com/sf/DemoApplication.java

@@ -4,10 +4,10 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 @SpringBootApplication
-public class SpringbootDemoApplication {
+public class DemoApplication {
 
     public static void main(String[] args) {
-        SpringApplication.run(SpringbootDemoApplication.class, args);
+        SpringApplication.run(DemoApplication.class, args);
     }
 
 }

+ 44 - 0
mq-demo/springboot-demo/src/main/java/com/sf/config/FanoutConfig.java

@@ -0,0 +1,44 @@
+package com.sf.config;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.FanoutExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class FanoutConfig {
+
+    // 这里也是创建一个持久化的交换机
+    // 要么是交换机已创建 并且是持久化的
+    // 要么是交换机未创建
+    // 对应解决方案是 要么删掉旧的 要么给一个新的
+    @Bean
+    public FanoutExchange fanoutExchange(){
+        return new FanoutExchange("fanoutExchange1");
+    }
+
+    @Bean
+    public Queue fanoutQueue(){
+        return new Queue("fanoutQueue");
+    }
+
+    // 此处方法的入参名要和前面声明的bean名称 保持一致
+    @Bean
+    public Binding fanoutBinding(Queue fanoutQueue, FanoutExchange fanoutExchange){
+        // 将队列绑定到交换机上
+        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
+    }
+
+    @Bean
+    public Queue fanoutQueue2(){
+        return new Queue("fanoutQueue2");
+    }
+
+    // 绑定第二个队列
+    @Bean
+    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
+        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
+    }
+}

+ 16 - 0
mq-demo/springboot-demo/src/main/java/com/sf/config/RabbitMqConfig.java

@@ -0,0 +1,16 @@
+package com.sf.config;
+
+import org.springframework.amqp.core.Queue;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class RabbitMqConfig {
+
+    // 创建一个持久化的队列
+    // 注意引入的Queue的包 是amqp的
+    @Bean
+    public Queue queue() {
+        return new Queue("hello4");
+    }
+}

+ 18 - 0
mq-demo/springboot-demo/src/main/java/com/sf/listener/FanoutListener.java

@@ -0,0 +1,18 @@
+package com.sf.listener;
+
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+@Component
+public class FanoutListener {
+
+    @RabbitListener(queues = "fanoutQueue")
+    public void receiveFanout(String message) throws InterruptedException {
+        System.out.println("fanout1接受消息:" + message);
+    }
+
+    @RabbitListener(queues = "fanoutQueue2")
+    public void receiveFanout2(String message) throws InterruptedException {
+        System.out.println("fanout2接受消息:" + message);
+    }
+}

+ 22 - 0
mq-demo/springboot-demo/src/main/java/com/sf/listener/MyRabbitListener.java

@@ -0,0 +1,22 @@
+package com.sf.listener;
+
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+// 接受消息的监听器 声明为组件
+@Component
+public class MyRabbitListener {
+
+    @RabbitListener(queues = "hello4")
+    public void receive(String message) throws InterruptedException {
+        System.out.println("监听器1接受消息:" + message);
+        Thread.sleep(100);
+    }
+
+    @RabbitListener(queues = "hello4")
+    public void receive2(String message) throws InterruptedException {
+        System.out.println("监听器2接受消息:" + message);
+        Thread.sleep(1000);
+    }
+
+}

+ 4 - 0
mq-demo/springboot-demo/src/main/resources/application.yml

@@ -6,3 +6,7 @@ spring:
     port: 5672
     username: guest
     password: guest
+    listener:
+      simple:
+        # 设置预取值
+        prefetch: 1

+ 39 - 0
mq-demo/springboot-demo/src/test/java/com/sf/DemoApplicationTests.java

@@ -0,0 +1,39 @@
+package com.sf;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class DemoApplicationTests {
+
+    // 使用rabbitmq的工具类
+    // jdbcTemplate
+    // redisTemplate
+    // restTemplate
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    @Test
+    void contextLoads() throws InterruptedException {
+        String queueName = "hello4";
+        String msg = "hello world";
+        for (int i = 0; i < 30; i++) {
+            // 发送消息 参数是 队列名和消息
+            rabbitTemplate.convertAndSend(queueName, msg + i);
+            Thread.sleep(100);
+        }
+
+    }
+
+    @Test
+    public void sendFanout() throws InterruptedException {
+        String exchangeName = "fanoutExchange1";
+        String message = "hello world";
+        rabbitTemplate.convertAndSend(exchangeName, "", message);
+    }
+
+
+}

+ 0 - 13
mq-demo/springboot-demo/src/test/java/com/sf/SpringbootDemoApplicationTests.java

@@ -1,13 +0,0 @@
-package com.sf;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class SpringbootDemoApplicationTests {
-
-    @Test
-    void contextLoads() {
-    }
-
-}