回顾上期的问题,当我们搭建完成Skywalking的搭建,顺利完成应用监控之后,就会面临一类问题,怎么利用获取的监控数据,包括三方面:
1 应用的Trace和SW收集Service/Endpoint不一定完全一致,可能定位不到,更无法在UI展示
2 按Trace-Span进行下钻分析,SW并不支持,更别说,对于按Trace不同Span特征(可以理解为一项业务在不同阶段的特征数据)进行分析
3 业务本身要求监控展示统一技术标准

好在SkyWalking提供了GraphQL数据接口,并配合OAL观察查询语句,使得用户可以直接通过简单的GraphQL查询语言获得数据,
其原理和提供原生查询介绍可以参考官方文档:SW 官方文档
或者网络材料Skywalking-11:Skywalking查询协议——案例分析

我们监控使用grafana8, 因此选择 ,在grafana8通过GraphQL数据接口,接入SW监控数据,接入的过程参考
Linux环境安装开发grafana插件
以及grafana结合Skywalking追踪Trace

但是grafana本身的数据处理能力太弱,于是可以选择在grafana和Skywalking之间增加一个java开发的数据处理模块TraceProcessor,通过TraceProcessor获取SW的trace和Span数据,然后进行加工处理后在ES进行持久化,然后由grafana直接展示ES的数据。

TraceProcessor的主要架构是基于多线程多任务的定时任务,定时获取,计算Trace数据,并支持Graphql,ES接口,以及按配置定制任务的能力,架构如下
基于Skywalking开发分布式监控(三)-LMLPHP
我们先从配置工具 config tools入手,希望通过配置文件完成配置数据源(graqhQL)和持久化工具(ES)以及各类定时任务的配置关联,运行时通过反射方式加载各个操作类和定时任务(参考java以SSL方式连ES),以满足敏捷灵活的开发需求

{
 "datasource" : {
    "name": "datasource.GraphQLServiceImp",
    "para": {
        "url":"http://127.0.0.1:8090/graphql"       
    }
 },
 "targetdb" : {
    "name": "target.EsServiceImp",
    "para": {
        "url":"http://127.0.0.1:9200"
    }
 },
 "tasks" : [
    {
      "name": "task.QueryTraces",
      "para" : {
          "serviceName" : "TradeService",
          "endpointName" : "OrderSend",
          "businessTag" : { "key": "businessTag", "value": "Auto"},
          "tags" : {},
          "traces_index" :  "traces_-"
      },
      "switch" : "on",
      "interval" : "60"
    },
  ...
  {
     "name": "task.Caculator",
     "para" : {
       "businessTags" :[{ "key": "businessTag", "value": "Auto"},{"key": "systemFlag","value": "RealTime"}],
       "traces_index" :  "traces-",
       "stat_index" : "traces_index-"
     },
     "switch" : "on",
     "interval" : "60",
     "delay" : 10
   },
  ...
 ]
}

config.json的总体风格定义 执行类(包括路径)和参数,例如数据源graphql,连接,查询执行类就是datasource.GraphQLServiceImp,参数只有一个url http://127.0.0.1:8090/graphql 具体连接参考备忘:python和 java graphql client连Sky walking Server查询数据的联通性中的java部分

除了联通性,datasource.GraphQLServiceImp包含方法还有(以接口形式罗列)

//GraphQLServiceImp对应的interface
public interface DatasourceService {
    //联通及初始化方法
    public void initConnect(String url);
    //按ServiceName查询ServiceId
    public String queryServiceId(String ServiceName);
    //按ServiceName和EndpointName查询EndpointId
    public String queryEndPointId(String endpointName,String serviceName);
    //单页查询,按ServiceId,EndpointId,start_time和End_time以及tags查询trace (page=1)
    public ArrayNode getTotalTraces(String serviceId,String endpointId,String start_time, String end_time,JsonNode tags);
    //多页查询,按ServiceId,EndpointId,start_time和End_time以及tags,和pages 查询trace 
    public ArrayNode getTotalTraces2(String serviceId,String endpointId,String start_time,
                                            String end_time,JsonNode tags,int pageNum);
     //按TraceId查span
    public ArrayNode getTraceSpans(String traceId);

}

同理,ES执行类是target.EsServiceImp,参数url为http://127.0.0.1:9200,同样按接口方式罗列操作方法
具体连接可以参考java以SSL方式连ES

public interface TargetdbService {
    //初始化连接
    public void initConnect(String url,String userName,String password);
    //按索引名判断是否存在
    public boolean isExisted(String indexName);
    //按key value判断健值对是否在指定索引中存在
    public boolean isNotInTheIndex(String indexName,String key,String value);
    //按索引名和mapping创建索引
    public boolean createForm(String indexName, XContentBuilder mapping);
    //按索引名删除索引
    public boolean deleteForm(String indexName);
    //按索引名和关键值seqNo,插入Map
    public boolean insertDate(String indexName, String seqNo,Map dataMap);
     //按索引名,批量更新map(List)
    public boolean updateDate(String indexName,Map<String, List<Map<String,Object>>> resultMap);
    //按关键字(startTime,endTime和tag标签)查询索引
    public Map<String, Object> queryData(String indexName, ArrayNode businessTags, String startTime,
                                         String endTime, String resultTag);


}

对于定时任务,分为两类:
1) tarce查询任务:按需求和Skywalking查询条件,定时查询并筛选Trace,加上业务标签,并持久化

2) trace的指标计算任务:根据查询数据,按业务标签定时计算指标,例如每分钟请求数,平均/最大/百分位数延时、并持久化

这些任务可以根据定时框架调用,分为定时任务类TaskManager

public class TaskManager {
    private ScheduledExecutorService executorService;

    public TaskManager() {
        executorService = Executors.newScheduledThreadPool(10);
    }

    public void addTask(Runnable task, long delay, long period, TimeUnit timeUnit) {
        executorService.scheduleAtFixedRate(task, delay, period, timeUnit);
    }

    public void shutdown() {
        executorService.shutdown();
    }

}

和调度类MyTaskProcess

public class MyTaskProcess {
    private final static Logger logger = LoggerFactory.getLogger(MyTaskProcess.class);
    public static void main(String[] args) {
        TaskManager taskManager = new TaskManager();   //任务管理器
        try{
            // 读入配置文件
            ConfigParser config=new ConfigParser("config.json");

            // 连接SW Server 数据接口
            DatasourceService datasourceInstance=config.getDatasource();
            String datasourceUrl= config.getGraphqlUrl();
            datasourceInstance.initConnect(datasourceUrl);

            // 连接ES,获得可用的数据库
            TargetdbService targetdbInstance=config.getTargetdb();
            String targetdbUrl=config.getTargetDBUrl();
            targetdbInstance.initConnect(targetdbUrl);

            
            logger.info("start:: {} ...",new Date());
            //读入任务列表,并且遍历
            ArrayNode taskList=config.getTaskList();

            taskList.forEach(JsonNode->{
                String taskName=JsonNode.get("name").asText();
                String switch_on=JsonNode.get("switch").asText();

                
                logger.info("taskName:: {} switch_on:: {}",taskName,switch_on);

                if(switch_on.equals("on")){
                    //判断开关是否打开
                    
                    int interval=JsonNode.get("interval").asInt();
                    int delay=1; // 默认延迟
                    if(null!=JsonNode.get("delay"))
                        if(JsonNode.get("delay").asInt()>0){
                            delay=JsonNode.get("delay").asInt();
                            logger.info("delay:: {}",delay);
                        }
                    try {
                        TaskService task=(TaskService)config.getClass(taskName);
                        task.init(JsonNode.get("para"),datasourceInstance,targetdbInstance);
                        taskManager.addTask((Runnable) task, delay, interval, TimeUnit.SECONDS);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }


                }

            });
          

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            // 注册钩子线程
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("Shutting down SWTraceProcessor...");
                taskManager.shutdown();
            }));
        }

        //taskManager.shutdown();
    }
}

后续我们会给出一个例子,探讨对trace数据深加工的目标和具体实现

02-23 17:19