最近一直在负责业务监控告警相关的开发;由于组织架构调整,从原来的服务端架构组分离出来成立工程效率组,很多原来不是我们组负责的项目也开始陆续交接到了我们手里;
以前一直由业务部门负责开发的sensoragent项目就是由我来交接,交接的时候才发现有很多问题,该应用是业务部门用来从MQ消费消息往神策发送数据的,由于数据量很大,该应用在生产已经扩大到了12点,而我们普通应用在生产也就四个点;虽然已经扩大到了12点,生产仍然有很大的消息堆积来不及处理,所以每天都能收到大量的消息堆积告警;
下图是我的告警(目前公司只有开发运维角色才会接收到告警)
因为这两天迭代结束,有时间来处理下自己交接项目的一个分析和优化:
我查看了应用在openshift里面的一些情况,同时也分析了一下granfa上面的应用运行状况:在看到openshift里面的时候发现了一些端倪:如图
项目里用到了十个线程去消费 MQ的数据,但是openshift上面可以看出,每次只有一个线程在处理,而且会出现Blocked的情况:
点开以后发现blocked的线程阻塞在send方法:
@Override public void send(Map<String, Object> message) { synchronized (messageList) { messageList.add(message); if (messageList.size() >= bulkSize) { flush(); } } }
这是神策官方sdk提供的的方法,可以看到加了锁,每次consumer消费完信息,都会在内存里面存一份数据然后统一flush到神策。
我们再来看交接过来的项目是怎么使用的;
SensorsAnalytics这个对象是个单例对象,这样存在的问题在于十个线程共用一个对象,存在现在安全,所以有了synchronized关键字,但也会导致串行化,同时但list集合满了,会执行flush方法,而该方法:如下
@Override public void flush() { synchronized (messageList) { String sendingData = null; try { sendingData = jsonMapper.writeValueAsString(messageList); } catch (JsonProcessingException e) { messageList.clear(); if (throwException) { throw new RuntimeException("Failed to serialize data.", e); } } try { this.httpConsumer.consume(sendingData); messageList.clear(); } catch (IOException e) { if (throwException) { throw new RuntimeException("Failed to dump message with BatchConsumer.", e); } } catch (HttpConsumer.HttpConsumerException e) { if (throwException) { throw new RuntimeException("Failed to dump message with BatchConsumer.", e); } } } }
该方法会往神策发送数据,并且每次发送的时间很长,这也会严重阻碍到线程继续消费运行;
目前我的解决方案是,为每个线程分配一个SensorsAnalytics对象:
代码改进:
@Slf4j public class SensorsAnalyticsThreadLocal { private static final String PROPERTIES_PATH = "/application.properties"; private static final String URL_SENSORS_DATA = "url.sensorsanalytics"; private static final String SENSORS_DATA_BATCH_CONSUMER_BULK_SIZE = "sensorsdata.batchconsumer.bulksize"; private static Properties props; static { Resource resource = new ClassPathResource(PROPERTIES_PATH); try { props = PropertiesLoaderUtils.loadProperties(resource); } catch (IOException e) { log.error("初始化配置失败", e); } } private static ThreadLocal<SensorsAnalytics> sensorsAnalyticsThreadLocal = ThreadLocal.withInitial(() -> { String sensorsDataUrl = props.getProperty(URL_SENSORS_DATA); Integer bulkSize = Integer.valueOf(props.getProperty(SENSORS_DATA_BATCH_CONSUMER_BULK_SIZE)); return new SensorsAnalytics(new SensorsAnalytics.BatchConsumer(sensorsDataUrl, bulkSize, true)); } ); public static SensorsAnalytics get() { return sensorsAnalyticsThreadLocal.get(); } public static void set(SensorsAnalytics sensorsAnalytics) { sensorsAnalyticsThreadLocal.set(sensorsAnalytics); } public static void remove() { sensorsAnalyticsThreadLocal.remove(); } }
今天中午上到了生产,并且下来效果卓有成效,以前下午积压最严重的时候,也没有告警了: