Qing 1 rok temu
rodzic
commit
3cc0b71245

+ 38 - 0
rabbitmq-demo/.gitignore

@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store

+ 8 - 0
rabbitmq-demo/.idea/.gitignore

@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml

+ 5 - 0
rabbitmq-demo/.idea/inspectionProfiles/Project_Default.xml

@@ -0,0 +1,5 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+  </profile>
+</component>

+ 24 - 0
rabbitmq-demo/pom.xml

@@ -0,0 +1,24 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.sf</groupId>
+  <artifactId>rabbitmq-demo</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <name>rabbitmq-demo</name>
+  <url>http://maven.apache.org</url>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.rabbitmq</groupId>
+      <artifactId>amqp-client</artifactId>
+      <version>5.20.0</version>
+    </dependency>
+  </dependencies>
+</project>

+ 57 - 0
rabbitmq-demo/src/main/java/com/sf/helloworld/Consumer.java

@@ -0,0 +1,57 @@
+package com.sf.helloworld;
+
+import com.rabbitmq.client.*;
+
+import java.io.IOException;
+
+public class Consumer {
+
+    public static void main(String[] args) throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("127.0.0.1");
+        factory.setPort(5672);
+        factory.setUsername("guest");
+        factory.setPassword("guest");
+        Connection connection = factory.newConnection();
+        // 连接要被复用  通过连接要得到通道
+        Channel channel = connection.createChannel();
+        // 如果队列已经存在 再次声明(使用相同的参数) 不会有任何操作
+        // 如果队列已经存在 再次声明(使用不同的参数) 会有报错
+        channel.queueDeclare("hello1", false, false,
+                false, null);
+//        DeliverCallback deliverCallback = new DeliverCallback() {
+//            @Override
+//            public void handle(String consumerTag, Delivery message) throws IOException {
+//                // 消费者标记  传递的内容
+//                String body = new String(message.getBody());
+//                // 消息的标记
+//                long deliveryTag = message.getEnvelope().getDeliveryTag();
+//                System.out.println(deliveryTag);
+//                System.out.println(body);
+//            }
+//        };
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            // 消费者标记  传递的内容
+            String body = new String(message.getBody());
+            // 消息的标记
+            long deliveryTag = message.getEnvelope().getDeliveryTag();
+            System.out.println(deliveryTag);
+            System.out.println(body);
+
+        };
+//        CancelCallback cancelCallback = new CancelCallback() {
+//            @Override
+//            public void handle(String consumerTag) throws IOException {
+//                System.out.println(consumerTag);
+//            }
+//        };
+        CancelCallback cancelCallback = consumerTag -> {
+            System.out.println(consumerTag);
+        };
+//        CancelCallback cancelCallback1 = System.out::println;
+        // 队列名 是否自动应答 回调接口(如何消费消息) 取消回调
+        channel.basicConsume("hello1", true, deliverCallback, cancelCallback);
+//        channel.close();
+//        connection.close();
+    }
+}

+ 29 - 0
rabbitmq-demo/src/main/java/com/sf/helloworld/Producer.java

@@ -0,0 +1,29 @@
+package com.sf.helloworld;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public class Producer {
+
+    public static void main(String[] args) throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+//        factory.setHost("127.0.0.1");
+//        factory.setPort(5672);
+//        factory.setUsername("guest");
+//        factory.setPassword("guest");
+        factory.setUri("amqp://test:123456@127.0.0.1:5672/test1");
+        Connection connection = factory.newConnection();
+        // 连接要被复用  通过连接要得到通道
+        Channel channel = connection.createChannel();
+        // 声明一个叫做 hello的队列
+        // 参数代表  是否持久化  是否排他(独自消费)  是否自动删除  其他参数
+        channel.queueDeclare("hello1", false, false,
+                false, null);
+        String message = "hello world 333";
+        channel.basicPublish("","hello1",null,message.getBytes());
+
+        channel.close();
+        connection.close();
+    }
+}

+ 20 - 0
rabbitmq-demo/src/main/java/com/sf/util/MqUtils.java

@@ -0,0 +1,20 @@
+package com.sf.util;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+public class MqUtils {
+
+    public static Channel getChannel() throws Exception {
+        ConnectionFactory factory = new ConnectionFactory();
+        factory.setHost("127.0.0.1");
+        factory.setPort(5672);
+        factory.setUsername("guest");
+        factory.setPassword("guest");
+        Connection connection = factory.newConnection();
+        // 连接要被复用  通过连接要得到通道
+        Channel channel = connection.createChannel();
+        return channel;
+    }
+}

+ 25 - 0
rabbitmq-demo/src/main/java/com/sf/workqueue/Consumer.java

@@ -0,0 +1,25 @@
+package com.sf.workqueue;
+
+import com.rabbitmq.client.CancelCallback;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.sf.util.MqUtils;
+
+import java.io.IOException;
+
+public class Consumer {
+
+    private final static String QUEUE_NAME = "hello2";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+            System.out.println(body);
+        };
+        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
+        });
+    }
+}

+ 23 - 0
rabbitmq-demo/src/main/java/com/sf/workqueue/Producer.java

@@ -0,0 +1,23 @@
+package com.sf.workqueue;
+
+import com.rabbitmq.client.Channel;
+import com.sf.util.MqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private final static String QUEUE_NAME = "hello2";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        Scanner scanner = new Scanner(System.in);
+        System.out.println("请输入消息:");
+        while (scanner.hasNext()) {
+            String message = scanner.next();
+            System.out.println(message);
+            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
+        }
+    }
+}