最近一直在负责业务监控告警相关的开发;由于组织架构调整,从原来的服务端架构组分离出来成立工程效率组,很多原来不是我们组负责的项目也开始陆续交接到了我们手里;

以前一直由业务部门负责开发的sensoragent项目就是由我来交接,交接的时候才发现有很多问题,该应用是业务部门用来从MQ消费消息往神策发送数据的,由于数据量很大,该应用在生产已经扩大到了12点,而我们普通应用在生产也就四个点;虽然已经扩大到了12点,生产仍然有很大的消息堆积来不及处理,所以每天都能收到大量的消息堆积告警;

下图是我的告警(目前公司只有开发运维角色才会接收到告警)

记一次性能优化(线程相关)-LMLPHP

因为这两天迭代结束,有时间来处理下自己交接项目的一个分析和优化:

我查看了应用在openshift里面的一些情况,同时也分析了一下granfa上面的应用运行状况:在看到openshift里面的时候发现了一些端倪:如图

记一次性能优化(线程相关)-LMLPHP

记一次性能优化(线程相关)-LMLPHP

项目里用到了十个线程去消费 MQ的数据,但是openshift上面可以看出,每次只有一个线程在处理,而且会出现Blocked的情况:

点开以后发现blocked的线程阻塞在send方法:

@Override public void send(Map<String, Object> message) {
      synchronized (messageList) {
        messageList.add(message);
        if (messageList.size() >= bulkSize) {
          flush();
        }
      }
    } 

这是神策官方sdk提供的的方法,可以看到加了锁,每次consumer消费完信息,都会在内存里面存一份数据然后统一flush到神策。

我们再来看交接过来的项目是怎么使用的;

记一次性能优化(线程相关)-LMLPHP

SensorsAnalytics这个对象是个单例对象,这样存在的问题在于十个线程共用一个对象,存在现在安全,所以有了synchronized关键字,但也会导致串行化,同时但list集合满了,会执行flush方法,而该方法:如下

@Override public void flush() {
      synchronized (messageList) {
        String sendingData = null;
        try {
          sendingData = jsonMapper.writeValueAsString(messageList);
        } catch (JsonProcessingException e) {
          messageList.clear();
          if (throwException) {
            throw new RuntimeException("Failed to serialize data.", e);
          }
        }

        try {
          this.httpConsumer.consume(sendingData);
          messageList.clear();
        } catch (IOException e) {
          if (throwException) {
            throw new RuntimeException("Failed to dump message with BatchConsumer.", e);
          }
        } catch (HttpConsumer.HttpConsumerException e) {
          if (throwException) {
            throw new RuntimeException("Failed to dump message with BatchConsumer.", e);
          }
        }
      }
    }

 该方法会往神策发送数据,并且每次发送的时间很长,这也会严重阻碍到线程继续消费运行;

目前我的解决方案是,为每个线程分配一个SensorsAnalytics对象:

代码改进:

@Slf4j
public class SensorsAnalyticsThreadLocal {

    private static final String PROPERTIES_PATH = "/application.properties";
    private static final String URL_SENSORS_DATA = "url.sensorsanalytics";
    private static final String SENSORS_DATA_BATCH_CONSUMER_BULK_SIZE = "sensorsdata.batchconsumer.bulksize";
    private static Properties props;

    static {
        Resource resource = new ClassPathResource(PROPERTIES_PATH);
        try {
            props = PropertiesLoaderUtils.loadProperties(resource);
        } catch (IOException e) {
            log.error("初始化配置失败", e);
        }
    }

    private static ThreadLocal<SensorsAnalytics> sensorsAnalyticsThreadLocal = ThreadLocal.withInitial(() -> {
                String sensorsDataUrl = props.getProperty(URL_SENSORS_DATA);
                Integer bulkSize = Integer.valueOf(props.getProperty(SENSORS_DATA_BATCH_CONSUMER_BULK_SIZE));
                return new SensorsAnalytics(new SensorsAnalytics.BatchConsumer(sensorsDataUrl, bulkSize, true));
            }
    );

    public static SensorsAnalytics get() {
        return sensorsAnalyticsThreadLocal.get();
    }

    public static void set(SensorsAnalytics sensorsAnalytics) {
        sensorsAnalyticsThreadLocal.set(sensorsAnalytics);
    }

    public static void remove() {
        sensorsAnalyticsThreadLocal.remove();
    }
}

  

今天中午上到了生产,并且下来效果卓有成效,以前下午积压最严重的时候,也没有告警了:

01-16 02:41