如何创建个人网站模板杭州营销网站建设平台

diannao/2026/1/23 0:34:57/文章来源:
如何创建个人网站模板,杭州营销网站建设平台,网页制作古诗素材,网上做网站1. 自动提交最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true#xff0c;那么每过 5s#xff0c;消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制#xff0c;默认值是5s。消费者每次…1. 自动提交最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true那么每过 5s消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制默认值是5s。消费者每次获取新数据时都会先把上一次poll()方法返回的最大偏移量提交上去。可能造成的问题数据重复读假设我们仍然使用默认的 5s 提交时间间隔在最近一次提交之后的 3s 发生了再均衡再均衡之后消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s所以在这 3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量减小可能出现重复消息的时间窗不过这种情况是无法完全避免的。2. 手动提交(1) 同步提交// 把auto.commit.offset设为false让应用程序决定何时提交偏移量props.put(auto.commit.offset, false);try{while(true) {ConsumerRecords records consumer.poll(1000);for(ConsumerRecord record : records) {// 假设把记录内容打印出来就算处理完毕System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());}try{// 只要没有发生不可恢复的错误commitSync() 方法会一直尝试直至提交成功// 如果提交失败我们也只能把异常记录到错误日志里consumer.commitSync();}catch(CommitFailedException e) {System.err.println(commit failed! e.getMessage());}}}finally {consumer.close();}(2) 异步提交手动提交有一个不足之处在 broker 对提交请求作出回应之前应用程序会一直阻塞这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量但如果发生了再均衡会增加重复消息的数量。这个时候可以使用异步提交只管发送提交请求无需等待 broker 的响应。// 把auto.commit.offset设为false让应用程序决定何时提交偏移量props.put(auto.commit.offset, false);try{while(true) {ConsumerRecords records consumer.poll(1000);for(ConsumerRecord record : records) {System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());}// 提交最后一个偏移量然后继续做其他事情。consumer.commitAsync();}}finally {consumer.close();}在成功提交或碰到无法恢复的错误之前commitSync()会一直重试但是commitAsync()不会这也是commitAsync()不好的一个地方。它之所以不进行重试是因为在它收到服务器响应的时候可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量2000这个时候发生了短暂的通信问题服务器收不到请求自然也不会作出任何响应。与此同时我们处理了另外一批消息并成功提交了偏移量3000。如果commitAsync()重新尝试提交偏移量2000它有可能在偏移量3000之后提交成功。这个时候如果发生再均衡就会出现重复消息。commitAsync()也支持回调在broker作出响应时会执行回调// 把auto.commit.offset设为false让应用程序决定何时提交偏移量props.put(auto.commit.offset, false);try {while (true) {ConsumerRecords records consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());}consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(Map offsets, Exception exception) {if(offsets ! null) {System.out.println(commit offset successful!);}if(exception ! null) {System.out.println(commit offset fail! exception.getMessage());}}});}} finally {consumer.close();}可以在回调中重试失败的提交以下为思路使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前先检查回调的序列号和即将提交的偏移量是否相等如果相等说明没有新的提交那么可以安全地进行重试。如果序列号比较大说明有一个新的提交已经发送出去了应该停止重试。(3) 同步和异步组合提交一般情况下针对偶尔出现的提交失败不进行重试不会有太大问题因为如果提交失败是因为临时问题导致的那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交就要确保能够提交成功。try {while (true) {ConsumerRecords records consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());}// 如果一切正常我们使用 commitAsync() 方法来提交// 这样速度更快而且即使这次提交失败下一次提交很可能会成功consumer.commitAsync();}}catch (Exception e) {e.printStackTrace();}finally {try {// 使用 commitSync() 方法会一直重试直到提交成功或发生无法恢复的错误// 确保关闭消费者之前成功提交了偏移量consumer.commitSync();}finally {consumer.close();}}(4) 提交特定的偏移量不管是自动提交还是使用commitAsync()或者commitSync()来提交偏移量提交的都是 poll() 方法返回的那批数据的最大偏移量想要自定义在什么时候提交偏移量可以这么做Map currentOffsets new HashMap();int count 0;......try {while (true) {ConsumerRecords records consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1, no metadata));if (count % 1000 0) {// 这里调用的是 commitAsync()不过调用 commitSync() 也是完全可以的// 当然在提交特定偏移量时仍然要处理可能发生的错误consumer.commitAsync(currentOffsets, null);}count;}}}finally {consumer.close();}3. 分区再均衡监听器消费者在退出和进行分区再均衡之前应该做一些正确的事情提交最后一个已处理记录的偏移量(必须做)根据之前处理数据的业务不同你可能还需要关闭数据库连接池、清空缓存等程序如何能得知集群要进行分区再均衡了消费者 API 提供了再均衡监听器以下程序可以做到 kafka 消费数据的 Exactly Once 语义package com.bonc.rdpe.kafka110.consumer;import java.util.Collection;import java.util.Collections;import java.util.HashMap;import java.util.Map;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;/*** Title RebalanceListenerConsumer.java* Description 再均衡监听器* Author YangYunhe* Date 2018-06-27 17:35:05*/public class RebalanceListenerConsumer {public static void main(String[] args) {Map currentOffsets new HashMap();Properties props new Properties();props.put(bootstrap.servers, 192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094);// 把auto.commit.offset设为false让应用程序决定何时提交偏移量props.put(auto.commit.offset, false);props.put(group.id, dev3-yangyunhe-group001);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumer consumer new KafkaConsumer(props);consumer.subscribe(Collections.singletonList(dev3-yangyunhe-topic001), new ConsumerRebalanceListener() {/** 再均衡开始之前和消费者停止读取消息之后被调用* 如果在这里提交偏移量下一个接管分区的消费者就知道该从哪里开始读取了*/Overridepublic void onPartitionsRevoked(Collection partitions) {// 如果发生再均衡我们要在即将失去分区所有权时提交偏移量// 要注意提交的是最近处理过的偏移量而不是批次中还在处理的最后一个偏移量System.out.println(Lost partitions in rebalance. Committing current offsets: currentOffsets);consumer.commitSync(currentOffsets);}/** 在重新分配分区之后和新的消费者开始读取消息之前被调用*/Overridepublic void onPartitionsAssigned(Collection partitions) {long committedOffset -1;for(TopicPartition topicPartition : partitions) {// 获取该分区已经消费的偏移量committedOffset consumer.committed(topicPartition).offset();// 重置偏移量到上一次提交的偏移量的下一个位置处开始消费consumer.seek(topicPartition, committedOffset 1);}}});try {while (true) {ConsumerRecords records consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println(value record.value() , topic record.topic() , partition record.partition() , offset record.offset());currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1, no metadata));}consumer.commitAsync(currentOffsets, null);}} catch (Exception e) {e.printStackTrace();} finally {try{consumer.commitSync(currentOffsets);} catch (Exception e) {e.printStackTrace();} finally {consumer.close();System.out.println(Closed consumer successfully!);}}}}当然你也可以选择再均衡后从头开始消费consumer.subscribe(Collections.singletonList(dev3-yangyunhe-topic001), new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(Collection partitions) {System.out.println(starting partitions rebalance...);}Overridepublic void onPartitionsAssigned(Collection partitions) {consumer.seekToBeginning(partitions);}});以上代码与 props.put(auto.offset.reset, earliest);是等效的。设置从最新消息开始消费consumer.subscribe(Collections.singletonList(dev3-yangyunhe-topic001), new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(Collection partitions) {System.out.println(starting partitions rebalance...);}Overridepublic void onPartitionsAssigned(Collection partitions) {consumer.seekToEnd(partitions);}});以上代码与props.put(auto.offset.reset, latest);等效。4. 涉及到数据库的 Exactly Once 语义的实现思路当处理 Kafka 中的数据涉及到数据库时那么即使每处理一条数据提交一次偏移量也可以造成数据重复处理或者丢失数据看以下为伪代码Map currentOffsets new HashMap();......while (true) {ConsumerRecords records consumer.poll(100);for (ConsumerRecord record : records) {currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 1);// 处理数据processRecord(record);// 把数据存储到数据库中storeRecordInDB(record);// 提交偏移量consumer.commitAsync(currentOffsets);}}假设把数据存储到数据库后没有来得及提交偏移量程序就因某种原因挂掉了那么程序再次启动后就会重复处理数据数据库中会有重复的数据。如果把存储到数据库和提交偏移量在一个原子操作里完成就可以避免这样的问题但数据存到数据库偏移量保存到kafka是无法实现原子操作的而如果把数据存储到数据库中偏移量也存储到数据库中这样就可以利用数据库的事务来把这两个操作设为一个原子操作同时结合再均衡监听器就可以实现 Exactly Once 语义以下为伪代码consumer.subscribe(Collections topics, new ConsumerRebalanceListener() {Overridepublic void onPartitionsRevoked(Collection partitions) {// 发生分区再均衡之前提交事务commitDBTransaction();}Overridepublic void onPartitionsAssigned(Collection partitions) {// 再均衡之后从数据库获得消费偏移量for(TopicPartition topicPartition : partitions) {consumer.seek(topicPartition, getOffsetFromDB(topicPartition));}}});/*** 消费之前调用一次 poll()让消费者加入到消费组中并获取分配的分区* 然后马上调用 seek() 方法定位分区的偏移量* seek() 设置消费偏移量设置的偏移量是从数据库读出来的说明本次设置的偏移量已经被处理过* 下一次调用 poll() 就会在本次设置的偏移量上加1开始处理没有处理过的数据* 如果seek()发生错误比如偏移量不存在则会抛出异常*/consumer.poll(0);for(TopicPartition topicPartition : consumer.assignment()) {consumer.seek(topicPartition, getOffsetFromDB(topicPartition));}while (true) {ConsumerRecords records consumer.poll(1000);for (ConsumerRecord record : records) {// 处理数据processRecord(record);// 把数据存储到数据库中storeRecordInDB(record);// 把偏移量存储到数据库中storeOffsetInDB(record.topic(), record.partition(), record.offset());}// 以上3步为一个事务提交事务这里在每个批次末尾提交一次事务是为了提高性能commitDBTransaction();}把偏移量和记录保存到用一个外部系统来实现 Exactly Once 有很多方法但核心思想都是结合 ConsumerRebalanceListener 和 seek() 方法来确保能够及时保存偏移量并保证消费者总是能够从正确的位置开始读取消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/89127.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网站建设图标合集网站如何做团购

题目 28. 实现 strStr() 实现 strStr() 函数。 给定一个 haystack 字符串和一个 needle 字符串,在 haystack 字符串中找出 needle 字符串出现的第一个位置 (从0开始)。如果不存在,则返回 -1。 示例 1: 输入: haystack “hello”, needle “ll” 输…

企业展示网站 价钱网站建设免费软件

目录 修改约束 创建数据库 添加约束 删除约束 Oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 修改约束 如果说表结构的修改还在可以容忍的范畴之内,那么约束的修改是绝对 100% 禁止的 所有的约束一定要在…

wordpress 仿站 教程网2008iis网站属性

C11标准之前的auto_ptr这个智能指针不被广泛使用的原因就是:在某些应用场景下,拷贝构造函数的意义不明确,同理赋值语句也是这个道理,意义同样不明确,因为C11标准之前并不存在移动赋值和移动构造的概念,还有…

做淘宝网站要求与想法常州建设工程电子审图网站

vbs读取数据库值前端FlexGrid前导0出不来的原因 原因 系统设置问题 解决 修改系统默认数值显示: 1)控制面板找到“区域”,点击“更改日期、时间和数字模式”,在弹出窗口点击“其他设置” 2)在数字一栏中的“显示前…

制作网站教程哪个网站可以做加工代理的

学习python前纠结了下,到底是应该一个个知识点吃透,然后写些小程序。还是应该快速掌握基础语法,快速实践。思考后认为前者这么学习速度真心不高,于是花2天时间看了下python3的语法,虽然很多都不明白,但是带…

网站开发合同属于知识产权类吗谁给个网站啊急急急2021

文章目录前期准备应用场景1.constant_score查询-不考虑文档频率得分,与搜索关键字命中更多的返回结果2.sort排序-分数相同情况下,按照指定价格域排序3.不考虑文档频率TF/IDF情况下,不同域打分权重不同进行召回4.不考虑文档频率TF/IDF情况下&a…

网站开发专业职业规划会计培训

学习目标 加强理解DMA数据传输过程加强掌握DMA的初始化流程掌握DMA数据表查询理解源和目标的配置理解数据传输特点能够动态配置源数据学习内容 需求 uint8_t data; 串口接收(&data);data有数据了 实现串口的数据接收,要求采用dma的方式。 数据交互流程 CPU配置好DMA外…

网站建设研究的意义一般网站建设

全屏Java.math.BigDecimal.plus()方法实例java.math.BigDecimal.plus() 返回一个BigDecimal,其值是 (this),并且其刻度为是 this.scale().这种方法,它只是返回此BigDecimal是包括对称性与一元减号方法negate()。声明以下是java.math.BigDecim…

建设部网站怎么查询相关专业wordpress加入音乐播放器

最近AWS公布了新的客户端库,它实现了JMS 1.1规范 ,并使用他们的简单队列服务 (SQS)作为JMS提供者 (见杰夫巴尔的帖子在这里 )。 在我的文章中,我将向您展示如何设置Maven项目以使用Spring Frame…

做网站如何自动采集图片wordpress精美免费主题

据悉,诺基亚创新和测试强调了其AirScale无线产品组合的灵活性、可升级性和可扩展性,以适应技术初始应用中实现的5G频段。 通过证明AirScale能够支持低频和高频,运营商将能够从5G推出的第一天提供广泛的覆盖和室内覆盖,而无需进行复…

顶客网站和网摘网站国外wordpress

认识面向对象 Java是一门纯面向对象的语言(Object Oriented Program, OOP),在面向对象的世界里,一切皆为对象。面向对象是解决问题的一种思想,主要依靠对象之间的交互完成一件事情。  面向过程和面相对象并不是一门语言,而是解决…

做网站免费搭建为什么做可信网站

目录1.上传tar包2.解压3. 设置环境变量4.设置Hive的配置文件5.启动Hive6.安装MySQL7.下载MySQL的驱动包8.修改Hive的配置文件9.启动Hive10.查看MySQL数据库 目录 1.上传tar包 jar包地址:http://hive.apache.org/downloads.html 2.解压 tar -zxvf apache-hive-2…

百度网站收录个人作品网站

MySQL中的自动增量AUTO_INCREMENT,是有使用条件的 该列(column)的数据类型必须是数值型(这点容易理解,数值才能自增)该列必须被索引,比如定义为主键(PRIMARY KEY) 感悟…

免费素材下载网站有哪些网站开发实用技术第2版课后答案

文章目录 工作原理代码编写驱动方式全步进驱动半步进驱动微步进驱动 工作原理 工作原理简要说明,和单片机一起配合使用的步进电机多为28BYJ28 五线四相步进电机,配合ULN2003驱动板进行控制,如图所示,对于扭矩、精度要求较高的还有…

58同城网站建设推广排名电商直播app开发公司

一 选择排序 原理:选择排序很简单,他的步骤如下: 从左至右遍历,找到最小(大)的元素,然后与第一个元素交换。从剩余未排序元素中继续寻找最小(大)元素,然后与第二个元素进行交换。以此…

网站建设的方案模板下载个人导航网站怎么备案

目前展示了用Avalonia做几个主要流行的主界面,演示了一下组件的使用。用不同的实现方式实现一些方法。 1、独立大屏展示,类似一个实时监控,这是一种目前很方便的大屏效果。 主要涉及的内内容: (1)窗标题实…

小型网站开发开题报告范文六安杂谈百姓畅言

1. 正所谓圣人云:“无农不稳,无商不富”,“民无利则国不富,民不富则国无税,国无税则兵不强,兵不强则天下危”。 2. 播下一个行动,收获一种习惯;播下一种习惯,收获一种性格…

做网站维护学什么编程语言数字化营销系统

Laravel 中的所有异常都由类App\Exceptions\Handler集中处理,这个类有两个方法:report 和 render。【report 方法】report 方法用于记录异常并将其发送给外部服务。默认情况下,report 方法只是将异常传递给异常基类并写入日志进行记录&#x…

北京网站建设流程兴化建设局网站

前言 MVC(Model-View-Controller)是一种常用的软件架构模式。将MVC应用于Unity3D开发可以提高项目的可维护性和可扩展性,使代码更加清晰和易于理解。本文将详细介绍Unity3D中MVC开发模式的应用以及开发流程,并给出技术详解和代码…

网站开发前端的工作内容是什么中国建设银行网官方网站

随着汽车技术持续快速发展,推动更安全、更智能、更高效的驾驶体验一直是汽车创新的前沿。高级驾驶辅助系统( ADAS ) 是这场技术革命的关键参与者,是 指集成到现代车辆中的一组技术和功能,用于增强驾驶员安全、改善驾驶体验并协助完成各种驾驶任务。它使用传感器、摄像头、雷…