1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- package com.sf.helloworld;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- // 是消费者
- public class Consumer {
- public static void main(String[] args) throws Exception {
- // 先连接mq 然后生产消息
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- // 浏览器控制台的端口号是15672 mq服务本身的端口号是5672
- factory.setPort(5672);
- factory.setUsername("guest");
- factory.setPassword("guest");
- // 通过连接工厂创建连接
- Connection connection = factory.newConnection();
- // 通过连接 创建通道
- Channel channel = connection.createChannel();
- // 从mq中取消息
- // 匿名内部类
- // DeliverCallback deliverCallback = new DeliverCallback() {
- // @Override
- // public void handle(String consumerTag, Delivery message) throws IOException {
- //
- // }
- // };
- // 声明一个投递回调的接口类
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- byte[] body = delivery.getBody();
- String message = new String(body);
- System.out.println(message);
- // 投递的封皮的标记
- long deliveryTag = delivery.getEnvelope().getDeliveryTag();
- System.out.println(deliveryTag);
- };
- // 声明一个取消回调的接口类
- CancelCallback cancelCallback = (consumerTag) -> {
- System.out.println(consumerTag);
- };
- // 将两个处理逻辑给到队列
- // 对应参数为 队列名 是否自动应答 如何消费消息 如何取消消费
- channel.basicConsume("hello", true, deliverCallback, cancelCallback);
- // 关闭资源
- channel.close();
- connection.close();
- }
- }
|