问题描述

线上系统中,使用 Skywalking 监视和追踪服务过程中,发现 RabbitMQ 的消费丢失 TID,导致在排查线上问题时遇到很多麻烦,不能及时发现问题。

2023-03-09 05:55:34.774 [service-trade] [TID:N/A] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  c.j.trade.consumer.OrderConsumer

尝试解决

@Trace 注解

添加依赖包:

<dependency>
    <groupId>org.apache.skywalking</groupId>
    <artifactId>apm-toolkit-trace</artifactId>
    <!-- 版本选择跟 skywalking 一致 -->
    <version>8.5.0</version>
</dependency>

在 RabbitMQ 消费方法上加上注解 @Trace

    @Trace
    @RabbitListener(queues = {"hong.queue.test"})
    public void testMessageConsumer(Channel channel, Message message) {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("MQ消息消费:{}", body);
        // todo ......
    }

测试日志输出如下,发现输出日志都有 TID 了(至于为什么 TID 不一样,后期再去研究)。

2023-03-09 10:21:01.474 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO  o.a.c.c.C.[Tomcat].[localhost].[/] -Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-03-09 10:21:01.474 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO  o.s.web.servlet.DispatcherServlet -Initializing Servlet 'dispatcherServlet'
2023-03-09 10:21:01.476 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO  o.s.web.servlet.DispatcherServlet -Completed initialization in 2 ms
2023-03-09 10:21:01.538 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO  c.l.l.controller.TestController -测试开始......
2023-03-09 10:21:01.538 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO  c.l.l.service.impl.TestServiceImpl -处理逻辑......
2023-03-09 10:21:01.613 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [http-nio-8082-exec-1] INFO  c.l.l.service.impl.TestServiceImpl -发送MQ消息:67461805-f78f-4432-8153-6673ceed269f
2023-03-09 10:21:01.632 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [async-thread-pool-1] INFO  c.l.l.service.impl.HandleServiceImpl -这是异步执行的......
2023-03-09 10:21:01.633 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.58.16783284614510001] [async-thread-pool-1] INFO  c.l.l.service.impl.ToolServiceImpl -这是子线程里面再调子线程的
2023-03-09 10:21:01.658 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.72.16783284616540001] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  c.l.logtrace.consumer.TestConsumer -MQ消息消费:67461805-f78f-4432-8153-6673ceed269f
2023-03-09 10:21:01.659 [log-trace] [TID:6b32986b43ba450d838ceece1e300204.72.16783284616540001] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  c.l.l.service.impl.ToolServiceImpl -MQ消费处理:67461805-f78f-4432-8153-6673ceed269f

不过存在两个小问题:

  1. 如果通过添加 @Trace 注解来解决,那么现有系统中很多MQ消费的方法上都要加上这个注解,这工作量还是有点大的,而且代码侵入性比较高;
  2. TID 不一致(又不像是问题,有时业务场景又需要区分MQ生产和消费,TID 一致反倒是个麻烦,看业务需要吧)。

不过针对问题1,可以通过其他方式来优化这个问题,比如使用增强或者使用插件在编译时加上 @Trace 注解,不过比较麻烦;针对问题2目前还没解决思路,有解决思路的大佬麻烦提供下。

自定义插件(支持Spring RabbitMQ)

以上两个问题不得不促使寻求新的解决方式,在 SkyWalking 自定义插件(Spring RabbitMQ)具体分析过程 看到了相关描述,大致思路是:在 Spring RabbitMQ Client 执行消费逻辑的方法上实现增强,于是尝试了一遍,发现可以正常追踪整条链路,问题解决。

Skywalking 官方提供的 RabbitMQ 插件存在缺陷,其只针对 RabbitMQ 官方原生 Client 实现扩展,但我们在项目中一般不直接使用原生 Client,而是使用Spring RabitMQ Client,因S pring RabitMQ Consumer 中存在跨线程操作,导致跟踪ID断链。

操作步骤

  1. 仿造 rabbitmq-plugin 新增 SpringRabbitMQConsumerInstrumentation
public class SpringRabbitMQConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    @Override
    protected ClassMatch enhanceClass() {
        // 在 org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer 类中增强拦截
        return NameMatch.byName("org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer");
    }

    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
                new DeclaredInstanceMethodsInterceptPoint() {
                    @Override
                    public ElementMatcher<MethodDescription> getMethodsMatcher() {
                        // 拦截 executeListener 方法,因为 Spring RabbitMQ 是在此方法执行消费逻辑的
                        return named("executeListener").and(takesArgumentWithType(0, "com.rabbitmq.client.Channel"));
                    }

                    @Override
                    public String getMethodsInterceptor() {
                        return "com.lanweihong.skywalking.apm.plugin.rabbitmq.SpringRabbitMQConsumerInterceptor";
                    }

                    @Override
                    public boolean isOverrideArgs() {
                        return false;
                    }
                }
        };
    }
}
  1. 新增 SpringRabbitMQConsumerInterceptor 类,用于 Spring RabbitMQ 执行消费逻辑前处理:
public class SpringRabbitMQConsumerInterceptor implements InstanceMethodsAroundInterceptor {

    public static final String OPERATE_NAME_PREFIX = "SpringRabbitMQ/";
    public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        ContextCarrier contextCarrier = new ContextCarrier();
        Message message = (Message) allArguments[1];
        MessageProperties properties = message.getMessageProperties();
        Channel channel = (Channel) allArguments[0];
        String url = channel.getConnection().getAddress().toString().replace("/", "") + ":" + channel.getConnection().getPort();
        AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + "Topic/" +
                properties.getReceivedExchange() + "Queue/" + properties.getReceivedRoutingKey() +
                CONSUMER_OPERATE_NAME_SUFFIX, null).start(System.currentTimeMillis());
        Tags.MQ_BROKER.set(activeSpan, url);
        Tags.MQ_TOPIC.set(activeSpan, properties.getReceivedExchange());
        Tags.MQ_QUEUE.set(activeSpan, properties.getReceivedRoutingKey());
        activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
        SpanLayer.asMQ(activeSpan);
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            if (properties.getHeaders() != null && properties.getHeaders().get(next.getHeadKey()) != null) {
                next.setHeadValue(properties.getHeaders().get(next.getHeadKey()).toString());
            }
        }
        ContextManager.extract(contextCarrier);
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {
        ContextManager.stopSpan();
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().errorOccurred().log(t);
    }
}
  1. 在 resources 目录下新增文件 skywalking-plugin.def,内容如下:
spring-rabbitmq-5.x=com.lanweihong.skywalking.apm.plugin.rabbitmq.define.SpringRabbitMQConsumerInstrumentation
  1. 编译打包,并将 jar 包放到 Skywalking 目录下的 plugin 文件夹中,然后重启项目发送请求调试。

请求日志输出如下,可看到整个链路都有 TID 了。

2023-03-09 09:18:40.340 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO  o.a.c.c.C.[Tomcat].[localhost].[/] -Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-03-09 09:18:40.340 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO  o.s.web.servlet.DispatcherServlet -Initializing Servlet 'dispatcherServlet'
2023-03-09 09:18:40.341 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO  o.s.web.servlet.DispatcherServlet -Completed initialization in 1 ms
2023-03-09 09:18:40.424 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO  c.l.l.controller.TestController -测试开始......
2023-03-09 09:18:40.424 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO  c.l.l.service.impl.TestServiceImpl -处理逻辑......
2023-03-09 09:18:40.479 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [http-nio-8082-exec-1] INFO  c.l.l.service.impl.TestServiceImpl -发送MQ消息:fe201f16-96c1-4860-a4db-e7cd2ee10d2c
2023-03-09 09:18:40.499 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [async-thread-pool-1] INFO  c.l.l.service.impl.HandleServiceImpl -这是异步执行的......
2023-03-09 09:18:40.499 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [async-thread-pool-1] INFO  c.l.l.service.impl.ToolServiceImpl -这是子线程里面再调子线程的
2023-03-09 09:18:40.531 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  c.l.logtrace.consumer.TestConsumer -MQ消息消费:fe201f16-96c1-4860-a4db-e7cd2ee10d2c
2023-03-09 09:18:40.531 [log-trace] [TID:eb09457235754221b7baac5954e8769c.59.16783247203170001] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] INFO  c.l.l.service.impl.ToolServiceImpl -MQ消费处理:fe201f16-96c1-4860-a4db-e7cd2ee10d2c

此方式不需要修改项目代码,不过需要运维人员重新部署 Skywalking 插件包。

总结

解决方式有两种:

  1. 使用 Skywalking 提供的注解 @Trace,对项目有一定的代码侵入性;
  2. 改造 Skywalking 提供的 rabbitmq 插件或自定义插件,添加对 Spring RabbitMQ 的支持,此方式对现有项目代码无侵入。

插件代码已上传Github:apm-sniffer-hong

参考文献及项目

  1. Skywalking中RabbitMQ消费链路被隔断
  2. wxf / apm-sniffer-pro
文章目录