dede网站地图不显示文章列表wordpress怎么发博客
dede网站地图不显示文章列表,wordpress怎么发博客,wordpress 产品筛选,北京网站建设 网站制作文章目录 一. 基础逻辑二. DirtyManager1. 初始化2. 收集脏数据并check3. 关闭资源 三. DirtyDataCollector1. 初始化2. 收集脏数据并check3. run#xff1a;消费脏数据4. 释放资源 四. LogDirtyDataCollector 一. 基础逻辑
脏数据管理模块的基本逻辑是#xff1a; 当数据消… 文章目录 一. 基础逻辑二. DirtyManager1. 初始化2. 收集脏数据并check3. 关闭资源 三. DirtyDataCollector1. 初始化2. 收集脏数据并check3. run消费脏数据4. 释放资源 四. LogDirtyDataCollector 一. 基础逻辑
脏数据管理模块的基本逻辑是 当数据消费失败时将脏数据拦截并保存到dirtyDataCollector中全局metric判断脏数据达到设定值之后任务报错flink停止运行并将脏数据输出到flink日志中、或mysql的配置中。 对于代码实现 DirtyManager用于管理DirtyDataCollector串起DirtyDataCollector的生命周期DirtyDataCollector主要用于收集脏数据并输出到日志中mysql中脏数据数量达到设定值之后flink停止运行。 具体的DataCollector实现有 分别用于输出到taskmanager的日志、最后报错时jobmanager日志、输出到mysql表中。
所以这里有三层代码结构 DirtyManager管理DirtyDataCollectorDirtyDataCollector主要用于收集脏数据并输出并判断脏数据是否达到临界值具体的DataCollector的实现具体的输出实现输出到日志输出到mysql。 接下来我们逐个看每层的具体实现逻辑
二. DirtyManager
DirtyManager用于管理DirtyDataCollector串起DirtyDataCollector的生命周期open、run、close主要流程如下 设置系统配置给DirtyDataCollector开启DirtyManager线程主要用于DirtyDataCollector消费脏数据收集脏数据关闭资源DirtyDataCollector、DirtyManager的线程资源。 1. 初始化
初始化DirtyManager 根据配置加载特定的DirtyDataCollector用于脏数据的收集获取系统信息jobId、jobName、operationName获取脏数据metric用于定期合并脏数据为全局脏数据。 public DirtyManager(DirtyConfig dirtyConfig, RuntimeContext runtimeContext) { //通过反射注册DirtyDataCollectorthis.consumer DataSyncFactoryUtil.discoverDirty(dirtyConfig); MapString, String allVariables runtimeContext.getMetricGroup().getAllVariables(); this.jobId allVariables.get(JOB_ID); this.jobName allVariables.getOrDefault(JOB_NAME, defaultJobName); this.operationName allVariables.getOrDefault(OPERATOR_NAME, defaultOperatorName); this.errorCounter runtimeContext.getLongCounter(Metrics.NUM_ERRORS);
}2. 收集脏数据并check
被具体的连接器调用 具体当连接器生产数据或写数据到数据源报错时调用此方法收集脏数据 创建线程用于异步执行DirtyDataCollector开始消费脏数据到日志或mysql表中添加脏数据条数同步到全局脏数据metric中脏数据信息存到队列中等待具体的脏数据收集器消费子流程判断脏数据条数是否大于总脏数据条数 public void collect(Object data, Throwable cause, String field, long globalErrors) { if (executor null) { execute(); } DirtyDataEntry entity new DirtyDataEntry(); entity.setJobId(jobId); entity.setJobName(jobName); entity.setOperatorName(operationName); entity.setCreateTime(new Timestamp(System.currentTimeMillis())); entity.setDirtyContent(toString(data)); entity.setFieldName(field); entity.setErrorMessage(ExceptionUtil.getErrorMessage(cause)); //积累metricerrorCounter这里直接同步到jobmanagererrorCounter.add(1L); //将脏数据添加到队列等待消费。consumer.offer(entity, globalErrors);
}/** * 创建线程用于异步执行DirtyDataCollector */public void execute() { if (executor null) { executor new ThreadPoolExecutor( MAX_THREAD_POOL_SIZE, MAX_THREAD_POOL_SIZE, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ChunJunThreadFactory( dirty-consumer, true, (t, e) - { log.error( String.format( Thread [%s] consume failed., t.getName()), e); }), new ThreadPoolExecutor.CallerRunsPolicy()); } //初始化DirtyDataCollector比如脏数据定时发送到mysql时的线程注册 consumer.open(); //拿出一个线程执行DirtyDataCollector的execute方法 executor.execute(consumer);
} 3. 关闭资源
/** Close manager. */
public void close() { if (!isAlive.get()) { return; } //先关闭datacollector的资源if (consumer ! null) { consumer.close(); } //再关闭executor线程if (executor ! null) { executor.shutdown(); } isAlive.compareAndSet(true, false);
}三. DirtyDataCollector
处于第二层的dirtyDataCollector实现了脏数据的临时保存并等待具体DataCollector的消费 它的基本逻辑是 当脏数据消费失败时将脏数据拦截并保存到consumeQueue中等待被消费全局的metric脏数据达到设定值之后任务报错flink停止运行并将脏数据输出到flink日志中。 1. 初始化
在DirtyManager实例化时注册DirtyDataCollector时的操作 这里获取脏数据最大值允许消费脏数据失败的条数以及对具体DataCollector的初始化我们下节分析。 public void initializeConsumer(DirtyConfig conf) { this.maxConsumed conf.getMaxConsumed(); this.maxFailedConsumed conf.getMaxFailedConsumed(); this.init(conf);
}被DirtyManager调用在开启脏数据收集器线程之前执行 初始化具体脏数据收集器目前之后mysql脏数据收集器实现了此方法消费线程、mysql连接 public void open() {
}2. 收集脏数据并check
offer方法被DirtyManager的collect方法调用 用于存储具体脏数据并更新单个slot的脏数据条数。每添加一条脏数据就判断脏数据是否达到了设定值如果是则抛出异常。 其中globalErrors是上文AccumulatorCollector定期更新的结果。 //存储脏数据具体内容并更新单个slot的脏数据条数
public synchronized void offer(DirtyDataEntry dirty, long globalErrors) { consumeQueue.offer(dirty); addConsumed(1L, dirty, globalErrors);
}/** * 添加脏数据 * 通过metric判断此时的脏数据条数是否已经超过全局设置的脏数据条数 * param count * param dirty * param globalErrors */
protected void addConsumed(long count, DirtyDataEntry dirty, long globalErrors) { consumedCounter.add(count); // 因为总体的脏数据需要tm和jm进行通讯每tm心跳1s会有延迟且当单slot运行时误差将达到最大 // 所以这里需要判断延迟情况 long max consumedCounter.getLocalValue() globalErrors ? consumedCounter.getLocalValue() : globalErrors; // 但这里仍然有误差此时如果所有的slot都消费了脏数据那么其他slot的脏数据就记录不到。也就是会多消费脏数据 // 所以这里要有取舍是否要消费完全准确的脏数据 if (max maxConsumed) { StringJoiner dirtyMessage new StringJoiner(\n) .add(\n****************Dirty Data Begin****************\n) .add(dirty.toString()) .add(\n****************Dirty Data End******************\n); throw new NoRestartException( String.format( The dirty consumer shutdown, due to the consumed count exceed the max-consumed [%s], maxConsumed) dirtyMessage); }
} 3. run消费脏数据
由DirtyManager开启脏数据消费线程 具体的DataCollector(log、mysql)消费脏数据发送到Taskmanager日志或mysql表中。 /** * 开启脏数据消费线程 * 定时消费脏数据发送到执行脏数据管理器中log、mysql等 */
Override
public void run() { while (isRunning.get()) { try { //指定的DataCollector消费脏数据DirtyDataEntry dirty consumeQueue.take(); consume(dirty); } catch (Exception e) { //未成功将脏数据收集到脏数据管理模块中 addFailedConsumed(e, 1L); } }
}/** * 消费脏数据用于输出到日志、mysql等 */protected abstract void consume(DirtyDataEntry dirty) throws Exception; 4. 释放资源
不同的DataCollector有不同的操作下节分析
public abstract void close();四. LogDirtyDataCollector
实现比较简单拿到的数据直接打印到Taskmanager中关闭时设定isRunning为false
/** * 没有线程调用即输出到日志中 */
Slf4j
public class LogDirtyDataCollector extends DirtyDataCollector { private static final long serialVersionUID 7366317208451727471L; private Long printRate; Override protected void init(DirtyConfig conf) { this.printRate conf.getPrintRate(); } /** * 输出脏数据到taskmanager * param dirty dirty-data which should be consumed. */ Override protected void consume(DirtyDataEntry dirty) { if (consumedCounter.getLocalValue() % printRate 0) { StringJoiner dirtyMessage new StringJoiner(\n) .add(\nDirty Data) .add(dirty.toString()) .add(\n); log.warn(dirtyMessage.toString()); } } Override public void close() { isRunning.compareAndSet(true, false); log.info(Print consumer closed.); }
} 下篇分析MysqlDirtyDataCollector是如何消费数据。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/87797.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!