问题描述
线上系统中,使用 Skywalking 监视和追踪服务过程中,发现 RabbitMQ 的消费丢失 TID,导致在排查线上问题时遇到很多麻烦,不能及时发现问题。
2023-03-09 05:55:34.774 [service-trade] [TID:N/A] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.j.trade.consumer.OrderConsumer
尝试解决
@Trace 注解
添加依赖包:
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<!-- 版本选择跟 skywalking 一致 -->
<version>8.5.0</version>
</dependency>
在 RabbitMQ 消费方法上加上注解 @Trace
@Trace
@RabbitListener(queues = {"hong.queue.test"})
public void testMessageConsumer(Channel channel, Message message) {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("MQ消息消费:{}", body);
// todo ......
}
测试日志输出如下,发现输出日志都有 TID 了(至于为什么 TID 不一样,后期再去研究)。
2023-03-09 10:21:01.474 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO o.a.c.c.C.[Tomcat].[localhost].[/] -Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-03-09 10:21:01.474 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO o.s.web.servlet.DispatcherServlet -Initializing Servlet 'dispatcherServlet'
2023-03-09 10:21:01.476 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO o.s.web.servlet.DispatcherServlet -Completed initialization in 2 ms
2023-03-09 10:21:01.538 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO c.l.l.controller.TestController -测试开始......
2023-03-09 10:21:01.538 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO c.l.l.service.impl.TestServiceImpl -处理逻辑......
2023-03-09 10:21:01.613 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO c.l.l.service.impl.TestServiceImpl -发送MQ消息:67461805-f78f-4432-8153-6673ceed269f
2023-03-09 10:21:01.632 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [async-thread-pool-1] INFO c.l.l.service.impl.HandleServiceImpl -这是异步执行的......
2023-03-09 10:21:01.633 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [async-thread-pool-1] INFO c.l.l.service.impl.ToolServiceImpl -这是子线程里面再调子线程的
2023-03-09 10:21:01.658 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.72.16783284616540001] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.l.logtrace.consumer.TestConsumer -MQ消息消费:67461805-f78f-4432-8153-6673ceed269f
2023-03-09 10:21:01.659 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.72.16783284616540001] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.l.l.service.impl.ToolServiceImpl -MQ消费处理:67461805-f78f-4432-8153-6673ceed269f
不过存在两个小问题:
- 如果通过添加
@Trace
注解来解决,那么现有系统中很多MQ消费的方法上都要加上这个注解,这工作量还是有点大的,而且代码侵入性比较高; - TID 不一致(又不像是问题,有时业务场景又需要区分MQ生产和消费,TID 一致反倒是个麻烦,看业务需要吧)。
不过针对问题1,可以通过其他方式来优化这个问题,比如使用增强或者使用插件在编译时加上 @Trace
注解,不过比较麻烦;针对问题2目前还没解决思路,有解决思路的大佬麻烦提供下。
自定义插件(支持Spring RabbitMQ)
以上两个问题不得不促使寻求新的解决方式,在 SkyWalking 自定义插件(Spring RabbitMQ)具体分析过程 看到了相关描述,大致思路是:在 Spring RabbitMQ Client 执行消费逻辑的方法上实现增强,于是尝试了一遍,发现可以正常追踪整条链路,问题解决。
Skywalking 官方提供的 RabbitMQ 插件存在缺陷,其只针对 RabbitMQ 官方原生 Client 实现扩展,但我们在项目中一般不直接使用原生 Client,而是使用Spring RabitMQ Client,因S pring RabitMQ Consumer 中存在跨线程操作,导致跟踪ID断链。
操作步骤
- 仿造 rabbitmq-plugin 新增
SpringRabbitMQConsumerInstrumentation
:
public class SpringRabbitMQConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
@Override
protected ClassMatch enhanceClass() {
// 在 org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer 类中增强拦截
return NameMatch.byName("org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer");
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new DeclaredInstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// 拦截 executeListener 方法,因为 Spring RabbitMQ 是在此方法执行消费逻辑的
return named("executeListener").and(takesArgumentWithType(0, "com.rabbitmq.client.Channel"));
}
@Override
public String getMethodsInterceptor() {
return "com.lanweihong.skywalking.apm.plugin.rabbitmq.SpringRabbitMQConsumerInterceptor";
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}
- 新增
SpringRabbitMQConsumerInterceptor
类,用于 Spring RabbitMQ 执行消费逻辑前处理:
public class SpringRabbitMQConsumerInterceptor implements InstanceMethodsAroundInterceptor {
public static final String OPERATE_NAME_PREFIX = "SpringRabbitMQ/";
public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ContextCarrier contextCarrier = new ContextCarrier();
Message message = (Message) allArguments[1];
MessageProperties properties = message.getMessageProperties();
Channel channel = (Channel) allArguments[0];
String url = channel.getConnection().getAddress().toString().replace("/", "") + ":" + channel.getConnection().getPort();
AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + "Topic/" +
properties.getReceivedExchange() + "Queue/" + properties.getReceivedRoutingKey() +
CONSUMER_OPERATE_NAME_SUFFIX, null).start(System.currentTimeMillis());
Tags.MQ_BROKER.set(activeSpan, url);
Tags.MQ_TOPIC.set(activeSpan, properties.getReceivedExchange());
Tags.MQ_QUEUE.set(activeSpan, properties.getReceivedRoutingKey());
activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
SpanLayer.asMQ(activeSpan);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
if (properties.getHeaders() != null && properties.getHeaders().get(next.getHeadKey()) != null) {
next.setHeadValue(properties.getHeaders().get(next.getHeadKey()).toString());
}
}
ContextManager.extract(contextCarrier);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
- 在 resources 目录下新增文件
skywalking-plugin.def
,内容如下:
spring-rabbitmq-5.x=com.lanweihong.skywalking.apm.plugin.rabbitmq.define.SpringRabbitMQConsumerInstrumentation
- 编译打包,并将 jar 包放到 Skywalking 目录下的 plugin 文件夹中,然后重启项目发送请求调试。
请求日志输出如下,可看到整个链路都有 TID 了。
2023-03-09 09:18:40.340 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO o.a.c.c.C.[Tomcat].[localhost].[/] -Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-03-09 09:18:40.340 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO o.s.web.servlet.DispatcherServlet -Initializing Servlet 'dispatcherServlet'
2023-03-09 09:18:40.341 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO o.s.web.servlet.DispatcherServlet -Completed initialization in 1 ms
2023-03-09 09:18:40.424 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO c.l.l.controller.TestController -测试开始......
2023-03-09 09:18:40.424 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO c.l.l.service.impl.TestServiceImpl -处理逻辑......
2023-03-09 09:18:40.479 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO c.l.l.service.impl.TestServiceImpl -发送MQ消息:fe201f16-96c1-4860-a4db-e7cd2ee10d2c
2023-03-09 09:18:40.499 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [async-thread-pool-1] INFO c.l.l.service.impl.HandleServiceImpl -这是异步执行的......
2023-03-09 09:18:40.499 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [async-thread-pool-1] INFO c.l.l.service.impl.ToolServiceImpl -这是子线程里面再调子线程的
2023-03-09 09:18:40.531 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.l.logtrace.consumer.TestConsumer -MQ消息消费:fe201f16-96c1-4860-a4db-e7cd2ee10d2c
2023-03-09 09:18:40.531 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO c.l.l.service.impl.ToolServiceImpl -MQ消费处理:fe201f16-96c1-4860-a4db-e7cd2ee10d2c
此方式不需要修改项目代码,不过需要运维人员重新部署 Skywalking 插件包。
总结
解决方式有两种:
- 使用 Skywalking 提供的注解
@Trace
,对项目有一定的代码侵入性; - 改造 Skywalking 提供的 rabbitmq 插件或自定义插件,添加对 Spring RabbitMQ 的支持,此方式对现有项目代码无侵入。
插件代码已上传Github:apm-sniffer-hong