出行项目案例

spark和kafka主要通过Scala实现,Hadoop和HBase主要基于java实现。

通过该项目,主要达到以下目的:

(1)通用的数据处理流程,入门大数据领域

(2)真实体验大数据开发工程师的工作

(3)企业级的项目,利用这个思路可以做二次拓展开发

(4)从0到有,数据抽取、数据存储、数据处理、展现

大数据平台架构图:

大数据没有事务的概念,需要不间断完整地把流程跑完,没有事务回滚的概念。

1. 项目需求

打车、叫车,出行的便捷问题等问题在一个出行平台建设中需要解决,与此同时安全出行也是重中之重,为了增加出行的便捷,提高出行的安全,对我们乘车的细节以及发生点我们迫切的需要及时知道,为此特地通过大数据的手段来处理我们海量的出行数据,做到订单的实时监控,乘车轨迹的的回放,虚拟打车站的选定等功能。

重点:乘车轨迹的的细节回放,虚拟打车站

2. 效果示意图

轨迹回放:

订单监控:指标的计算


3. 技术选型 


我们的项目建设主要是依据数据的生命周期来做的技术选型,目前主要依照的是大家都在用的一些技术,具体生产中应用要考虑实际的场景。比如人员、技术、接入难度、社区、版权等等各种问题

3.1 数据的生命周期

[参考其他数据平台的建设](https://www.sohu.com/a/242008443_468661)

数据的生产(web应用)>数据的传输>数据存储>计算>应用
 

3.2 数据传输

 数据采集:

采集框架名称主要功能版本
flume擅长日志数据的采集和解析1.9.0

消息中间件:

概述版本
KafkaLinkedIn用Scala语言实现,支持hadoop数据并行加载2.6.2

3.3 数据存储

框架名称主要用途版本
Hadoop分布式文件存储系统3.2.2
HbaseKey,value对的nosql数据库2.2.7

3.4 计算框架

框架名称基本介绍版本
Spark基于Spark,一站式解决批流处理问题3.1.1

4. 日志格式

本项目会使用到两份数据,原始文件名称为 order.txt以及gps。其中order.txt数据主要用来做我们的虚拟车站功能,gps主要用来做我们的数据回放功能。 日志存放在网盘中,可以下载,在/root目录下解压

gps数据:

字段名称类型示例备注
DRIVERID司机IDStringglox.jrrlltBMvCh8nxqktdr2dtopmlH
ORDERID订单IDStringjkkt8kxniovIFuns9qrrlvst@iqnpkwz
TIME时间戳String1501584540unix时间戳,单位为秒
LNG经度String104.04392GCJ-02坐标系
LAT纬度String30.67518GCJ-02坐标系

订单数据: 

字段ID字段名称字段样本描述
order_id订单IDstring类型且已脱敏
product_id产品线ID1滴滴专车, 2滴滴企业专车, 3滴滴快车, 4滴滴企业快车
city_id城市ID选取海口当地
district城市区号海口区号
county二级区县记录区县id
type订单时效0实时,1预约
combo_type订单类型1包车,4拼车
traffic_type交通类型1企业时租,2企业接机套餐,3企业送机套餐,4拼车,5接机,6送机,302跨城拼车
passenger_count乘车人数拼车场景,乘客选择的乘车人数
driver_product_id司机子产品线司机所属产品线
start_dest_distance乘客发单时出发地与终点的预估路面距离乘客发单时,出发地与终点的预估路面距离
arrive_time司机点击‘到达’的时间司机点击‘到达目的地’的时间
departure_time出发时间如果是实时单,出发时间(departure_time) 与司机点击‘开始计费’的时间(begin_charge_time)含义相同;如果是预约单,是指乘客填写的出发时间
pre_total_fee预估价格根据用户输入的起始点和目的地预估价格
normal_time时长分钟
bubble_trace_id
product_1level一级业务线1专车,3快车,9豪华车
dest_lng终点经度对应乘客填写的目的地对应的经度
dest_lat终点纬度对应乘客填写的目的地对应的纬度
starting_lng起点经度对应乘客填写的起始点对应的经度
starting_lat起点纬度对应乘客填写的起始点对应的纬度
year年份对应出行的年份
month月份对应出行的月份
day日期对应出行的日期

5. 项目架构

  • 数据采集 flume 去采集 order gps、发往kafka

  • spark 消费kafka数据存入redis里面(实时的监控)

  • spark 消费kafka数据存入hbase里面(计算绿点)

6. 环境搭建

所有的软件安装,请先bing XX分布式环境安装,然后在结合文档看本项目是如何配置的,否则如果你本身不清楚如何安装的,看下面的安装步骤会很懵 前提:我们的集群,使用了三台机器,机器的基础配置建议4C8G+50G-MEM的配置,否则项目会很卡,大数据环境存储基于MEM,计算基于内存,存在许多IO,所以对性能的要求是较高,学习的话建议大家可以按量购买云产品使用

安装环境 centos7.3

  • 所有的软件都基于root用户安装(生产环境中用普通用户),软件都安装在/root目录下

  • 三台机器需要提前设置好免密配置 免密配置参考

  • 所有的文件下载后,都需要改名称,例如mv hadoop-3.2.2 hadoop,其他的软件也需要改名称,目的是方便管理、升级

  • 所有涉及到的脚本都可以在doc目录下找到

节点角色
Hadoop01

HDFS: namenode,datanode,secondarynamenode

YARN: resourcemanager, nodemanager

Kafka

Spark: master,worker

Zookeeper: QuorumPeerMain

HBase: Hmaster,regionServer

Hadoop02

HDFS: datanode,

YARN: nodemanager

Spark: worker

kafka

Zookeeper: QuorumPeerMain

Hbase: regionServer

Hadoop03

HDFS: datanode

YARN: nodemanager

Spark: worker

kafka

flume

redis(docker)

Zookeeper: QuorumPeerMain

Hbase: regionServer

HDFS:基础数据的存储

YARN:计算调度,可以调度本地资源

kafka:流式处理

spark:计算软件,具体任务调度依赖于yarn,yarn依赖于HDFS

6.1 Java1.8安装

需要配置好环境变量

6.2 Hadoop

  • 节点1,下载hadoop wget https://mirrors.bfsu.edu.cn/apache/hadoop/common/hadoop-3.2.2/hadoop-3.2.2.tar.gz

  • 修改配置文件/root/hadoop/etc/hadoopworkersmapred-site.xmlhadoop-env.shyarn-site.xmlcore-site.xmlhdfs-site.xml 具体文件内容参考放在了doc下

    • cd etc/hadoop  然后cat 以下文件

      • workers表示分别部署在哪几台机器上:cat workers

      • mapred-site.xml配置了环境的位置

      • hadoop-env.sh配置了环境变量

      • yarn-site.xml:yarn是分布式调度软件,主要用来配置yarn的主节点在哪里

      • core-site.xml:定义了数据保存到本地的哪个位置,以及数据的副本个数(生产上一般是三个副本或以上)

      • hdfs-site.xml

  • 分发到其他节点 节点2 节点3

  • 启动前要格式化NameNode

  • 启动整体集群:/root/hadoop/sbin/start-all.sh

  • jsp查看启动了哪些进程:NameNode、SecondaryNameNode和DataNode代表的是HDFS的进程节点;ResourceManager和NodeManager是yarn的节点。此时yarn和HDFS就启动好了。

6.3 Spark

官网:spark.apache.org

  • 节点1,下载对应包 wget https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz  (注意spark和hadoop版本的对应)

  • 修改 /root/spark/conf/workersspark-env.sh 两个文件

    • cd conf/     cat workers  : 表示worker节点在哪些地方启动,master在本机启动,workers需要自己配置

  • xsync /root/spark /root/spark 分发到其他节点

  • 启动/root/spark/sbin/start-all.sh

通过java -jar fileOperator.jar将日志中的数据以每秒一条的速度定向写入到dest文件夹的gps文件中,flume实时监控目标文件gsp,并把采集到的数据发送到kafka中,kafka中也是每秒一条的速度进行接收。

6.4 Flume

官网:Welcome to Apache Flume — Apache Flume

  • 节点3,下载 wget https://mirrors.bfsu.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

  • 先把kafka启动起来

  • 在节点3启动agent

  • flume启动:nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-mem-kafka-all.conf -Dflume.root.logger=INFO,console &

  • jps中application代表flume进程启动起来了。

6.5 Kafka

  • 节点1,下载 wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.2/kafka_2.12-2.6.2.tgz

  • 修改 server.properties 文件,注意brokerid,zookeeper地址修改

  • xsync /root/kafka /root/kafka

  • 三台服务器启动kafka:(前提:三台服务器启动zookeeper,参考:kafka入门-CSDN博客

  • # 三台服务器分别进到zookeeper路径下,启动zookeeper服务:[root@192 local]# cd apache-zookeeper-3.8.4-bin/
    [root@192 apache-zookeeper-3.8.4-bin]# bin/zkServer.sh --config conf start
    
  • bin/kafka-server-start.sh -daemon config/server.properties
    # daemon为后台启动 不占用当前页面
    # 如果启动不生效,可以使用
    [root@192 kafka_2.13-3.8.0]# nohup bin/kafka-server-start.sh config/server.properties &
    

    sh kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic chengdu

6.6 HBase

  • 节点1,下载wget https://archive.apache.org/dist/hbase/2.2.7/hbase-2.2.7-bin.tar.gz

  • 修改 conf/hbase-site.xml \ regionServers文件

  • 分发到其他三个节点

  • bin/start-hbase.sh

6.7 zookeeper

  • 节点1,下载wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz

  • 修改 zoo.cfg

  • /tmp/zookeeper/添加myid文件,文件内容分别为0,1,2

  • xsync /root/zookeeper /root/hadoop/zookeeper

HBase

6.8 Redis

Redis使用了docker安装,生产中使用的一般是集群,我们这里就不安装redis集群环境了

docker run --name myredis -p 6379:6379 -v /home/disk1/redis:/data -d redis redis-server --appendonly yes

7. 一致性语义

流处理的一致性语义

kafka 是一个stream消息系统,包含三种角色:

Producer:写数据  -->  broker:存储数据  -->  consumer:从kafka消费数据

1. 一个是Producer到broker端数据如果没有确认,则重复发送,确保数据不丢失
2. consumer消费数据如果失败,则从失败的位置开始消费,并且需要做一个去重

保证数据不丢失,并且数据是不重复的

at-most-once:数据最多发送一次。适用于不太重要的日志数据。

如果在 ack 超时或返回错误时 producer 不重试,则该消息可能最终不会写入 Kafka,因此不会传递给 consumer。在大多数情况下,这样做是为了避免重复的可能性,业务上必须接收数据传递可能的丢失。ack=0.

at-least-once:至少发送一次

如果 producer 收到来自 Kafka broker 的确认(ack)或者 acks = all,则表示该消息已经写入到 Kafka。但如果 producer ack 超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入 Kafka。如果 broker 在发送 Ack 之前失败,但在消息成功写入 Kafka 之后,此重试将导致该消息被写入两次,因此消息会被不止一次地传递给最终 consumer,这种策略可能导致重复的工作和不正确的结果。

exactly-once:精准一次

producer没有收到broker的回复时,需要重复发送数据,但是即使 producer 重试发送消息,消息也会保证最多一次地传递给最终consumer(通过consumer端的重复值校验:Redis重复id去重的性质)。该语义是最理想的,但也难以实现,因为它需要消息系统本身与生产和消费消息的应用程序进行协作。ack =-1

并且设置offset自身存储,防止consumer消费失败。

Kafka

主要失败原因

- Broker失败:Kafka 作为一个高可用、持久化系统,保证每条消息被持久化并且冗余多份(假设是 n 份),所以 Kafka 可以容忍 n-1 个 broker 故障,意味着一个分区只要至少有一个 broker 可用,分区就可用。Kafka 的副本协议保证了只要消息被成功写入了主副本,它就会被复制到其他所有的可用副本(ISR)。
- Producer 到 Broker 的 RPC 失败:Kafka 的持久性依赖于生产者接收broker 的 ack 。没有接收成功 ack 不代表生产请求本身失败了。broker 可能在写入消息后,发送 ack 给生产者的时候挂了,甚至 broker 也可能在写入消息前就挂了。由于生产者没有办法知道错误是什么造成的,所以它就只能认为消息没写入成功,并且会重试发送。在一些情况下,这会造成同样的消息在 Kafka 分区日志中重复,进而造成消费端多次收到这条消息。
- 客户端也可能会失败:Exactly-once delivery 也必须考虑客户端失败的情况。但是如何去区分客户端是真的挂了(永久性宕机)还是说只是暂时丢失心跳?追求正确性的话,broker 应该丢弃由 zombie producer 发送的消息。 consumer 也是如此,一旦新的客户端实例已经启动,它必须能够从失败实例的任何状态中恢复,并从安全点( safe checkpoint )开始处理,这意味着消费的偏移量必须始终与生成的输出保持同步。

1. 增加业务处理

- 为每个消息增加唯一主键,生产者不做处理,由消费者根据主键去重
- producer(flume)设置ack=-1确保数据不丢失(flume的配置文件file-mem-kafka-all.conf中设置ack=-1,表示必须给回执,否则一直发送重复的消息)
- 消费者,往往是我们的流式任务,我们需要关闭自动提交 offset 的功能,业务保存offset,将保存offset与消费操作放入到一个事务当中去执行

Sparkstreming/Flink

- 支持一致性语义,只要是在任务调度过程中失败了,那么会去寻找checkpoint 拿到最新的副本数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/70600.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PyEcharts 数据可视化:从入门到实战

一、PyEcharts 简介 PyEcharts 是基于百度开源可视化库 ECharts 的 Python 数据可视化工具,支持生成交互式的 HTML 格式图表。相较于 Matplotlib 等静态图表库,PyEcharts 具有以下优势: 丰富的图表类型(30)动态交互功…

五、Three.js顶点UV坐标、纹理贴图

一部分来自1. 创建纹理贴图 | Three.js中文网 ,一部分是自己的总结。 一、创建纹理贴图 注意:把一张图片贴在模型上就是纹理贴图 1、纹理加载器TextureLoader 注意:将图片加载到加载器中 通过纹理贴图加载器TextureLoader的load()方法加…

使用postman测试api接口基本步骤

测试一个已写好的 API 接口需要系统性地验证其功能、性能、安全性及异常处理能力。以下是使用 Postman 进行 API 接口测试的详细步骤和注意事项: 1. 确认接口文档 明确输入输出:了解接口的请求方法(GET/POST/PUT/DELETE)、URL、请…

綫性與非綫性泛函分析與應用_1.例題(下)-半母本

第1章 實分析與函數論:快速回顧(下) 五、基數;有限集和無限集相關例題 例題1:集合基數的判斷 判斷集合和集合B=\{a,b,c,d,e\}的基數關係。 解析: 可以構造一個雙射,例如,,,,。 所以,兩個集合具有相同的基數。 例題2:可數集的證明 證明整數集是可數集。 解析: …

Windows系统第一次运行C语言程序,环境配置,软件安装等遇到的坑及解决方法

明确需要编辑器和编译器,并选择自己要用什么(我选的编辑器是VSCode:Visual Studio Code;编译器是gcc)下载VSCode并配置环境变量(这里没啥问题),安装C/C的拓展安装Cygwin,…

浅拷贝和深拷贝的区别?可以举例说明

在编程中,浅拷贝和深拷贝是用于复制对象的两种不同方式,它们的主要区别在于复制对象时对对象内部成员的处理方式。今天我们对此进行讨论。 目录 1 浅拷贝 2 深拷贝 1 浅拷贝 浅拷贝创建一个新对象,新对象的属性值会复制原始对象的属性值…

微信小程序实现拉卡拉支付

功能需求:拉卡拉支付(通过跳转拉卡拉平台进行支付),他人支付(通过链接进行平台跳转支付) 1.支付操作 //支付 const onCanStartPay async (obj) > {uni.showLoading({mask: true})// 支付接口获取需要传…

使用ESP-IDF来驱动INMP441全向麦克风

之前的文章我们讲过了I2S。 I2S是什么通信协议?它如何传输音频数据?它和I2C是什么关系?_i2c接口和i2s-CSDN博客文章浏览阅读836次,点赞12次,收藏14次。这个可以参考ADC来理解,我们的ADC也是有左对齐和右对…

MobaXterm_Portable_v23.2 免费下载与使用教程(附安卓替代方案)

一、MobaXterm_Portable 简介 MobaXterm 是一款功能强大的全能终端工具,支持 SSH、SFTP、RDP、VNC、X11 转发 等多种协议,集成了终端、文件传输、远程桌面等功能。其便携版(Portable Edition)无需安装,解压即可使用&a…

【带你 langchain 双排系列教程】6.LangChain多模态输入与自定义输出实战指南

一、为什么需要多模态交互? 在真实业务场景中,数据从来都不是单一形式的。想象一个智能客服系统需要同时分析用户的文字描述、上传的产品图片和语音留言,或者一个内容审核系统需要检查文本、图像和视频的组合内容。传统单一模态的处理方式已…

【Bluedroid】AVRCP 连接源码分析(三)

接着上一篇【Bluedroid】AVRCP 连接源码分析(一)-CSDN博客,继续AVRCP连接的源码分析。 AVRC_OpenBrowse /packages/modules/Bluetooth/system/stack/avrc/avrc_api.cc /*****************************************************************…

基于大语言模型的推荐系统(1)

推荐系统(recommendation system)非常重要。事实上,搜索引擎,电子商务,视频,音乐平台,社交网络等等,几乎所有互联网应用的核心就是向用户推荐内容,商品,电影&…

高性能GPU计算:释放计算潜力的加速利器

高性能GPU计算:释放计算潜力的加速利器 大家好,我是Echo_Wish,今天我们来聊一聊 高性能GPU计算。近年来,随着人工智能、深度学习、科学计算等领域的快速发展,GPU(图形处理单元)作为计算加速的核心技术,逐渐成为数据处理的“核心大脑”。尤其是在深度学习模型训练和大规…

QT闲记-状态栏,模态对话框,非模态对话框

1、创建状态栏 跟菜单栏一样,如果是继承于QMainWindow类,那么可以获取窗口的状态栏,否则就要创建一个状态栏。通过statusBar()获取窗口的状态栏。 2、添加组件 通常添加Label 来显示相关信息,当然也可以添加其他的组件。通过addWidget()添加组件 3、设置状态栏样式 …

SHELL32!SHLoadPopupMenu函数分析之添加属性菜单项

SHELL32!SHLoadPopupMenu函数分析之添加属性菜单项 第一部分: // // user does not support pop-up only menu. // STDAPI_(HMENU) SHLoadPopupMenu(HINSTANCE hinst, UINT id) { HMENU hmenuParent LoadMenu(hinst, MAKEINTRESOURCE(id)); if (hmenuPare…

将RocketMQ集成到了Spring Boot项目中,实现站内信功能

1. 添加依赖 首先,在pom.xml中添加RocketMQ的依赖: <dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot

C语言基础要素:安装 Visual Studio 2022

安装 Visual Studio 2022 Visual Studio 是由微软开发的一款集成开发环境&#xff08;IDE&#xff09;&#xff0c;支持多种编程语言和开发平台。它提供了丰富的工具和功能&#xff0c;帮助开发者高效地编写、调试和部署应用程序。无论是桌面应用、Web 应用还是移动应用&#…

[ TypeScript ] “undefined extends xxx“ 总是为 true 的 bug

版本号 "typescript": "^5.7.3", "unplugin": "^2.2.0",说明 在使用 unplugin 时 , 我定义插件的参数是 必填的, 使用时却是一个可空参数, 不传参也不会报错, (options?: UserOptions) > Return &#x1f632;&#x1f632;&…

[通俗易懂C++]:指针和const

之前的文章有说过,使用指针我们可以改变指针指向的内容(通过给指针赋一个新的地址)或者改变被保存地址的值(通过给解引用指针赋一个新值): int main() {int x { 5 }; // 创建一个整数变量 x&#xff0c;初始值为 5int* ptr { &x }; // 创建一个指针 ptr&#xff0c;指向 …

华为昇腾服务器(固件版本查询、驱动版本查询、CANN版本查询)

文章目录 1. **查看固件和驱动版本**2. **查看CANN版本**3. **其他辅助方法**注意事项 在华为昇腾服务器上查看固件、驱动和CANN版本的常用方法如下&#xff1a; 1. 查看固件和驱动版本 通过命令行工具 npu-smi 执行以下命令查看当前设备的固件&#xff08;Firmware&#xff0…