本文通过介绍分布式应用下各个场景的全局日志ID透传思路,以及介绍分布式日志追踪ID简单实现原理和实战效果,从而达到通过提高日志查询排查问题的效率。
开发排查系统问题用得最多的手段就是查看系统日志,相信不少人都值过班当过小秘吧:给下接口和出入参吧,麻烦看看日志里的有没有异常信息啊等等,但是在并发大时使用日志定位问题还是比较麻烦,由于大量的其他用户/其他线程的日志也一起输出穿行其中导致很难筛选出指定请求的全部相关日志,以及下游线程/服务对应的日志,甚至一些特殊场景的出入参只打印了一些诸如gis坐标、四级地址等没有单据信息的日志,使得日志定位起来非常不便
自己所在组负责的系统主要是web应用,其中涉及到的请求方式主要有:springmvc的servlet的http场景、jsf场景、MQ场景、resteasy场景、clover场景、easyjob场景,每一种场景都需要不同的方式进行logTraceId的透传,接下来逐个探析上述各个场景的透传方案。
在这之前我们先要简单了解一下日志中透传和打印logTraceId的方式,一般我们使用MDC进行logTraceId的透传与打印,但是基于MDC内部使用的是ThreadLocal所以只有本线程才有效,子线程服务的MDC里的值会丢失,所以这里我们要么是在所有涉及到父子线程的地方以编码侵入式自行实现值的传递,要么就是通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题,而本文采用的是比较粗糙地以编码侵入式来解决此问题。
这个场景相信大家都已经烂熟到骨子里了,主要思路是通过拦截器的方式进行logTraceId的透传,新建一个类实现HandlerInterceptor
preHandle:在业务处理器处理请求之前被调用,这里实现logTraceId的设置与透传
postHandle:在业务处理器处理请求执行完成后,生成视图之前执行,这里空实现就好
afterCompletion:在DispatcherServlet完全处理完请求后被调用,这里用于清除MDC的logTraceId
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | @Slf4j public class TraceInterceptor implements HandlerInterceptor { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object o) throws Exception { try { String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(traceId)) { MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, TraceUtils.getTraceId()); } }catch (RuntimeException e){ log.error( "mvc自定义log跟踪拦截器执行异常" ,e); } return true; } @Override public void postHandle(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception { } @Override public void afterCompletion(javax.servlet.http.HttpServletRequest httpServletRequest, javax.servlet.http.HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception { try { MDC.clear(); }catch (RuntimeException ex){ log.error( "mvc自定义log跟踪拦截器执行异常" ,ex); } } } |
相信大家对于jsf并不陌生,而jsf也支持自定义filter,基于jsf过滤器的运行方式(如下图),可以通过配置全局过滤器(继承AbstractFilter)的方式进行logTraceId的透传,需要注意的是jsf是在线程池中执行的所以一定要信任消息体中的logTraceId
jsf消费者过滤器:主要从上下文环境中获取logTraceId并进行透传,实现代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | @Slf4j public class TraceIdGlobalJsfFilter extends AbstractFilter { @Override public ResponseMessage invoke(RequestMessage requestMessage) { / / 设置traceId setAndGetTraceId(requestMessage); try { return this.getNext().invoke(requestMessage); } finally { } } / * * * 设置并返回traceId * @param requestMessage * @ return * / private void setAndGetTraceId(RequestMessage requestMessage) { try { String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(logTraceId) && logTraceIdObj = = null){ / / 如果 filter 和MDC都没有获取到则说明有遗漏,打印日志 if (log.isDebugEnabled()){ log.debug( "jsf消费者自定义log跟踪拦截器预警,filter和MDC都没有traceId,jsf信息:{}" , JSON.toJSONString(requestMessage)); } } else if (StringUtils.isBlank(logTraceId) && logTraceIdObj ! = null) { / / 如果MDC没有, filter 有,打印日志 if (log.isDebugEnabled()){ log.debug( "jsf消费者自定义log跟踪拦截器预警,MDC没有filter有traceId,jsf信息:{}" , JSON.toJSONString(requestMessage)); } } else if (StringUtils.isNotBlank(logTraceId) && logTraceIdObj = = null){ / / 如果MDC有, filter 没有,说明是源头已经有了,但是jsf是第一次调,透传 requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, logTraceId); } else if (StringUtils.isNotBlank(logTraceId) && logTraceIdObj ! = null){ / / MDC和fitler都有,但是并不相等,则存在问题打印日志 if (log.isDebugEnabled()){ log.debug( "jsf消费者自定义log跟踪拦截器预警,MDC和filter都有traceId,jsf信息:{}" , JSON.toJSONString(requestMessage)); } } }catch (RuntimeException e){ log.error( "jsf消费者自定义log跟踪拦截器执行异常" ,e); } } } |
jsf提供者过滤器:通过拿到消费者在消息体中透传的logTraceId来实现,实现代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | @Slf4j public class TraceIdGlobalJsfProducerFilter extends AbstractFilter { @Override public ResponseMessage invoke(RequestMessage requestMessage) { / / 设置traceId boolean isNeedClearMdc = transferTraceId(requestMessage); try { return this.getNext().invoke(requestMessage); } finally { if (isNeedClearMdc){ clear(); } } } / * * * 设置并返回traceId * @param requestMessage * @ return * / private boolean transferTraceId(RequestMessage requestMessage) { boolean isNeedClearMdc = false; try { String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); Object logTraceIdObj = requestMessage.getInvocationBody().getAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(logTraceId) && logTraceIdObj = = null){ / / 如果 filter 和MDC都没有获取到,说明存在遗漏场景或是提供给外部系统调用的接口,打印日志进行观察 String traceId = TraceUtils.getTraceId(); MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,traceId); requestMessage.getInvocationBody().addAttachment(LogConstants.JSF_LOG_TRACE_ID_KEY, traceId); if (log.isDebugEnabled()){ log.debug( "jsf生产者自定义log跟踪拦截器预警,filter和MDC都没有traceId,jsf信息:{}" , JSON.toJSONString(requestMessage)); } isNeedClearMdc = true; } else if (StringUtils.isBlank(logTraceId) && logTraceIdObj ! = null) { / / 如果MDC没有, filter 有,说明是被调用方,需要透传下去 MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceIdObj.toString()); isNeedClearMdc = true; } else if (StringUtils.isNotBlank(logTraceId) && logTraceIdObj = = null){ / / 如果MDC有, filter 没有,存在问题,打印日志 if (log.isDebugEnabled()){ log.debug( "jsf生产者自定义log跟踪拦截器预警,MDC有filter没有traceId,jsf信息:{}" , JSON.toJSONString(requestMessage)); } isNeedClearMdc = true; } else if (StringUtils.isNotBlank(logTraceId) && logTraceIdObj ! = null && !logTraceId.equals(logTraceIdObj.toString())){ / / MDC和fitler都有,但是并不相等,则信任 filter 透传结果 TraceUtils.resetTraceId(logTraceIdObj.toString()); if (log.isDebugEnabled()){ log.debug( "jsf生产者自定义log跟踪拦截器预警,MDC和fitler都有traceId,但是并不相等,jsf信息:{}" , JSON.toJSONString(requestMessage)); } } return isNeedClearMdc; }catch (RuntimeException e){ log.error( "jsf生产者自定义log跟踪拦截器执行异常" ,e); return false; } } / * * * 清除MDC * / private void clear() { try { MDC.clear(); }catch (RuntimeException e){ log.error( "jsf生产者自定义log跟踪拦截器执行异常" ,e); } } } |
说到MQ相信大家对于此就更不陌生了,此种场景主要通过在提供者发送消息时拿到上下文中的logTraceId,将其以扩展信息的方式设置进消息体中进行透传,而消费者则从消息体中进行获取
生产者:新建一个抽象类继承MessageProducer,覆写父类中的两个send方法(批量发送、单条发送),send方法中主要调用抽象加工消息体的方法(logTraceId属性赋值)和日志打印,在子类中进行发送前对消息体的加工处理,具体代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 | @Slf4j public abstract class BaseTraceIdProducer extends MessageProducer { private static final String SEPARATOR_COMMA = "," ; public BaseTraceIdProducer() { } public BaseTraceIdProducer(TransportManager transportManager) { super (transportManager); } / * * * 获取消息体 - 单个 * @param messageContext * @ return * / protected abstract Message getMessage(MessageContext messageContext); / * * 获取消息体 - 批量 * * @param messageContext * @ return * / protected abstract List <Message> getMessages(MessageContext messageContext); / * * * 填充消息体上下文信息 * @param message * @param messageContext * / protected void fillContext(Message message,MessageContext messageContext) { if (message = = null){ return ; } if (StringUtils.isBlank(messageContext.getLogTraceId())){ String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY); messageContext.setLogTraceId(logTraceId); } if (StringUtils.isBlank(messageContext.getTopic())){ String topic = message.getTopic(); messageContext.setTopic(topic); } String businessId = message.getBusinessId(); messageContext.getBusinessIdBuf().append(SEPARATOR_COMMA).append(businessId); } / * * * traceId嵌入消息体中 * @param message * / protected void generateTraceIdIntoMessage(Message message){ if (message = = null){ return ; } try { String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(logTraceId)){ logTraceId = TraceUtils.getTraceId(); MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId); } message.setAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY,logTraceId); }catch (RuntimeException e){ log.error( "jmq2自定义log跟踪拦截器执行异常" ,e); } } / * * * 批量发送消息 - 无回调 * @param messages * @param timeout * @throws JMQException * / public void send( List <Message> messages, int timeout) throws JMQException { MessageContext messageContext = new MessageContext(); messageContext.setMessages(messages); List <Message> messageList = this.getMessages(messageContext); / / 打印日志,方便排查问题 printLog(messageContext); super .send(messageList, timeout); } / * * * 单个发送消息 * @param message * @param transaction * @param <T> * @ return * @throws JMQException * / public <T> T send(Message message, LocalTransaction<T> transaction) throws JMQException { MessageContext messageContext = new MessageContext(); messageContext.setMessage(message); Message msg = this.getMessage(messageContext); / / 打印日志,方便排查问题 printLog(messageContext); return super .send(msg, transaction); } / * * * 批量发送消息 - 有回调 * @param messages * @param timeout * @param callback * @throws JMQException * / public void send( List <Message> messages, int timeout, AsyncSendCallback callback) throws JMQException { MessageContext messageContext = new MessageContext(); messageContext.setMessages(messages); List <Message> messageList = this.getMessages(messageContext); / / 打印日志,方便排查问题 printLog(messageContext); super .send(messageList, timeout, callback); } / * * * 打印日志,方便排查问题 * @param messageContext * / private void printLog(MessageContext messageContext) { if (messageContext = = null){ return ; } if (log.isInfoEnabled()){ log.info( "MQ发送:traceId:{},topic:{},businessIds:[{}]" ,messageContext.getLogTraceId(),messageContext.getTopic(),messageContext.getBusinessIdBuf() = = null?"":messageContext.getBusinessIdBuf().toString()); } } } @Slf4j public class TraceIdEnvMessageProducer extends BaseTraceIdProducer { private static final String UAT_TRUE = String.valueOf(true); private boolean uat = false; public TraceIdEnvMessageProducer() { } public TraceIdEnvMessageProducer(TransportManager transportManager) { super (transportManager); } / * * * 环境变量打标 - 单个消息体 * @param message * / private void convertUatMessage(Message message) { if (message ! = null) { message.setAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT, UAT_TRUE); } } / * * * 消息转换 - 批量消息体 * @param messageContext * @ return * / private List <Message> convertMessages(MessageContext messageContext) { List <Message> messages = messageContext.getMessages(); if (!CollectionUtils.isEmpty(messages)) { Iterator messageIterator = messages.iterator(); while (messageIterator.hasNext()) { Message message = (Message)messageIterator. next (); if (this.isUat()){ this.convertUatMessage(message); } super .generateTraceIdIntoMessage(message); super .fillContext(message,messageContext); } } return messageContext.getMessages(); } / * * * 消息转换 - 单个消息体 * @param messageContext * @ return * / private Message convertMessage(MessageContext messageContext){ Message message = messageContext.getMessage(); if (this.isUat()){ this.convertUatMessage(message); } super .generateTraceIdIntoMessage(message); super .fillContext(message,messageContext); return message; } protected Message getMessage(MessageContext messageContext) { if (log.isDebugEnabled()){ log.debug( "current environment is UAT : {}" , this.isUat()); } return this.convertMessage(messageContext); } protected List <Message> getMessages(MessageContext messageContext) { if (log.isDebugEnabled()){ log.debug( "current environment is UAT : {}" , this.isUat()); } return this.convertMessages(messageContext); } public void setUat(boolean uat) { this.uat = uat; } boolean isUat() { return this.uat; } } |
消费者:新建一个抽象类继承MessageListener,覆写父类中的onMessage方法,主要进行设置日志traceId和消费完成后的traceId清理等,而在子类中进行一些自定义处理,具体代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 | @Slf4j public abstract class BaseTraceIdMessageListener implements MessageListener { public BaseTraceIdMessageListener() { } public abstract void onMessageList( List <Message> messages) throws Exception; @Override public final void onMessage( List <Message> messages) throws Exception { try { if (CollectionUtils.isEmpty(messages)){ return ; } / / 设置日志traceId setLogTraceId(messages); this.onMessageList(messages); / / 消费完后清除traceId clear(); }catch (Exception e){ throw e; } finally { MDC.clear(); } } / * * * 设置日志traceId * @param messages * / private void setLogTraceId( List <Message> messages) { try { Message message = messages.get( 0 ); String logTraceId = message.getAttribute(LogConstants.JMQ2_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(logTraceId)){ logTraceId = TraceUtils.getTraceId(); } MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY,logTraceId); }catch (RuntimeException e){ log.error( "jmq2自定义log跟踪拦截器执行异常" ,e); } } / * * * 清除traceId * / private void clear() { try { MDC.clear(); }catch (RuntimeException e){ log.error( "jmq2自定义log跟踪拦截器执行异常" ,e); } } } @Slf4j public abstract class TraceIdEnvMessageListener extends BaseTraceIdMessageListener{ private String uat; public TraceIdEnvMessageListener() { } public abstract void onMessages( List <Message> var1) throws Exception; @Override public void onMessageList( List <Message> messages) throws Exception { Iterator iterator; Message message; if (this.getUat() ! = null && Boolean.valueOf(this.getUat())) { iterator = messages.iterator(); while (true) { while (iterator.hasNext()) { message = (Message)iterator. next (); if (message ! = null && Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) { this.onMessages(Arrays.asList(message)); } else { log.debug( "Ignore message: [BusinessId: {}, Text: {}]" , message.getBusinessId(), message.getText()); } } return ; } } else if (this.getUat() ! = null && !Boolean.valueOf(this.getUat())) { iterator = messages.iterator(); while (true) { while (iterator.hasNext()) { message = (Message)iterator. next (); if (message ! = null && !Boolean.valueOf(message.getAttribute(SplitMessage.JMQ_SPLIT_KEY_IS_UAT))) { this.onMessages(Arrays.asList(message)); } else { log.debug( "Ignore message: [BusinessId: {}, Text: {}]" , message.getBusinessId(), message.getText()); } } return ; } } else { this.onMessages(messages); } } public void setUat(String uat) { if (! "true" .equals(uat) && ! "false" .equals(uat)) { throw new IllegalArgumentException( "uat 属性值只能为 true 或 false." ); } else { this.uat = uat; } } public String getUat() { return this.uat; } } |
此场景类似于spinrg-mvc场景,也是http请求,需要通过拦截器在消息头中进行logTraceId的透传,主要有客户端拦截器,服务端:预处理拦截器、后置拦截器,代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 | @ClientInterceptor @Provider @Slf4j public class ResteasyClientInterceptor implements ClientExecutionInterceptor { @Override public ClientResponse execute(ClientExecutionContext clientExecutionContext) throws Exception { try { String logTraceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); ClientRequest request = clientExecutionContext.getRequest(); String headerTraceId = request.getHeaders().getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(logTraceId) && StringUtils.isBlank(headerTraceId)){ / / 如果 filter 和MDC都没有获取到则说明是调用源头 String traceId = TraceUtils.getTraceId(); TraceUtils.resetTraceId(traceId); request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,traceId); } else if (StringUtils.isBlank(headerTraceId)){ / / 如果MDC有但是 filter 没有则需要传递 request.header(LogConstants.HEADER_LOG_TRACE_ID_KEY,logTraceId); } }catch (RuntimeException e){ log.error( "resteasy客户端log跟踪拦截器执行异常" ,e); } return clientExecutionContext.proceed(); } } @Slf4j @Provider @ServerInterceptor public class RestEasyPreInterceptor implements PreProcessInterceptor { @Override public ServerResponse preProcess(HttpRequest request, ResourceMethod resourceMethod) throws Failure, WebApplicationException { try { MultivaluedMap<String, String> requestHeaders = request.getHttpHeaders().getRequestHeaders(); String headerTraceId = requestHeaders.getFirst(LogConstants.HEADER_LOG_TRACE_ID_KEY); if (StringUtils.isNotBlank(headerTraceId)){ / / 如果 filter 则透传 TraceUtils.resetTraceId(headerTraceId); } }catch (RuntimeException e){ log.error( "resteasy服务端log跟踪前置拦截器执行异常" ,e); } return null; } } @Slf4j @Provider @ServerInterceptor public class ResteasyPostInterceptor implements PostProcessInterceptor { @Override public void postProcess(ServerResponse serverResponse) { try { MDC.clear(); }catch (RuntimeException e){ log.error( "resteasy服务端log跟踪后置拦截器执行异常" ,e); } } } |
clover的大体机制主要是在项目启动的时候扫描到带有注解@HessianWebService的类进行服务注册并维持心跳检测,而clover端则通过servlet请求方式进行任务的回调,同时继承AbstractScheduleTaskProcess方式的任务是以线程池的方式进行业务的处理
基于上述原理我们需要解决两个问题:1.新建一个类继承ServiceExporterServlet,并在web.xml配置中进行servlet配置,代码如下;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | @Slf4j public class ServiceExporterTraceIdServlet extends ServiceExporterServlet { @Override public void service(ServletRequest req, ServletResponse res) throws ServletException, IOException { try { String traceId = MDC.get( "traceId" ); if (StringUtils.isBlank(traceId)) { MDC.put( "traceId" , TraceUtils.getTraceId()); } } catch (Exception e) { log.error( "clover请求servlet执行异常" , e); } try { super .service(req, res); } catch (Throwable e) { log.error( "clover请求servlet执行异常" , e); throw e; } finally { try { MDC.clear(); }catch (RuntimeException ex){ log.error( "clover请求servlet执行异常" ,ex); } } } } |
2.新建一个抽象类继承AbstractScheduleTaskProcess,在类中以编码形式进行父子线程的透传(可优化:通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题),所有任务均改为继承此类,关键代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | try { traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(traceId)) { log.warn( "clover自定义log跟踪拦截器预警,mdc没有traceId" ); } }catch (RuntimeException e){ log.error( "clover自定义log跟踪拦截器执行异常" ,e); } final String logTraceId = traceId; while (iterator.hasNext()) { final List <TcTask> list = ( List <TcTask>)iterator. next (); this.executor.submit(new Callable < Object >() { public Object call() throws Exception { try { if (StringUtils.isNotBlank(logTraceId)) { MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, logTraceId); } }catch (RuntimeException e){ log.error( "clover自定义log跟踪拦截器执行异常" ,e); } Object var1; try { if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) { BaseTcTaskProcessWorker.logger.info( "正在执行任务[" + this.getClass().getName() + "],条数:" + list .size() + "..." ); } BaseTcTaskProcessWorker.this.executeTasks( list ); if (BaseTcTaskProcessWorker.logger.isInfoEnabled()) { BaseTcTaskProcessWorker.logger.info( "执行任务[" + this.getClass().getName() + "],条数:" + list .size() + "成功!" ); } var1 = null; } catch (Exception var5) { BaseTcTaskProcessWorker.logger.error(var5.getMessage(), var5); throw var5; } finally { try { MDC.clear(); }catch (RuntimeException ex){ log.error( "clover自定义log跟踪拦截器执行异常" ,ex); } latch.countDown(); } return var1; } }); } |
easyjob的大体机制是在项目启动的时候通过扫描实现接口Scheduler的类进行上报注册,同时启动一个acceptor(获取任务的线程池),而acceptor拉取到任务后会将父任务放进一个叫executor的线程池,子任务范进一个叫slowExecutor的线程池,我们可以新建一个抽奖类实现接口ScheduleFlowTask,复用clover场景硬编码方式进行父子线程logTraceId的透传处理(可优化:通过覆写MDCAdapter:通过阿里的TransmittableThreadLocal来解决父子线程传递问题),示例代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 | @Slf4j public abstract class AbstractEasyjobOnlyScheduleProcess<T> implements ScheduleFlowTask { / * * * EASYJOB平台UMP监控key前缀 * / private static final String EASYJOB_UMP_KEY_RREFIX = "trans.easyjob.dotask." ; / * * * EASYJOB单个任务处理分布式锁前缀 * / private static final String EASYJOB_SINGLE_TASK_LOCK_PREFIX = "basic_easyjob_single_task_lock_prefix_" ; / * * * 环境标识 - 开关配置进行环境隔离 * / @Value ( "${spring.profiles.active}" ) private String activeEnv; @Value ( "${task.scene.mark}" ) private String sceneMark = TaskSceneMarkEnum.PRODUCTION.getDesc(); / * * * easyJob维度线程池变量 * / private ThreadPoolExecutor easyJobExecutor; / * * * easyJob维度服务器个数 - 分片个数 * / private volatile int easyJobLastThreadCount = 0 ; / * * * easyjob多线程名称 * / private static final String EASYJOB_THREAD_NAME = "dts.easyJobs" ; / * * * 子类的泛型参数类型 * / private Class<T> argumentType; / * * * 无参构造 * / public AbstractEasyjobOnlyScheduleProcess() { / / 设置子类泛型参数类型 argumentType = this.getArgumentType(); } @Autowired private RedisHelper redisHelper; / * * * 非task表扫描待处理的任务数据 * @param taskServerParam * @param curServer * @ return * / protected abstract List <T> loadTasks(TaskServerParam taskServerParam, int curServer); / * * * 业务处理抽象方法 - 单个 * @param task * / protected abstract void doSingleTask(T task); / * * * 业务处理抽象方法 - 批量 * @param tasks * / protected abstract void doBatchTasks( List <T> tasks); / * * * 拼装ump监控key * @param prefix * @param taskNameKey * @ return * / private String getUmpKey(String prefix,String taskNameKey) { StringBuffer umpKeyBuf = new StringBuffer(); umpKeyBuf.append(prefix).append(taskNameKey); return umpKeyBuf.toString(); } / * * * easyjob平台异步任务回调方法 * @param scheduleContext * @ return * @throws Exception * / @Override public TaskResult doTask(ScheduleContext scheduleContext) throws Exception { String requestNo = TraceUtils.getTraceId(); try { String traceId = MDC.get(LogConstants.MDC_LOG_TRACE_ID_KEY); if (StringUtils.isBlank(traceId)) { MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo); } } catch (Exception e) { log.error( "easyjob执行异常" , e); } EasyJobTaskServerParam taskServerParam = null; CallerInfo callerinfo = null; try { / / 条件转换 taskServerParam = EasyJobCoreUtil.transTaskServerParam(scheduleContext); String taskNameKey = getTaskNameKey(); String umpKey = getUmpKey(EASYJOB_UMP_KEY_RREFIX,taskNameKey); callerinfo = Profiler.registerInfo(umpKey, Constants.TRANS_BASIC, false, true); / / 多服务器,并且非子任务,本次不执行,提交子任务 if (taskServerParam.getServerCount() > 1 && !taskServerParam.isSubTask()) { submitSubTask(scheduleContext, taskServerParam,requestNo); return TaskResult.success(); } if (log.isInfoEnabled()) { log.info( "请求编号[{}],开始获取任务,任务ID[{}],任务名称[{}],执行参数[{}]" , requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), JSON.toJSONString(taskServerParam)); } TaskServerParam cloverTaskServerParam = EasyJobCoreUtil.transferCloverTaskServerParam(taskServerParam); List <T> tasks = this.selectTasks(cloverTaskServerParam, taskServerParam.getCurServer()); if (log.isInfoEnabled()) { log.info( "请求编号[{}],获取任务ID[{}],任务名称[{}]共{}条" , requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks = = null ? 0 : tasks.size()); } if (CollectionUtils.isNotEmpty(tasks)) { if (log.isInfoEnabled()) { log.info( "请求编号[{}],开始执行任务,任务ID[{}],任务名称[{}]" , requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName()); } this.easyJobExecuteTasksInner(taskServerParam, tasks,requestNo); if (log.isInfoEnabled()) { log.info( "请求编号[{}],执行任务,任务ID[{}],任务名称[{}],执行数量[{}]完成...." , requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), tasks.size()); } } return TaskResult.success(); } catch (Exception e) { Profiler.functionError(callerinfo); if (log.isInfoEnabled()) { log.error( "请求编号[{}],任务执行失败,任务ID[{}],任务名称[{}]" , requestNo, taskServerParam = = null ? " " : taskServerParam.getTaskId(), taskServerParam == null ? " " :taskServerParam.getTaskName(), e); } return TaskResult.fail(e.getMessage()); } finally { try { MDC.clear(); }catch (RuntimeException ex){ log.error( "easyjob执行异常" ,ex); } Profiler.registerInfoEnd(callerinfo); } } / * * * 多分片提交子任务 * @param scheduleContext 调度任务上下文参数 * @param taskServerParam 调度任务参数 * @param requestNo 调度任务参数 * @ return void * / private void submitSubTask(ScheduleContext scheduleContext, EasyJobTaskServerParam taskServerParam,String requestNo) throws IOException { log.info( "请求编号[{}],执行任务,任务ID[{}],任务名称[{}],子任务个数[{}],开始提交子任务" , requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount()); String jobClass = scheduleContext.getTaskGetResponse().getJobClass(); if (StringUtils.isBlank(jobClass)) { throw new RuntimeException( "jobClass get error" ); } for ( int i = 0 ; i < taskServerParam.getServerCount(); i + + ) { Map <String, String> dataMap = scheduleContext.getParameters(); / / 提交子任务标识 dataMap.put( "isSubTask" , "true" ); / / 给子任务进行编号 dataMap.put( "curServer" , String.valueOf(i)); / / 父任务名称传递子任务 dataMap.put( "taskName" , taskServerParam.getTaskName()); scheduleContext.commitSubTask(jobClass, dataMap, taskServerParam.getExpected(), taskServerParam.getTransactionalAccept()); } / / 父任务等待子任务执行完毕再更改状态,如果执行时间超过等待时间,抛异常 / / scheduleContext.waitForSubtaskCompleted(( long ) taskServerParam.getServerCount() * taskServerParam.getExpected()); log.info( "请求编号[{}],执行任务,任务ID[{}],任务名称[{}],子任务个数[{}],提交完成...." , requestNo, taskServerParam.getTaskId(), taskServerParam.getTaskName(), taskServerParam.getServerCount()); } / * * * 创建线程池,按配置参数执行task * @param param 执行参数 * @param tasks 任务集合 * @param requestNoStr * @ return void * / private void easyJobExecuteTasksInner(final EasyJobTaskServerParam param, List <T> tasks,String requestNoStr) { int threadCount = param.getThreadCount(); synchronized (this) { if (this.easyJobExecutor = = null) { this.easyJobExecutor = (ThreadPoolExecutor) EasyJobCoreUtil.createCustomeasyJobExecutorService(threadCount, EASYJOB_THREAD_NAME); this.easyJobLastThreadCount = threadCount; } else if (threadCount > this.easyJobLastThreadCount) { this.easyJobExecutor.setMaximumPoolSize(threadCount); this.easyJobExecutor.setCorePoolSize(threadCount); this.easyJobLastThreadCount = threadCount; } else if (threadCount < this.easyJobLastThreadCount) { this.easyJobExecutor.setCorePoolSize(threadCount); this.easyJobExecutor.setMaximumPoolSize(threadCount); this.easyJobLastThreadCount = threadCount; } } List < List <T>> lists = Lists.partition(tasks, param.getExecuteCount()); final CountDownLatch latch = new CountDownLatch(lists.size()); final String requestNo = requestNoStr; for (final List <T> list : lists) { this.easyJobExecutor.submit( new Callable < Object >() { public Object call() throws Exception { try { if (StringUtils.isNotBlank(requestNo)) { MDC.put(LogConstants.MDC_LOG_TRACE_ID_KEY, requestNo); } }catch (RuntimeException e){ log.error( "easyjob自定义log跟踪拦截器执行异常" ,e); } try { if (log.isInfoEnabled()) { log.info( "请求编号[{}],正在执行任务,任务ID[{}],任务名称[{}],[{}],条数:[{}]..." , requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list .size()); } executeTasks( list ); if (log.isInfoEnabled()) { log.info( "请求编号[{}],执行任务,任务ID[{}],任务名称[{}],[{}],条数:[{}]成功!" , requestNo, param.getTaskId(), param.getTaskName(), Thread.currentThread().getName(), list .size()); } } catch (Exception e) { log.error(e.getMessage(), e); throw e; } finally { try { MDC.clear(); }catch (RuntimeException ex){ log.error( "easyjob自定义log跟踪拦截器执行异常" ,ex); } latch.countDown(); } return null; } } ); } try { latch.await(); } catch (InterruptedException e) { throw new RuntimeException( "interrupted when processing data access request in concurrency" , e); } } / * * * 获取任务名称 * @ return * / private String getTaskNameKey(){ StringBuffer keyBuf = new StringBuffer(); keyBuf.append(activeEnv) .append(Constants.SEPARATOR_UNDERLINE) .append(this.getClass().getSimpleName()); return keyBuf.toString(); } protected void executeTasks( List <T> taskList) { if (CollectionUtils.isEmpty(taskList)) { return ; } this.doTasks(taskList); } / * * * 业务处理抽象方法 * @param list * / protected void doTasks( List <T> list ){ if (isDoBatchTasks()){ CallerInfo info = Profiler.registerInfo(getClass().getName() + "_batch" , Constants.TRANS_BASIC,false, true); try { / * * 开始执行各个子类真正业务逻辑 * / this.doBatchTasks( list ); } catch(CommonBusinessException ex){ log.warn(ex.getMessage()); } catch (Exception e) { Profiler.functionError(info); log.error( "任务处理失败,方法:{},任务:{}" ,ClassHelper.getMethod(),JSON.toJSONString( list ), e); } finally { Profiler.registerInfoEnd(info); } } else { for (T task : list ) { CallerInfo info = Profiler.registerInfo(getClass().getName(), Constants.TRANS_BASIC,false, true); if (task = = null) { continue ; } String lockKey = ""; try { / * * 开始执行各个子类真正业务逻辑 * / if (useConcurrentLock()) { lockKey = getLockKey(task); if (redisHelper.lock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey)) { this.doSingleTask(task); } else { lockKey = ""; log.warn( "lockKey:{},加载失败,正在被其他用户锁定,请重试!" ,lockKey); } } else { this.doSingleTask(task); } } catch(CommonBusinessException ex){ log.warn(ex.getMessage()); } catch (Exception e) { Profiler.functionError(info); log.error( "任务处理失败,方法:{},任务:{}" ,ClassHelper.getMethod(),JSON.toJSONString(task), e); } finally { Profiler.registerInfoEnd(info); if (StringUtils.isNotBlank(lockKey)) { redisHelper.unlock(RedisKeyDef.SyncLockKeyPrefix.TASK_PROCESS_LOCK_PREFIX, lockKey); } } } } } / * * * 获取实体类的实际类型 * * @ return * / private Class<T> getArgumentType() { return (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[ 0 ]; } / * * * 是否使用防并发锁 * 默认不使用,如需使用子类重写该方法 * @ return * / protected boolean useConcurrentLock() { return false; } / * * * 根所注解获取LockKey,可被子类重写,提高效率 * * @param businessObj 业务对象 * @ return concurrent lock key * / protected String getLockKey( T businessObj) { StringBuilder lockKey = new StringBuilder(EASYJOB_SINGLE_TASK_LOCK_PREFIX); / / 若存在注解指定的防重字段,则使用这些字段拼装防重Key,否则使用MQ业务主键防重 List <ValueEntryInfo> valueEntries = getAnnotaionConcurrentKeys(businessObj); if (!CollectionUtils.isEmpty(valueEntries)) { for (ValueEntryInfo valueEntry : valueEntries) { lockKey.append(Constants.SEPARATOR_UNDERLINE); lockKey.append(valueEntry.getValue()); } } else { throw new CommonBusinessException(String. format ( "此任务处理需要加分布式锁,但是未设置锁key,所以不做业务处理,请检查,任务信息:%s" ,JSON.toJSONString(businessObj))); } return lockKey.toString(); } / * * * 查找对象的ConccurentKey注解,获取防重字段,并排序返回 * * @param businessObj 业务对象 * @ return 有序的业务字段值列表 * / private List <ValueEntryInfo> getAnnotaionConcurrentKeys(T businessObj) { List <ValueEntryInfo> valueEntries = new ArrayList<ValueEntryInfo>(); Field[] fields = businessObj.getClass().getDeclaredFields(); for ( int i = 0 ; i < fields.length; i + + ) { ConcurrentKey concurrentKey = fields[i].getAnnotation(ConcurrentKey. class ); if (concurrentKey ! = null) { fields[i].setAccessible(true); Object fieldVal = null; try { ValueEntryInfo valueEntry = new ValueEntryInfo(); fieldVal = fields[i].get(businessObj); if (fieldVal ! = null) { valueEntry.setValue(String. format ( "%1$s" , fieldVal)); valueEntry.setOrder(concurrentKey.order()); valueEntries.add(valueEntry); } } catch (IllegalAccessException e) { log.error( "IllegalAccess-{}.{}" , businessObj.getClass().getName(), fields[i].getName()); } } } if (valueEntries.size() > 1 ) { / / 排序ConcurrentKey Collections.sort(valueEntries, new Comparator<ValueEntryInfo>() { @Override public int compare(ValueEntryInfo o1, ValueEntryInfo o2) { if (o1.getOrder() > o2.getOrder()) { return 1 ; } else if (o1.getOrder() = = o2.getOrder()) { return 0 ; } else { return - 1 ; } } }); } return valueEntries; } protected List <T> selectTasks(TaskServerParam taskServerParam, int curServer) { return this.loadTasks(taskServerParam, curServer); } / * * * 获取select时的任务创建开始时间 * @param serverArg * @ return * / protected Date getCreateTimeFrom(String serverArg){ return null; } / * * * 是否以批量方式处理任务 * @ return * / protected boolean isDoBatchTasks(){ return false; } } |
上述所述均为透传ID场景的原理和示例代码,实战效果如下图:调用jsf超时,跨系统查看日志进行排查,得知为慢sql引起
上述大部分场景已经抽出一个通用jar包,详细使用教程见我的另一篇文章:分布式日志追踪ID使用教程
作者:京东物流 张小龙
来源:京东云开发者社区 自猿其说 Tech 转载请注明来源