RabbitMQ的大致复习
# RabbitMQ
## 环境搭建
```shell
docker run -d --name rabbitmq \
-p 5671:5671 -p 5672:5672 \
-p 4639:4639 -p 25672:25672 \
--restart=always \
-p 15671:15671 -p 15672:15672 rabbitmq:management
```
### 测试环境是否可用
`http://虚拟机ip:15672`


## 整合RabbitMQ
1. POM
```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
```
2. YAML
```yaml
rabbitmq:
host: 192.168.10.131
port: 5672
username: guest
password: guest
```
3. 主启动
```java
@Configuration
@EnableRabbit
public class RabbitMQConfig {
}
```
### 单元测试
#### 大坑
> 这个坑出现了很多次了, 再次说明, 如果测试目录和源码目录不一致, 会导致组件无法注入的情况, 也会导致忽略文件不能正常忽略的错误
```java
@SpringBootTest
@RunWith(SpringRunner.class)
public class BitmallOrderApplicationTests {
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void AmqpAdminTest1() {
amqpAdmin.declareExchange(new DirectExchange("hello_exchange", true, false));
}
@Test
public void AmqpAdminTest2() {
// 排他队列: 排他队列相当于加了个重量级锁, 一个队列不能被同时监听, 只能被一个客户端监听
amqpAdmin.declareQueue(new Queue("hello_queue", true, false, false));
}
@Test
public void AmqpAdminTest3() {
amqpAdmin.declareBinding(new Binding("hello_queue", Binding.DestinationType.QUEUE, "hello_exchange"
, "hello_routing_key", null));
}
@Test
public void sendMsg1() {
// 我们需要调用如下的方法, 先将目标Java对象转换, 然后再发送, 不调用则需要自己封装Message对象
// 1. 交换机 2. 路由键 3. 消息实体 4. 唯一标识
rabbitTemplate
.convertAndSend("hello_exchange",
"hello_routing_key",
"我是一条消息", new CorrelationData(UUID.randomUUID().toString()));
}
@Test
public void sendMsg2() {
rabbitTemplate.convertAndSend("hello_exchange",
"hello_routing_key",
new Mem(), new CorrelationData(UUID.randomUUID().toString()));
}
static class Mem implements Serializable {
private Integer num = 1;
}
}
```
> **上述定义: 排他队列的解释 API的解释**
### 发送消息的问题

> 由上图可知, Java存储的本质就是序列化, 所以, 想要存储在消息队列的对象所在的类都需要实现序列化接口, 否则会抛异常, 但是, 这种情况会影响程序扩展性, 不采用, 因此, 我们需要用JSON替代序列化
```java
@Configuration
@EnableRabbit
public class RabbitMQConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
```
### 其他测试以及结论
```java
@Component
@Slf4j
@RabbitListener(queues = "hello_queue")
public class HelloServiceImpl {
@RabbitHandler
public void listen4() {
// 有了之前的操作, 为什么需要由@RabbitHandler呢
// 因此一个消息队列里面由各种类型的消息组成, 因此, 为了让不同的消息有不同的处理方式, 因此需要该注解
// 参数写不同点类型代表处理这种类型的消息
}
/*@RabbitListener(queues = "hello_queue")
public void listen1(Object object) {
// 2023-09-03 21:29:25.068 INFO 67880 --- [ntContainer#0-1] c.j.b.o.service.impl.HelloServiceImpl :
// 消息体(Body:'"消息9"' MessageProperties [headers={__TypeId__=java.lang.String}, contentType=application/json,
// contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false,
// receivedExchange=hello_exchange, receivedRoutingKey=hello_routing_key, deliveryTag=5,
// consumerTag=amq.ctag-Sx6_ThVA8lCPbrChKxt_iQ, consumerQueue=hello_queue])
// 消息类型class org.springframework.amqp.core.Message
// -------------------------------------------------------------
// 说明了接受的消息是Message类型
// 由消息体本身(数据本身), 消息的属性信息组成
log.info("消息体{}, 消息类型{}", object, object.getClass());
}*/
/* @RabbitListener(queues = "hello_queue")
public void listen2(String res, Channel channel) {
// 说明消息体可用自动封装
// 信道所在的包是 com.rabbitmq.client.Channel, 不要导错了
log.info("消息本体的封装对象{}, 信道{}", res, channel);
}*/
/* @RabbitListener(queues = "hello_queue")
public void listen3(String res) {
try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}
log.info("获取到消息:{}", res);
// 消息不可以被重复消费, 消息只能被消费一次
// 在并发的场景下, 默认采用轮询的方式分配消息
// 如果消费者没有处理完消息, 不能处理下一个消息, 必须等待当前消息处理结束
}*/
}
```


> 上述定义: 为什么是Message类型, 参数可用什么(3个), 消费者的特性
## 复习
### 消息丢失的三大场景
> 消息丢失不仅仅有下面的场景, 还有更多的场景, 例如到交换机就丢失了, 在丢列丢失了(TTL, 溢出, NACK等), 持久化失败等场景, 我们主要讨论过程中的消息丢失
> 前提: 消息的生产到最终的消费会经历三大过程
> 第一个过程是从P->B, 即生产者到消息中间件实体
> 第二个过程是从E->Q, 即交换机到队列
> 第三个过程是从Q->C, 即队列到消费者
### 解决消息丢失
#### 传统方案
> 如果想要解决消息丢失问题, 最传统的方案就是开启事务消息, 即在发送消息的信道上开启事务, 这确实可以解决消息丢失问题
> 但是, 如果开启了事务消息, 会导致开小变大, 吞吐量和延迟等参数变差, 因此, 不建议使用事务消息
#### 确认机制方案

> 概括而言, 确认机制分为两段, 分别这发送消息的确认机制, 还有接受消息的应答机制
1. ComfirmCallBack
> ConfirmCallBack: 被称为确认回调, 主要作用于P -> B这个过程, 即, 在单体应用的前提下, 如果RabbitMQ实体能正常获取消息, 那么就会触发确认回调, 在集群的前提下, 如果所有的节点都能正常获取消息, 才会触发确认回调
> 如果确认回调被执行, 仅仅能说明消息成功到达RabbitMQ, 不能保证其一定能推送到队列, 或者被消费, 只能保证其所在的流程是没问题的
```yaml
spring:
rabbitmq:
publisher-confirms: true
```
```java
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return (correlationData, ack, cause) -> {
log.info("唯一标识{}, 是否可以正确收到{}, 失败的原因{}", correlationData, ack, cause);
};
}
@PostConstruct // 这个注解下的方法不能有任何的参数
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(confirmCallback()); // 仅仅会自动注入组件, 并不会再次执行
}
```
2. ReturnCallBack
> RenturnCallBack: 被称为退回回调, 主要作用域E -> Q这个过程, 即, 在单体应用的前提下, 如果交换机的数据不能正常推送到队列, 那么就会触发退回回调, 在集群的前提下, 如果某一个交换机的数据不能推送到队列, 那么也会触发退回回调
> 如果没有执行退回回调, 只能说明消息能正常抵达队列, 并不能说明其一定可以被消费成功, 只能保证其所在流程是没有任何问题的
```yaml
spring:
rabbitmq:
publisher-returns: true
template:
mandatory: true # 异步执行退回回调
```
```java
@Bean
public RabbitTemplate.ReturnCallback returnCallback() {
return (message, replyCode, replyText, exchange, routingKey) -> {
log.info("该消息无法正常抵达队列");
log.info("退回的消息:{}, 退回状态码{}, 退回原因{}, 交换机{}, 路由键{}", message, replyCode, replyText, exchange, routingKey);
};
}
@PostConstruct // 这个注解下的方法不能有任何的参数
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(confirmCallback()); // 仅仅会自动注入组件, 并不会再次执行
rabbitTemplate.setReturnCallback(returnCallback());
}
```
```java
2023-09-04 07:18:21.734 INFO 48404 --- [168.10.131:5672] c.j.bitmall.order.config.RabbitMQConfig : 该消息无法正常抵达队列
2023-09-04 07:18:21.734 INFO 48404 --- [168.10.131:5672] c.j.bitmall.order.config.RabbitMQConfig : 退回的消息:(Body:'"消息6"' MessageProperties [headers={__TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), 退回状态码312, 退回原因NO_ROUTE, 交换机hello_exchange, 路由键hello_routing_key1
2023-09-04 07:18:21.734 INFO 48404 --- [168.10.131:5672] c.j.bitmall.order.config.RabbitMQConfig : 唯一标识null, 是否可以正确收到true, 失败的原因null
```
#### 应答机制
> 在默认情况下, 如果消费者成功接收到消息, 会自动ACK, 即自动应答, 一旦RabbitMQ的消息队列接收到确认应答, 那么就会把对应的消息删除, 如果一直获取不到应答消息, 会一直存储
> 但是, 除非在极高吞吐量要求的前提下, 不建议使用自动应答, 如果使用自动应答, 那么意味着消费者端接收到消息就应答了, 消息队列就删除了对应的消息, 如果此时宕机, 消息没有办法被正常处理, 消息就彻底丢失了
> 因此, 我们需要采用手动确认应答, 但是, 在手动确认应答的前提下, 我们不能使用批量应答, 如果我们使用批量确认应答或者是否认应答, 会见信道内所有没有应答的消息都应答了, 这部分消息会有丢失的可能
##### 关闭微服务, 为什么RabbitMQ中消息队列里面的消息没了?
> 当我们关闭微服务的时候, 监听消息队列的方法会将所有的消息给处理完, 才能正常的关闭微服务, 除非我们直接将这个进程Kill掉, 才不会继续执行
#### 是否重新入队的理解
> 如果能重新入对, 那么保证了消息会被重新给新的消费者消费, 保证了消息一定能处理成功
> 如果不重新入对, 那么意味着消息会被丢弃, 消息不一定能处理成功, 不重新入对的情况下, 一般采用死信交换机解决消息丢失问题
##### 不ACK, NACK, REJECT的关系
> 如果不ACK, 那么就是不确认应答, 消息会一直存储在消息队列里面
> 如果NACK, 那么得分情况, 如果需要重新入对, 结果上和不ACK是同样的效果
> NACK和REJECT最大的区别是是否支持批量操作
##### deliverTag的说明
> DeliverTag本质上也是消息唯一ID, 但是他是在信道内单调底层的, 从1开始
##### 拒绝应答后, 为什么消息清空了?
> 因为拒绝应答后, 消息重新入队了, 被其他消费者成功处理了, 因此清空了
```yaml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
```