SubscribeMessageLog1.java 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package com.sf.publishandsubscribe;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * 消息的订阅
  7. */
  8. public class SubscribeMessageLog1 {
  9. //声明需要创建的交换器名称
  10. private final static String EXCHANGE_NAME = "logs";
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. System.out.println("消费者客户端1");
  13. //创建连接RabbitMQ服务器的连接
  14. ConnectionFactory factory = new ConnectionFactory();
  15. //设置连接RabbitMQ的服务器地址
  16. factory.setHost("192.168.180.133");
  17. //设置连接RabbitMQ的服务器AMQP端口号
  18. factory.setPort(5672);
  19. //创建一个连接
  20. Connection connection = factory.newConnection();
  21. //创建频道
  22. Channel channel = connection.createChannel();
  23. /*
  24. 创建一个名为logs,类型为fanout交换器
  25. RabbitMQ客户端提供了交换器类型的枚举BuiltinExchangeType
  26. */
  27. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  28. //声明一个随机队列
  29. String queueName = channel.queueDeclare().getQueue();
  30. //将交换器和随机队列绑定到一起
  31. channel.queueBind(queueName,EXCHANGE_NAME,"");
  32. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  33. //获取订阅到的消息
  34. String message = new String(delivery.getBody());
  35. System.out.println("订阅到的消息为: "+message);
  36. };
  37. //获取订阅到的消息
  38. channel.basicConsume(queueName,true,deliverCallback,consumerTag -> { });
  39. }
  40. }