package com.sf.publishandsubscribe; import com.rabbitmq.client.*; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 消息的消费 */ public class SubscribeMessageLog2 { //声明需要创建的交换器名称 private final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("消费者客户端2"); //创建连接RabbitMQ服务器的连接 ConnectionFactory factory = new ConnectionFactory(); //设置连接RabbitMQ的服务器地址 factory.setHost("192.168.180.133"); //设置连接RabbitMQ的服务器AMQP端口号 factory.setPort(5672); //创建一个连接 Connection connection = factory.newConnection(); //创建频道 Channel channel = connection.createChannel(); /* 创建一个名为logs,类型为fanout交换器 RabbitMQ客户端提供了交换器类型的枚举BuiltinExchangeType */ channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); //将交换器和随机队列绑定到一起 channel.queueBind(queueName,EXCHANGE_NAME,""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //获取订阅到的消息 String message = new String(delivery.getBody()); //将获取到的日志信息保存为本地文件 文件名为abc.log FileOutputStream fos = new FileOutputStream(new File("abc.log"), true); fos.write(message.getBytes()); fos.flush(); //关闭 fos.close(); }; //获取订阅到的消息 channel.basicConsume(queueName,true,deliverCallback,consumerTag -> { }); } }