珠海移动网站建设公司排名pagespeed WordPress

web/2025/9/30 19:22:34/文章来源:
珠海移动网站建设公司排名,pagespeed WordPress,网站制作网站建,专业设计网站推荐文章目录 一、flink 流式读取文件夹、文件二、flink 写入文件系统——StreamFileSink三、查看完整代码 一、flink 流式读取文件夹、文件 Apache Flink针对文件系统实现了一个可重置的source连接器#xff0c;将文件看作流来读取数据。如下面的例子所示#xff1a; StreamExe… 文章目录 一、flink 流式读取文件夹、文件二、flink 写入文件系统——StreamFileSink三、查看完整代码 一、flink 流式读取文件夹、文件 Apache Flink针对文件系统实现了一个可重置的source连接器将文件看作流来读取数据。如下面的例子所示 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();TextInputFormat textInputFormat new TextInputFormat(null);DataStreamSourceString source env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);StreamExecutionEnvironment.readFile()接收如下参数 FileInputFormat参数负责读取文件中的内容。文件路径。如果文件路径指向单个文件那么将会读取这个文件。如果路径指向一个文件夹FileInputFormat将会扫描文件夹中所有的文件。PROCESS_CONTINUOUSLY将会周期性的扫描文件以便扫描到文件新的改变。30000L表示多久扫描一次监听的文件。 FileInputFormat是一个特定的InputFormat用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后这些splits可以分发到不同的读任务这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split读取被split定义的文件范围然后返回对应的数据。 DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。 在Flink 1.7中Flink提供了一些类这些类继承了FileInputFormat并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件而CsvInputFormat使用逗号分隔符来读取文件。 二、flink 写入文件系统——StreamFileSink 该Sink不但可以将数据写入到各种文件系统中而且整合了checkpoint机制来保证Exacly Once语义还可以对文件进行分桶存储还支持以列式存储的格式写入功能更强大。 streamFileSink中输出的文件其生命周期会经历3中状态 in-progress Files 当前文件正在写入中Pending Files 当处于 In-progress 状态的文件关闭closed了就变为 Pending 状态Finished Files 在成功的 Checkpoint 后Pending 状态将变为 Finished 状态 下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 ! 数据文件格式是行式存储格式 BucketAssignerString, String assigner new DateTimeBucketAssigner(yyyy-MM-dd, ZoneId.of(Asia/Shanghai));StreamingFileSinkString fileSink StreamingFileSink.StringforRowFormat(new Path(savePath),new SimpleStringEncoder(UTF-8)).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();其中特别说明了如果使用 FileSink 在 STREAMING 模式的时候必须开启 checkpoint不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态不能保证整个写入流程的安全性。 所以在我们上述的示例中我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后 ​ 将数据以列式存储的格式输出到文件中 三、查看完整代码 import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.source.FileProcessingMode;import java.time.ZoneId; import java.util.concurrent.TimeUnit;public class WordTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.设置CK状态后端env.setStateBackend(new FsStateBackend(hdfs://nameservice1/tmp/kafka_test/data/chatgpt/mnbvc/checkpoint));env.enableCheckpointing(1000*60*3);// 每 ** ms 开始一次 checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次env.getCheckpointConfig().setCheckpointTimeout(1000*60*5);// Checkpoint 必须在** ms内完成否则就会被抛弃env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** msenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// 允许两个连续的 checkpoint 错误env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略重启3次间隔10s// 使用 externalized checkpoints这样 checkpoint 在作业取消后仍就会被保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);String sourcePath hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_com;String savePath hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_filter_01;TextInputFormat textInputFormat new TextInputFormat(null);DataStreamSourceString source env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);BucketAssignerString, String assigner new DateTimeBucketAssigner(yyyy-MM-dd, ZoneId.of(Asia/Shanghai));StreamingFileSinkString fileSink StreamingFileSink.StringforRowFormat(new Path(savePath),new SimpleStringEncoder(UTF-8)).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();source.map(line - JSONObject.parseObject(line)).filter(line - line.getString(text).length() 200 line.getInteger(id) % 7 0).map(line - JSON.toJSONString(line)).addSink(fileSink);env.execute();} }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/84615.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深圳仿站定制模板建站建设网站的基本工作流程

思路: 1、定义一个空列表来存储所有的奇数 2、判断是奇数就追加到列表的末尾 3、打印所有的奇数 代码如下: list [ ] #定义一个列表来存储所有的奇数 for i in range (1,100):if i % 2 ! 0: #判断是否为奇数list.append(i) #追加到列表的末尾 prin…

个人博客网站设计的目的用wordpress修改现有网页

游戏AI:大模型在游戏内容生成与交互体验优化中的应用 1. 背景介绍 随着人工智能技术的不断发展,游戏AI已经从简单的决策树和有限状态机,发展到了基于机器学习和深度学习的复杂系统。大模型,如GPT-3等,在游戏内容生成…

博客网站设计及说明单位网站建设的请示

Zeng Q, Chen B, Zhang S, et al. Full-scale ab initio simulations of laser-driven atomistic dynamics[J]. npj Computational Materials, 2023, 9(1): 213.核心研究内容: 本文研究了激光驱动的原子动力学的全尺度从头算模拟。研究的重点是探讨在极端条件下材料…

网站技术的解决方案网站 蓝色

**单片机设计介绍, 基于单片机体温心率检测仪系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机体温心率检测仪系统设计是一个综合性的项目,旨在通过单片机及其外围电路实现对人体体温和心…

网站建设模块需求分析wordpress游客登录可见

Java中synchronized的使用实例synchronized关键字,代表这个方法加锁,相当于不管哪一个线程(例如线程A),运行到这个方法时,都要检查有没有其它线程B(或者C、D等)正在用这个方法,若有则要等正在使用synchronized方法的线…

微信的微网站模板论坛网站模板免费下载

⚠申明: 未经许可,禁止以任何形式转载,若要引用,请标注链接地址。 全文共计3077字,阅读大概需要3分钟 🌈更多学习内容, 欢迎👏关注👀【文末】我的个人微信公众号&#xf…

济南 制作网站 公司Linux主机设置网站首页

本文主要阐述HDFSRPC安全认证相关的实现。主要介绍Token相关的实现。 写在前面 相关blog https://blog.csdn.net/hncscwc/article/details/124722784 https://blog.csdn.net/hncscwc/article/details/124958357 Token由来 在探究完Kerberos,我一直在想一个问题…

淘宝网站建设的策划书做阀门网站电话

一、虚拟地址转换成物理地址 涉及到的部件: MMU:虚拟地址—MMU—>物理地址。MMU会控制整个流程(查快表、查慢表等等)TLB快表:组号(若为组相联TLB)、TLB标记、有效位、页框号页表&#xff08…

网站建设 网站开发 区别做网站开发需要什么技能

2019独角兽企业重金招聘Python工程师标准>>> 欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。 删除元素 删除元素本身比较简单,就是采用二叉树的删除规则。 (1)如果删除的位置有两…

小企业网站 优帮云网站的设计与制作

目录 一、概述 二、持续集成的典型操作流程 2.1 概述 2.2 持续集成的操作流程图 2.3 持续集成关键流程说明 三、构建持续集成流水线的方式 3.1 依托云厂商能力 3.2 采用开源产品 3.3 企业自研 四、构建持续化集成流水线 4.1 基于GitHub的持续集成流水线(公…

江苏鑫圣建设工程有限公司网站营销公司排行

变量命名的规范,对于我们编程,大家都知道是非常重要的,上次给大家推荐过一个命名辅助工具《程序员还在为变量取名苦恼,那是因为你不知道,这个变量命名神器》,但大家一致反馈存在2个问题:1、网速…

台州网站平面设计做网站哪些方面会侵权

之前使用 html2canvas 和 jsPDF 实现html转pdf,但是客户说不能复制pdf中的文字,要改一下,先说不能复制的方法,再说可以复制的方法 一,html2canvas 和 jsPDF(图片插入pdf不可复制) 创建pdf.js文…

如何快速找到做网站的客户河北石家庄网络公司

目录 1.1 三张报表的作用 1.2 三张报表长的样子 1.2.1 资产负债表 1.2.2 利润表 1.2.3 现金流 1.3 BI指标构建 1.3.1 盈利能力指标构建 1.3.2 营运能力指标构建 1.3.3 偿债能力指标构建 转眼间,一年又悄然而逝,时光荏苒,岁月如梭 &a…

各类大型网站建设变身变装 wordpress

假如按工业交换机的端口号构造来分,工业交换机大概可分成:固定不动端口号工业交换机和模块化设计工业交换机二种不一样的构造。实际上也有一种是二者兼具,那便是在出示基础固定不动端口号的基本以上再配置一定的拓展扩展槽或控制模块。今天&a…

wordpress网站搬迁黄浦做网站

目录 1、简介 2、添加WS配置 3、发送请求 4、处理请求 5、常用模式和用例 6、自定义BodyReadables和BodyWritables 6、独立WS 7、访问AsyncHttpClient 8、配置WS 1、简介 有时我们想从一个play应用程序中调用其他HTTP服务。Play提供了WS库来进行异步HTTP方法调用。 …

上虞网站建设文广网络域名买卖网站

基本的XML Schema的使用就是这样,下面我们要介绍XML Schema的另外一个核心的部分也是Schema最关键的一个部分,就是关于Schema的名字空间(namespace)的问题。在上面关于XML Schema的介绍中,为了把读者集中到对XML Schema的语法理解上&#xff…

网站与网页的区别最新网站架构

小编杂谈新能源已经完成了至少5期的博文了,Boss告诉小编,如果还不介绍我们的产品和方案,黄花菜都凉了,所以小编这期博文就重点介绍一下Microchip在储能上的产品介绍,重点聊聊Microchip储能中使用的光伏逆变器的解决方案…

国外创意海报设计网站seo 网站标题长度

struts2 1-1:为什么每次请求都要创建一个Action对象? 是出于对线程安全的考虑,每个request都不会相互影响 1-2:ModelDriven拦截器的配置中refreshModelBeforeResult解决了什么问题? 先把旧的model对象从ValueStack…

缅甸网站后缀公司刚做网站在那里找图片做

我们经常在实际开发中会用到一些转换类,比如在金融界中,我们需要将1转换为“壹”,2转换成“贰”。还有类似这样的需求,食堂在一周内每天的菜单都是不一样的,周一为鱼香肉丝鸡腿,周二为爆炒土豆丝鲅鱼&#…

深圳定制网站制作报价北京网站建设推荐华网天下

C语言中文件定位函数主要是:fseek, ftell, fsetpos, fgetpos。 先来讲前两个函数,这是最基本的定位函数: fseek函数:能把文件指针移动到文件任何位置,其原型是:int fseek(FILE *fp, long offset, int fromw…