Consumer.java 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package com.sf.helloworld;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. // 是消费者
  5. public class Consumer {
  6. public static void main(String[] args) throws Exception {
  7. // 先连接mq 然后生产消息
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("localhost");
  10. // 浏览器控制台的端口号是15672 mq服务本身的端口号是5672
  11. factory.setPort(5672);
  12. factory.setUsername("guest");
  13. factory.setPassword("guest");
  14. // 通过连接工厂创建连接
  15. Connection connection = factory.newConnection();
  16. // 通过连接 创建通道
  17. Channel channel = connection.createChannel();
  18. // 从mq中取消息
  19. // 匿名内部类
  20. // DeliverCallback deliverCallback = new DeliverCallback() {
  21. // @Override
  22. // public void handle(String consumerTag, Delivery message) throws IOException {
  23. //
  24. // }
  25. // };
  26. // 声明一个投递回调的接口类
  27. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  28. byte[] body = delivery.getBody();
  29. String message = new String(body);
  30. System.out.println(message);
  31. // 投递的封皮的标记
  32. long deliveryTag = delivery.getEnvelope().getDeliveryTag();
  33. System.out.println(deliveryTag);
  34. };
  35. // 声明一个取消回调的接口类
  36. CancelCallback cancelCallback = (consumerTag) -> {
  37. System.out.println(consumerTag);
  38. };
  39. // 将两个处理逻辑给到队列
  40. // 对应参数为 队列名 是否自动应答 如何消费消息 如何取消消费
  41. channel.basicConsume("hello", true, deliverCallback, cancelCallback);
  42. // 关闭资源
  43. channel.close();
  44. connection.close();
  45. }
  46. }