济南做网站的公司有哪些公司企业官网建设

pingmian/2026/1/21 0:23:15/文章来源:
济南做网站的公司有哪些,公司企业官网建设,中国企业排行,广告视频素材网站1.3 SparkStreaming与Kafka整合 1.3.1 整合简述 kafka是做消息的缓存#xff0c;数据和业务隔离操作的消息队列#xff0c;而sparkstreaming是一款准实时流式计算框架#xff0c;所以二者的整合#xff0c;是大势所趋。 ​ 二者的整合#xff0c;有主要的两大版本。 kaf…1.3 SparkStreaming与Kafka整合 1.3.1 整合简述 kafka是做消息的缓存数据和业务隔离操作的消息队列而sparkstreaming是一款准实时流式计算框架所以二者的整合是大势所趋。 ​ 二者的整合有主要的两大版本。 kafka作为一个实时的分布式消息队列实时的生产和消费消息在实际开发中Spark Streaming经常会结合Kafka来处理实时数据。Spark Streaming 与 kafka整合需要引入spark-streaming-kafka.jar该jar根据kafka版本有2个分支分别是spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10。jar包分支选择原则 0.10.0kafka版本0.8.2.1选择 08 接口 kafka版本0.10.0选择 010 接口 sparkStreaming和Kafka整合一般两种方式Receiver方式和Direct方式 Receiver方式(介绍) Receiver方式基于kafka的高级消费者API实现高级优点高级API写起来简单不需要去自行去管理offset系统通过zookeeper自行管理不需要管理分区副本等情况系统自动管理消费者断线会自动根据上一次记录在 zookeeper中的offset去接着获取数据高级缺点不能自行控制 offset不能细化控制如分区、副本、zk 等。Receiver从kafka接收数据存储在Executor中Spark Streaming 定时生成任务来处理数据。 默认配置的情况Receiver失败时有可能丢失数据。如果要保证数据的可靠性需要开启预写式日志简称WALWrite Ahead LogsSpark1.2引入只有接收到的数据被持久化之后才会去更新Kafka中的消费位移。接收到的数据和WAL存储位置信息被可靠地存储如果期间出现故障这些信息被用来从错误中恢复并继续处理数据。 还有几个需要注意的点 在Receiver的方式中Spark中的 partition 和 kafka 中的 partition 并不是相关的如果加大每个topic的partition数量仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度 对于不同的 Group 和 Topic 可以使用多个 Receiver 创建不同的Dstream来并行接收数据之后可以利用union来统一成一个Dstream 如果启用了Write Ahead Logs复制到文件系统如HDFS那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER) WAL将接收的数据备份到HDFS上保证了数据的安全性。但写HDFS比较消耗性能另外要在备份完数据之后还要写相关的元数据信息这样总体上增加job的执行时间增加了任务执行时间 总体上看 Receiver 方式不适于生产环境 1.3.2  Direct的方式 Direct方式从Spark1.3开始引入的通过 KafkaUtils.createDirectStream 方法创建一个DStream对象Direct方式的结构如下图所示。 Direct 方式特点如下 对应Kafka的版本 0.8.2.1 Direct 方式 Offset 可自定义 使用kafka低阶API 底层实现为KafkaRDD 该方式中Kafka的一个分区与Spark RDD对应通过定期扫描所订阅Kafka每个主题的每个分区的最新偏移量以确定当前批处理数据偏移范围。与Receiver方式相比Direct方式不需要维护一份WAL数据由Spark Streaming程序自己控制位移的处理通常通过检查点机制处理消费位移这样可以保证Kafka中的数据只会被Spark拉取一次。 引入依赖 dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.12/artifactIdversion3.1.2/version /dependency 模拟kafka生产数据 package com.qianfeng.sparkstreaming ​ import java.util.{Properties, Random} ​ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} ​ /*** 向kafka中test主题模拟生产数据也可以使用命令行生产kafka-console-producer.sh --broker-list qianfeng01:9092,hadoop02:9092,hadoop03:9092 -topic test*/ object Demo02_DataLoad2Kafka {def main(args: Array[String]): Unit {val prop new Properties()//提供Kafka服务器信息prop.put(bootstrap.servers,qianfeng01:9092)//指定响应的方式prop.put(acks,all)//请求失败重试的次数prop.put(retries,3)//指定key的序列化方式key是用于存放数据对应的offsetprop.put(key.serializer,org.apache.kafka.common.serialization.StringSerializer)//指定value的序列化方式prop.put(value.serializer,org.apache.kafka.common.serialization.StringSerializer)//创建producer对象val producer new KafkaProducer[String,String](prop)//提供一个数组数组中数据val arr Array(hello tom,hello jerry,hello dabao,hello zhangsan,hello lisi,hello wangwu,)//提供一个随机数随机获取数组中数据向kafka中进行发送存储val r new Random()while(true){val message arr(r.nextInt(arr.length))producer.send(new ProducerRecord[String,String](test,message))Thread.sleep(r.nextInt(1000))   //休眠1s以内}} } 实时消费kafka数据 package com.qianfeng.sparkstreaming ​ import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} ​ ​ /*** sparkStreaming消费Kafka中的数据*/ object Demo03_SparkStreamingWithKafka {def main(args: Array[String]): Unit {//1.创建SparkConf对象val conf new SparkConf().setAppName(SparkStreamingToKafka).setMaster(local[*])//2.提供批次时间val time Seconds(5)//3.提供StreamingContext对象val sc new StreamingContext(conf, time)//4.提供Kafka配置参数val kafkaConfig Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG - qianfeng01:9092,ConsumerConfig.GROUP_ID_CONFIG - qianfeng,key.deserializer - org.apache.kafka.common.serialization.StringDeserializer,value.deserializer - org.apache.kafka.common.serialization.StringDeserializer,)//5.读取Kafka中数据信息生成DStreamval value KafkaUtils.createDirectStream(sc,//本地化策略将Kafka的分区数据均匀的分配到各个执行Executor中LocationStrategies.PreferConsistent,//表示要从使用kafka进行消费【offset谁来管理从那个位置开始消费数据】ConsumerStrategies.Subscribe[String, String](Set(test), kafkaConfig))//6.将每条消息kv获取出来val line: DStream[String] value.map(record record.value())//7.开始计算操作line.flatMap(_.split( )).map((_, 1)).reduceByKey(_ _).print()//line.count().print()   //每隔5s的数据条数//8.开始任务sc.start()sc.awaitTermination()} } 说明 简化的并行性不需要创建多个输入Kafka流并将其合并。 使用directStreamSpark Streaming将创建与使用Kafka分区一样多的RDD分区这些分区将全部从Kafka并行读取数据。 所以在Kafka和RDD分区之间有一对一的映射关系。 效率在第一种方法中实现零数据丢失需要将数据存储在预写日志中这会进一步复制数据。这实际上是效率低下的因为数据被有效地复制了两次:一次是Kafka另一次是由预先写入日志WriteAhead Log复制。这个第二种方法消除了这个问题因为没有接收器因此不需要预先写入日志。只要Kafka数据保留时间足够长。 正好一次Exactly-once的语义第一种方法使用Kafka的高级API来在Zookeeper中存储消耗的偏移量。传统上这是从Kafka消费数据的方式。虽然这种方法结合提前写入日志可以确保零数据丢失即至少一次语义但是在某些失败情况下有一些记录可能会消费两次。发生这种情况是因为Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。因此在第二种方法中我们使用不使用Zookeeper的简单Kafka API。在其检查点内Spark Streaming跟踪偏移量。这消除了Spark Streaming和Zookeeper/Kafka之间的不一致因此Spark Streaming每次记录都会在发生故障的情况下有效地收到一次。为了实现输出结果的一次语义将数据保存到外部数据存储区的输出操作必须是幂等的或者是保存结果和偏移量的原子事务。 Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/87345.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网站建设技术部奖惩制度外发加工网接单

在 Oracle 和 OB Oracle 租户下调用存储过程时,两者表现并不一致,导致获取到的 SQL 文本拼接不完整,影响到了业务侧的功能测试。本文将针对这个问题进行相关的测试和验证。 作者:赵黎明,爱可生 MySQL DBA 团队成员&…

网站开发专业都有哪些课程Wordpress虚拟域名

初中英语语法——形容词与副词(二)比较的用法语法解释1、形容词与副词比较级和最高级的规则变化单音节词与部分双音节词:(1)一般情况加-er,-estlong-longer-longest strong-stronger-strongestclean-cleaner-cleanest(2)以不发音的e结尾的词,…

现在网站开发哪个语言好宿州市住房和城乡建设局网站

简介:vPaaS全新定义企业级音视频应用开发 1月5日,阿里云视频云“低代码音视频工厂vPaaS“正式上线,极大程度降低音视频开发门槛,打破传统音视频技术壁垒,全新定义企业级的音视频应用开发。 低代码音视频工厂基于云原生…

电子商务网站建设岗位要求做金融类网站

前后端交互时的数据传输模型 前后端交互流程 前后端交互的流程: 前端与后端开发人员之间主要依据接口进行开发 前端通过Http协议请求后端服务提供的接口后端服务的控制层Controller接收前端的请求Contorller层调用Service层进行业务处理Service层调用Dao持久层对数据持久化 …

景区官方网站建设方案网站屏蔽省份

气候变化对农业、生态系统、社会经济以及人类的生存与发展具有深远影响,是当前全球关注的核心议题之一。IPCC(Intergovernmental Panel on Climate Change,政府间气候变化专门委员会)的第六次评估报告明确;指出&#x…

app手机软件开发沧州快速关键词排名优化

文章目录1. 问题描述2. 问题分析2.1 回溯法求解2.2 DP状态转移方程法2.3 DP状态转移表法1. 问题描述 找零问题,在贪心算法讲过。但是贪心不一定能得出最优解。假设有几种不同币值的硬币v1,v2,.……vn(单位是元)。如果…

手游网站建设方案预算短视频制作自学教程

一.选择字体 二.批量替换 编辑—>替换相同字体

网站建设是属于虚拟产品吗网站设计公司电话

目录 无法加载响应数据解决 无法加载响应数据 上线项目时 改用服务器上的redis和MySQL 出现请求能请求到后端,后端也能正常返回数据,但是在前端页面会显示 以为是跨域问题,但是环境还在本地,排除跨域问题以为是服务器问题&#…

外国教程网站有哪些网站推广公司需要多少钱

Service的表现形式为IP地址端口号的方式,即工作在TCP/IP层,而对于基于HTTP的服务来说,Service机制很难实现,7层应用的复杂转发逻辑。kubernetes在1.1版本开始引入ingress资源对象,用于将集群外部的客户端请求路由到集群…

苏州网站建设培训开发一个商城网站需要多少钱

SpringBoot中的RestTemplate使用笔记 为了方便使用,这里我封装成一个工具类来静态调用RestTemplate以下代码是基于SpringBoot2.4.2版本写的案例 需要配置的application.yml如下 server:port: 7024servlet:context-path: /demosession:timeout: 30m #默认会话过期…

太和县建设银行网站网站开发技巧

1.安装 sudo apt-get install mysql-server sudo mysql -u root -p2.关系模型 在关系数据库中,一张表中的每一行数据被称为一条记录。一条记录就是由多个字段组成的。 每一条记录都包含若干定义好的字段。同一个表的所有记录都有相同的字段定义。 对于关系表&#…

滨湖区知名做网站选哪家备案 几个网站

1.问题简介 1.1问题描述 在这个问题中,你将面临一个经典的机器学习分类挑战——猫狗大战。你的任务是建立一个分类模型,能够准确地区分图像中是猫还是狗。 1.2预期解决方案 你的目标是通过训练一个机器学习模型,使其在给定一张图像时能够准…

网站建设经验交流发言wordpress底部自豪采用

通信公司“内鬼” 批量提供手机卡 超6万张手机卡用来发涉赌短信 2023年10月2日,据报道2022年12月,湖北省公安厅“雷火”打击整治治安突出问题专项行动指挥部研判发现,有人在湖北随州利用虚拟拨号设备GOIP发出大量赌博短信。随州市公安局研判…

网站建设+人员+年终总结响应式网站算几个页面

1143.最长公共子序列 题目要求:给定两个字符串 text1 和 text2,返回这两个字符串的最长公共子序列的长度。 一个字符串的 子序列 是指这样一个新的字符串:它是由原字符串在不改变字符的相对顺序的情况下删除某些字符(也可以不删…

湖南餐饮网站建设上海建站模板厂家

给定一个 n 个点 m 条边的有向图,图中可能存在重边和自环,所有边权均为非负值。 请你求出 1 号点到 n 号点的最短距离,如果无法从 1 号点走到 n 号点,则输出 −1 。 输入格式 第一行包含整数 n 和 m 。 接下来 m 行每行包含三个…

wordpress 免费主题站交互 网站

这是目录 **一、引言****二、Spring Boot Starter基本概念****三、Spring Boot Starter的主要特点****四、Spring Boot Starter的应用场景****五、Spring Boot Starter的实现原理****六、自定义spring boot starter****为什么要创建自定义Starter?****创建自定义Spr…

小蘑菇网站建设软件要做好网络营销首先要

描述 E5071C网络分析仪提供同类产品中最高的RF性能和最快的速度,具有宽频率范围和多功能。E5071C是制造和R&D工程师评估频率范围高达20 GHz的RF元件和电路的理想解决方案。特点: 宽动态范围:测试端口的动态范围> 123 dB(典型值)快速测量速度:41毫秒全2端口…

网站seo自己怎么做网站开发者工具的网络选项

1、题目 2、工具 jd-gui:Java反汇编器。 ​python:编写自动化脚本。 3、方法 下载压缩包,解压得到一个.class文件。 ​题目已经说了是java逆向,所以使用jd-gui打开该文件。 代码如下: import java.io.PrintStream; …

如何建设网站的论文跨境电商亚马逊

预加载图片是提高用户体验的一个很好方法。图片预先加载到浏览器中,访问者便可顺利地在你的网站上冲浪,并享受到极快的加载速度。这对图片画廊及图片占据很大比例的网站来说十分有利,它保证了图片快速、无缝地发布,也可帮助用户在…

投资建设网站一站式服务门户

3.组件插槽 3-1组件插槽 注意 插槽内容可以访问到父组件的数据作用域,因为插槽内容本身就是在父组件模版中定义的 插槽内容无法访问子组件的数据.vue模版中的表达式只能访问其定义时所处的作用域,这和JavaScript的词法作用域是一致的,换言之: 父组件模版的表达式只能访问父组…