Description or Example
# 日志架构
## 为什么需要日志?
> 因为消息在传输过程中可能会丢失, 因此需要记录日志
## 知识点
### 将mq抽取一个微服务的若干原因
1. **抽取了mq微服务, 发送消息成功与其他微服务解耦, 如果对发送消息进行进一步封装将会非常的简单**
## 为什么使用MongoDB
1. 日志绝大部分是一些无用数据,且数据与数据之间并没有强烈的关系,因此,不需要MySQL这样的的关系型数据库
2. MongoDB以文档的形式存储数据,对于日志可读需求强的数据非常适合
3. 不适合用MySQL作为日志存储,因为日志的数据量大,会导致MySQL对日日志的查询效率变得极低,导致数据库整体压力大,系统性能下滑
4. 不适合用Redis,因为这样日志会耦合其他功能, Redis性能会被拖累
5. MongoDB是文档类型数据库,轻量级,适合日志记录
## Docker安装MongoDB
```shell
docker run --name mongodb -d -p 27017:27017 \
--privileged=true \
-v /bitmail/mongodb/data:/data/db \
--restart=always \
mongo
```
### 创建用户名和密码

### 注意
> **直接使用上面的代码即可, 官方文档的docker创建会存在两个问题, 一个是环境变量的过期, 一个是容器名称的错误**
[参考地址](https://www.mongodb.com/zh-cn/compatibility/docker)
## MongoDB整合Springboot
### 1. 引入依赖
```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
```
### 2. 配置文件
```java
data:
mongodb:
host: 192.168.10.131
port: 27017
username: root
password: root
```
### 3. 创建数据库
> 直接在navicat创建数据库即可
### 4. CURD测试
[原生API参考文档](https://mongodb.net.cn/manual/crud/)
[整合Springboot的blog](https://blog.csdn.net/qq_46112274/article/details/117425532)
[springbootAPI的参考blog](https://juejin.cn/post/7222676391464828986)
# 整合业务
## 声明文档和集合
```java
@Document(collection = "bitmall_mq_message") // 声明集合的名字, 如果集合不存在会自动创建
@Data
@Accessors(chain = true)
public class MessageLog {
/**
* 消息的ID
*/
@Id
private String messageId;
/**
* 消息的内容
*/
private String content;
/**
* 消息发送的目标交换机
*/
@Field("to_exchange") // 指定映射关系
private String toExchange;
/**
* 目标的路由键
*/
@Field("routing_key")
private String routingKey;
/**
* 消息状态
*/
@Field("message_status")
private Integer messageStatus;
/**
* 创建时间
*/
@Field("create_time")
private DateTime createTime;
/**
* 更新时间
*/
@Field("update_time")
private DateTime updateTime;
/**
* 消息的类型, 否则发送消息的时候不知道发什么类型的消息
*/
@Field("clazz")
private String clazz;
}
```

## 自定义消息的状态
```java
public enum MsgStatus {
MESSAGE_NEW(0, "已发送状态"),
MESSAGE_SEND(1, "已抵达MQ"),
MESSAGE_UN_CATCH(2, "未抵达队列"),
MESSAGE_UN_ACK(3, "成功处理消息");
private Integer code;
private String msg;
MsgStatus(Integer code, String msg) {
this.code = code;
this.msg = msg;
}
public Integer getCode() {
return code;
}
public String getMsg() {
return msg;
}
}
```
# BUG修复
## 消息队列里面的消息不能被监听器正确处理
> 在没有引入日志系统之前, 是一切正常的, 但是, 一旦引入了日志系统, 就会出现如下情况
```java
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'no match' threw exception
```
### 根本原因
> **因为我们在发送消息的时候会封装一个`Message`对象, 一般我们去`new`这个对象都需要传入一个`byte[]`字节数组, 之前时使用了`JSON.toJSONByte()`方法, 因此, content-type全都是`application/json`, 但是, 这种方法是不可取的, 因为这样会导致消息不能被区分开**
> **因此, 我们看向源码**


## 解决数据库时区问题
### MySQL
> 修改`my.cnf`文件
```text
[mysqld]
default-time-zone=+08:00
```
### mongoDB
> **mongoDB不需要解决时区问题, 因为mongoDB读取到Java对象的时候, 已经是东八区的时间了, 在navicat显示的时间是标准时间**
> **即时间读取过来会自动变成东八区时间, 不需要解决时区问题**
### 消息重试带来的问题
#### 是否有必要消息重新入队
> **从消息丢失上的角度来说, 消息不重新入队是完全没有问题的, 因为日志数据库已经记录了该消息, 但是, 如果不重新入队, 那么这个消息被处理的就会面临长时间等待才能被处理, 用户体验极差, 因此, 还是需要重新入队**
#### 怎么解决这个问题
> **可以通过配置文件解决这个问题**
```yaml
spring:
rabbitmq:
listener:
retry:
enabled: true # 开启重试
max-attempts: 3# 最大重试次数
```
> **如果不开启重试, 监听器会陷入一个奇怪的死循环中, 监听器处理失败, 进入队列, 监听器处理失败....**
> <font color='red'>**如果设置了该参数, 如果当前监听器监听失败, 当前监听器不再重试, 交予其他监听器重试, 一共重试3次**</font>
# 日志更新与创建代码
```java
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return (correlationData, ack, cause) -> {
// 唯一标识和消息的id一直
if (correlationData != null) {
String messageId = correlationData.getId();
sendMsgUtil.callbackUpdate(messageId, MQConstant.MsgStatus.MESSAGE_ATTACH_MQ.getCode());
}
};
}
@Bean
public RabbitTemplate.ReturnCallback returnCallback() {
return (message, replyCode, replyText, exchange, routingKey) -> {
// 获取消息的ID
String messageId = message.getMessageProperties().getMessageId();
sendMsgUtil.callbackUpdate(messageId, MQConstant.MsgStatus.MESSAGE_UN_CATCH_QUEUE.getCode());
};
}
```
```java
@Component
public class SendMsgUtil {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MongoTemplate mongoTemplate;
public void convertAndSendAndSaveLog(String toExchange, String toRoutingKey, Object messageBody) {
// 封装MessageProperties, 主要是想要获取消息的id
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString().replace("-", ""));
messageProperties.setReceivedExchange(toExchange);
messageProperties.setReceivedRoutingKey(toRoutingKey);
// 这里构建消息需要充分区分消息
Message message = rabbitTemplate.getMessageConverter().toMessage(messageBody, messageProperties);
// 继续发送消息, 取消锁定
rabbitTemplate.convertAndSend(toExchange, toRoutingKey, message, processor -> {
// 发消息前记录日志
MessageLog messageLog = new MessageLog();
MessageProperties properties = processor.getMessageProperties();
messageLog.setMessageId(properties.getMessageId())
.setToExchange(properties.getReceivedExchange())
.setRoutingKey(properties.getReceivedRoutingKey())
.setContent(JSON.toJSONString(messageBody))
.setMessageStatus(MQConstant.MsgStatus.MESSAGE_SEND.getCode())
.setClazz(messageBody.getClass().getName())
.setCreateTime(new DateTime());
mongoTemplate.insert(messageLog);
return message;
}, new CorrelationData(messageProperties.getMessageId()));
}
public void callbackUpdate(String messageId, Integer status) {
MessageLog messageLog = mongoTemplate.findById(messageId, MessageLog.class);
if (messageLog != null) {
messageLog.setMessageStatus(status)
.setUpdateTime(new DateTime());
mongoTemplate.save(messageLog);
}
}
public void finish(String messageId) {
MessageLog messageLog = mongoTemplate.findById(messageId, MessageLog.class);
if (messageLog != null) {
messageLog.setMessageStatus(MQConstant.MsgStatus.MESSAGE_ACK.getCode())
.setUpdateTime(new DateTime());
mongoTemplate.save(messageLog);
}
}
}
```
```java
@Configuration
@EnableScheduling
@EnableAsync
public class ScheduleConfig {
}
```
```java
@Component
public class RetrySendSchedule {
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
@Scheduled(cron = "0 0 0 * * ?")
@Async
public void retry() throws ClassNotFoundException {
List<MessageLog> messageLogs = mongoTemplate.find(new Query()
.addCriteria(Criteria
.where("message_status")
.in(MQConstant.MsgStatus.MESSAGE_SEND.getCode(), MQConstant.MsgStatus.MESSAGE_UN_CATCH_QUEUE.getCode())),
MessageLog.class
);
if (!messageLogs.isEmpty()) {
for (MessageLog messageLog : messageLogs) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(messageLog.getMessageId());
messageProperties.setReceivedExchange(messageLog.getToExchange());
messageProperties.setReceivedRoutingKey(messageLog.getRoutingKey());
String content = messageLog.getContent();
Message message = rabbitTemplate.getMessageConverter().toMessage(JSON.parseObject(content, Class.forName(messageLog.getClazz())), messageProperties);
rabbitTemplate.convertAndSend(messageLog.getToExchange(), messageLog.getRoutingKey(),
message, new CorrelationData(messageLog.getMessageId()));
// 重新发送消息不需要记录日志
}
}
}
}
```
## 为什么重新发送消息不需要记录日志
> **因为现在是重新发送, 原来就有对应的日志了, 如果还创建一个日志, 假设当前不成功, 那么需要重新发送的消息就会指数增加, 但是绝大部分都是没用的, 因此不需要记录日志**