123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- package com.sf.publishandsubscribe;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * 消息的订阅
- */
- public class SubscribeMessageLog1 {
- //声明需要创建的交换器名称
- private final static String EXCHANGE_NAME = "logs";
- public static void main(String[] args) throws IOException, TimeoutException {
- System.out.println("消费者客户端1");
- //创建连接RabbitMQ服务器的连接
- ConnectionFactory factory = new ConnectionFactory();
- //设置连接RabbitMQ的服务器地址
- factory.setHost("192.168.180.133");
- //设置连接RabbitMQ的服务器AMQP端口号
- factory.setPort(5672);
- //创建一个连接
- Connection connection = factory.newConnection();
- //创建频道
- Channel channel = connection.createChannel();
- /*
- 创建一个名为logs,类型为fanout交换器
- RabbitMQ客户端提供了交换器类型的枚举BuiltinExchangeType
- */
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
- //声明一个随机队列
- String queueName = channel.queueDeclare().getQueue();
- //将交换器和随机队列绑定到一起
- channel.queueBind(queueName,EXCHANGE_NAME,"");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- //获取订阅到的消息
- String message = new String(delivery.getBody());
- System.out.println("订阅到的消息为: "+message);
- };
- //获取订阅到的消息
- channel.basicConsume(queueName,true,deliverCallback,consumerTag -> { });
- }
- }
|