网站建设需要数学济邦建设有限公司官方网站
网站建设需要数学,济邦建设有限公司官方网站,wordpress 初始化插件,小程序源码使用教程上一篇我们简单介绍了基于SkyWalking自定义增强的基本架构#xff0c;即通过把Trace数据导入数据加工模块进行加工#xff0c;进行持久化#xff0c;并赋能grafana展示。
现在我们给出一个例子#xff0c;对于量化交易系统#xff0c;市场交易订单提交#xff0c;该订单…上一篇我们简单介绍了基于SkyWalking自定义增强的基本架构即通过把Trace数据导入数据加工模块进行加工进行持久化并赋能grafana展示。
现在我们给出一个例子对于量化交易系统市场交易订单提交该订单可以走模拟盘也可以走实盘可以自动提交也可以走人工提交订单提交后会把交易所给到的订单信息反馈回来。 需要监控的需求很简单可以按自动实盘/虚拟盘人工实盘/虚拟盘订单分类监控提交和反馈流程满足指标项
1 每分钟延时、延时百分位P50/75/90/95/99 MAX、每分钟请求数排名前5的慢请求等监控项metrics
2 以及按排名前5的慢请求对应的SPAN进行抓取分析出最慢的SPAN
那么SW原生监控有啥问题呢 1 需要根据该流程在不同阶段的特征才能定位该流程按Trace-Span模型来说即需要一个Trace链根据不同Span提供的特征才能抓取该TraceSW并不支持
例如 分辨人工/自动订单实际上是按Trace相关EndpointName来的 人工订单走页面EntrySpan的 endpointName为POST:/api/trade/order/send 但自动订单由程序发起EntrySpan的 endpointName为“rpc.OrderTradeService.send”
而分辨是否走实盘/虚拟盘则是在后续Span按tag systemFlag1或2来确认 而SW的搜索显然是不支持的
问题2 反馈消息是根据交易所API生成的不是一个标准通讯架构只能根据自定义用户增强customize-enhance生成的localSpan形成跟踪链那SW原生Trace查询根本没法按endpoint名字搜索只能按tag搜索然后按时间取定位效率非常低还有一个上一篇说了SW对Trace和Span不提供metric聚合项
那增强计算模块怎么解决上述问题 对问题1 按人工、自动、虚拟、实盘形成4个搜索项然后定时基本同时执行把搜索结果叠加到ES索引中按订单编号trade_id更新索引项利用ES的向量特征形加上业务标签供下游按业务标签定位需要的Trace
对问题2 按预先设计的Tag值标识反馈消息然后按Tag搜索把搜索结果叠加到ES索引中按订单编号trade_id更新索引项利用ES的向量特征形加上业务标签供下游按业务标签定位需要的Trace
对问题3 按业务标签计算各监控项metrics并按时间点汇总最慢的5个Trace查找Span
我们按配置config来说明 关于问题1我们配置了4个搜索项
tasks : [{ #查找按EndpointNamerpc.OrderTradeService.send查找自动订单并且在ES索引中增加业务标签 businessTag Autoname: task.QueryTraces, para : {serviceName : TradeService,endpointName : rpc.OrderTradeService.send,businessTag : { key: businessTag, value: Auto},tags : {},traces_index : traces- #索引名xx-后面跟着日期},switch : on, #搜索项有效interval : 60 #每隔60秒执行一次},{ #查找按EndpointNamePOST:/api/trade/order/send查找人工订单并且在ES索引中增加业务标签 businessTag manualname : task.QueryTraces,para : {serviceName : TradeService,endpointName : POST:/api/trade/order/send,businessTag : { key: businessTag, value: manual},tags : {},traces_index : traces-},switch : on,interval : 60},{ #查找按tag systemFlag1 查找人工订单并且在ES索引中增加业务标签 systemFlag 1 实盘name : task.QueryTraces,para : {serviceName : TradeService,endpointName : ,businessTag : { key: systemFlag, value: sim},tags : { key: systemFlag, value: 1},traces_index : traces-},switch : on,interval : 60},{ #查找按tag systemFlag2 查找人工订单并且在ES索引中增加业务标签 systemFlag 2 实盘name : task.QueryTraces,para : {serviceName : TradeService,endpointName : ,businessTag : { key: systemFlag, value: RealTime},tags : { key: systemFlag, value: 2},traces_index : traces-},switch : on,interval : 60},task.QueryTraces是查询程序按每分钟1次的节奏按Graphql接口查询需要用到的接口按ServiceName按SW内置查询searchService接口查ServiceId 按SW内置查询searchEndpoint接口查EndpointId 然后根据ServiceId EndpointId调用或者ServiceId和预置Tag按SW内置查询接口queryBasicTraces查询相关Traces注意点如下 1 查询窗口要注意也就是要防止Trace形成前执行查询语句建议做成滑动窗口可以调节窗口的大小或者隔几秒多试几次比如10秒执行3次 2 要注意应用多页查询queryBasicTraces有页数限制一次最多1000条要查全需要比较完整多页查询结构 查询完更新ES索引之后 很容易根据业务标签获取我们所需的Traces
同理对问题2我们引入配置文件实际上我们利用FIX报文msgtype8 报文的特征来标识反馈消息然后按ordStatus表示是否是成交或者订单有效的报文即按tags msgType8, ordStatus2/0 查询相关Traces
{name : task.QueryTraces,para : {serviceName : APIService,endpointName : ,businessTag : { key: OrdStatus, value: deal},tags : [{ key: msgType, value: 8},{key: ordStatus,value: 2}],traces_index : traces-},switch : on,interval : 60},{name : task.TracesQueryInfo,para : {serviceName : APIService,endpointName : ,businessTag : { key: OrdStatus, value: effect},tags : [{ key: msgType, value: 8},{key: ordStatus,value: 0}],traces_index : traces-},switch : on,interval : 60},对于问题3,我们配置两种计算模块 一是 task.Caculator用于计算各类Metrics,与SW无关二是 task.SpanInfo计算 ES索引库中 按大于95%分位数延时的慢Traces逐条查找全部Span
{ # 按业务标签查人工实盘的订单tracesbusinessTagmanualsystemFlagRealTime计算监控项name: task.Caculator,para : {businessTags :[{ key: businessTag, value: manual},{key: systemFlag,value: RealTime}],traces_index : traces-, # 源索引stat_index : traces_index- #监控项索引},switch : on,interval : 60,delay : 10 # 比源索引执行慢10秒},{ # 按业务标签查自动虚拟盘的订单tracesbusinessTagautosystemFlagsim计算监控项name: task.Caculator,para : {businessTags :[{ key: businessTag, value: Auto},{key: systemFlag,value: sim}],traces_index : traces-,stat_index : traces_index-},switch : on,interval : 60,delay : 10},{ # 按业务标签查自动实盘的订单tracesbusinessTagautosystemFlagRealtime计算监控项name: task.Caculator,para : {businessTags :[{ key: businessTag, value: Auto},{key: systemFlag,value: RealTime}],traces_index : traces-,stat_index : traces_index-},switch : on,interval : 60,delay : 10},{ # 按业务标签查反馈提交有效订单OrdStatuseffectsystemFlagRealtime计算监控项name: task.Caculator,para : {businessTags : { key: OrdStatus, value: effect},traces_index : traces-,stat_index : traces_index-},switch : on,interval : 60,delay : 10},{ # 计算 ES索引库中 按大于95%分位数延时的慢Traces逐条查找全部Spanname: task.SpanInfo,para : {percentile : 0.95,traces_index : traces-,span_index : traces_index-},switch : on,interval : 60,delay : 10}我们看一下订单提交计算结果索引
以及慢Trace相关Span的索引 关于task.QueryTracestask.Caculatortask.SpanInfo主要代码如下 task.QueryTraces
public class QueryTraces extends AbstractTraceQuery implements TaskService,Runnable{private static final Lock lock new ReentrantLock(); //对不同任务的竞争性资源加锁ObjectMapper objectMapper new ObjectMapper();String serviceName,serviceId,endpointName,endpointId,traces_index;ArrayNode businessTags;JsonNode businessTag,tags;DatasourceService datasource;TargetdbService targetdb;Overridepublic void run() {logger.info(QueryInfo begin...);if(.equals(serviceId)){//防止获取不到serviceIdserviceIdthis.datasource.queryServiceId(serviceName);if(.equals(serviceId)){//第二次获取不成功就终止线程logger.error(query serviceId fail);return;}}if(endpointName.equals()){//检查tags是否为空为空就终止线程if(tags.isNull() || tags.isMissingNode()) {logger.error(endpointName tags is both empty);return;}} else{if(.equals(endpointId)){//防止获取不到endpointIdendpointIdthis.datasource.queryEndPointId(endpointName,serviceName);if(.equals(endpointId)){//第二次获取不成功就终止线程logger.error(query endpointId fail);return;}}}targetdb.createForm(traces_index);String endTimegetTimeEndPoint(1,40);String startTimegetTimeEndPoint(3,41);int retry3; //重试次数int lastArraylistSize0;ArrayNode traceList JsonNodeFactory.instance.arrayNode();logger.info(QueryInfo startTime:: {} endTime:: {},startTime,endTime);try{while(retry0){//查询SW的traces数据注意有可能需要分页查询traceListgetMultiPageResult(datasource,serviceId,endpointId,startTime,endTime,tags);logger.info(traceList:: {} retry:: {},traceList.toString(),retry);if(traceList.size()lastArraylistSize){//如果查到结果打业务标签并按TraceId调批量更新目标库lastArraylistSizetraceList.size();MapString, ListMapString,Object traceMap genTraceMap(businessTags, traceList); //结果集合targetdb.updateDate(traces_index,traceMap);//打时间戳logger.info(TracesQuery update is done. {},System.currentTimeMillis());}try {// 暂停执行5秒钟Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}retry--;}}catch (Exception e) {e.printStackTrace();return;}}Overridepublic void init(JsonNode paraData, DatasourceService datasourceService, TargetdbService targetdbService) {......}
}task.Caculator
public class Caculator extends AbstractTraceQuery implements TaskService,Runnable {private final static Logger logger LoggerFactory.getLogger(TracesQueryInfo.class);private static final Lock lock new ReentrantLock(); //对不同任务的竞争性资源加锁String traces_index, stat_index;ArrayNode businessTags;JsonNode businessTag;DatasourceService datasource;TargetdbService targetdb;private MapString,Object traceProcess(MapString,Object sourceMap){//处理traces查询结果AtomicInteger durationSum new AtomicInteger();AtomicInteger count new AtomicInteger();AtomicInteger maxDurationnew AtomicInteger();double durationAvg,p50,p75,p90,p95,p99;ArrayListInteger durationArray new ArrayList();; //延时集合,用于计算分位数sourceMap.entrySet().stream().forEach((Map.EntryString,Object entry) - {count.getAndIncrement();String traceId entry.getKey();System.out.println(traceId:: traceId);Integer duration (int) Double.parseDouble(entry.getValue().toString());durationSum.addAndGet(duration);if (duration maxDuration.get()) {maxDuration.getAndSet(duration);}durationArray.add(duration);});durationAvg(durationSum.get())/(count.get());p50percentile(durationArray.toArray(new Integer[durationArray.size()]),0.5);p75percentile(durationArray.toArray(new Integer[durationArray.size()]),0.75);p90percentile(durationArray.toArray(new Integer[durationArray.size()]),0.90);p95percentile(durationArray.toArray(new Integer[durationArray.size()]),0.95);p99percentile(durationArray.toArray(new Integer[durationArray.size()]),0.99);MapString,Object resultMap new HashMap();resultMap.put(max_resp,maxDuration.get());resultMap.put(mean_resp,durationAvg);resultMap.put(count,count.get());resultMap.put(p50,p50);resultMap.put(p75,p75);resultMap.put(p90,p90);resultMap.put(p95,p95);resultMap.put(p99,p99);return resultMap;}Overridepublic void run() {if(targetdb.isExisted(traces_index)){logger.info(TracesStatInfo begin...);String endTime getTimeUtcEndPoint(1,30);String startTimegetTimeUtcEndPoint(2,31);logger.info(startTime:: {} endTime:: {},startTime,endTime);try{// 在es trace表中按bussinesTagList 查找local_time_stamp在当前时间范围内的记录logger.info(statQuery queryDate begins ... {},System.currentTimeMillis());MapString, Object dataMaptargetdb.queryData(traces_index,businessTags,startTime,endTime,duration);MapString, Object resMap new HashMap();if(null!dataMap) {//MapString, Object resMap new HashMap();logger.info(TracesStatInfo resultMap:: {} , dataMap.toString());resMap traceProcess(dataMap);// targetdb.createForm(stat_index);//targetdb.insertDate(stat_index, seqNo, resMap);}else{//找不到置0logger.info(StatInfo resultMap is null );resMap.put(max_resp, 0);resMap.put(mean_resp, 0);resMap.put(count, 0);resMap.put(p50, 0);resMap.put(p75, 0);resMap.put(p90, 0);resMap.put(p95, 0);resMap.put(p99, 0);}//打业务标签和时间戳resMap getMapWithTags(businessTags, resMap);String seqNo generateSeqNo(); //生成序号// 加锁lock.lock();targetdb.createForm(stat_index);targetdb.insertDate(stat_index, seqNo, resMap)}catch(Exception e){e.printStackTrace();return;}finally {// 释放锁lock.unlock();}}else{logger.info(trace_index {} is not existed,traces_index);}}Overridepublic void init(JsonNode paraData, DatasourceService datasourceService, TargetdbService targetdbService) {.....}
}task.SpanInfo
public class SpanInfo extends AbstractTraceQuery implements TaskService,Runnable{private final static Logger logger LoggerFactory.getLogger(SpanQueryInfo.class);private static final Lock lock new ReentrantLock(); //对不同任务的竞争性资源加锁String traces_index, span_index;DatasourceService datasource;TargetdbService targetdb;double percentile;private MapString,Object findTraces(MapString,Object sourceMap,double percentile){ArrayListInteger durationArray new ArrayList();; //延时集合,用于计算分位数MapString,Object resultMap new HashMap(); //结果集合//计算percentile分位sourceMap.entrySet().stream().forEach((Map.EntryString,Object entry) -{Integer duration (int) Double.parseDouble(entry.getValue().toString());durationArray.add(duration);});double percentileData percentile(durationArray.toArray(new Integer[0]), percentile);logger.info(percentileData:: {},percentileData);//查找超过percentile的traceIdsourceMap.entrySet().stream().forEach((Map.EntryString,Object entry) -{double duration (double) Double.parseDouble(entry.getValue().toString());if(durationpercentileData){String traceIdentry.getKey().toString();resultMap.put(traceId,duration);}});return resultMap;}Overridepublic void run() {logger.info(SpanInfo begin...);//建表targetdb.createForm(span_index);try{logger.info(SpanInfo try begin...);//找到当前trace_index索引中所有高出95%的值的traceId集合MapString, Object dataMaptargetdb.queryAllData(traces_index,duration);if(null!dataMap) {logger.info(SpanInfo resultMap:: {} , dataMap.toString());//查找高于percentile分位数的值MapString, Object resMap findTraces(dataMap, percentile);logger.info(spanInfo foundedMap:: {} , resMap.toString());//遍历查询结果如果span_index中不存在则查询span后插入span_indexresMap.entrySet().stream().forEach((Map.EntryString, Object entry) - {String traceId entry.getKey();if (targetdb.isNotInTheIndex(span_index, traceId, traceId)) {//按traceId查询spanArrayNode spanList datasource.getTraceSpans(traceId);MapString, ListMapString, Object spansMap genSpanMap(traceId, spanList); //组成SpanList//插入span_indextargetdb.updateDate(span_index, spansMap);}});}else{logger.info(SpanInfo resultMap is null );}}catch(Exception e){e.printStackTrace();return;}}Overridepublic void init(JsonNode paraData, DatasourceService datasourceService, TargetdbService targetdbService) {....}
}完成索引持久化后就可以以grafana访问ES库形成展示这部分不展开看一下效果
姑且算抛砖引玉吧希望各位大佬也分享一下方案
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/88384.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!