SubscribeMessageLog2.java 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package com.sf.publishandsubscribe;
  2. import com.rabbitmq.client.*;
  3. import java.io.File;
  4. import java.io.FileOutputStream;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 消息的消费
  9. */
  10. public class SubscribeMessageLog2 {
  11. //声明需要创建的交换器名称
  12. private final static String EXCHANGE_NAME = "logs";
  13. public static void main(String[] args) throws IOException, TimeoutException {
  14. System.out.println("消费者客户端2");
  15. //创建连接RabbitMQ服务器的连接
  16. ConnectionFactory factory = new ConnectionFactory();
  17. //设置连接RabbitMQ的服务器地址
  18. factory.setHost("192.168.180.133");
  19. //设置连接RabbitMQ的服务器AMQP端口号
  20. factory.setPort(5672);
  21. //创建一个连接
  22. Connection connection = factory.newConnection();
  23. //创建频道
  24. Channel channel = connection.createChannel();
  25. /*
  26. 创建一个名为logs,类型为fanout交换器
  27. RabbitMQ客户端提供了交换器类型的枚举BuiltinExchangeType
  28. */
  29. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
  30. //声明一个随机队列
  31. String queueName = channel.queueDeclare().getQueue();
  32. //将交换器和随机队列绑定到一起
  33. channel.queueBind(queueName,EXCHANGE_NAME,"");
  34. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  35. //获取订阅到的消息
  36. String message = new String(delivery.getBody());
  37. //将获取到的日志信息保存为本地文件 文件名为abc.log
  38. FileOutputStream fos = new FileOutputStream(new File("abc.log"), true);
  39. fos.write(message.getBytes());
  40. fos.flush();
  41. //关闭
  42. fos.close();
  43. };
  44. //获取订阅到的消息
  45. channel.basicConsume(queueName,true,deliverCallback,consumerTag -> { });
  46. }
  47. }