从零开始学Flink:实时流处理实战 - 教程

news/2025/11/8 16:49:55/文章来源:https://www.cnblogs.com/gccbuaa/p/19202650

在大数据处理领域,实时流处理正变得越来越重要。Apache Flink作为领先的流处理框架,提供了强大而灵活的API来处理无界数据流。本文将通过经典的SocketWordCount示例,深入探讨Flink实时流处理的核心概念和实现方法,帮助你快速掌握Flink流处理的实战技能。

一、实时流处理概述

1. 流处理的基本概念

流处理是一种持续处理无界数据的计算范式。与批处理不同,流处理系统需要在数据到达时立即处理,而不是等待完整数据集收集完毕。在Flink中,所有数据都被视为流,无论是有界的历史数据还是无界的实时数据流。

2. Flink流处理的优势

  • 低延迟: 毫秒级的数据处理延迟
  • 高吞吐: 能够处理大规模的数据流量
  • 精确一次处理: 通过检查点机制确保数据只被处理一次
  • 灵活的时间语义: 支持处理时间、事件时间和摄取时间
  • 丰富的状态管理: 内置多种状态后端,支持大规模状态存储

二、环境准备与依赖配置

1. 版本说明

  • Flink:1.20.1
  • JDK:17+
  • Gradle:8.3+

2. 核心依赖

dependencies {// Flink核心依赖implementation 'org.apache.flink:flink_core:1.20.1'implementation 'org.apache.flink:flink-streaming-java:1.20.1'implementation 'org.apache.flink:flink-clients:1.20.1'
}

三、SocketWordCount示例详解

1. 功能介绍

SocketWordCount是Flink中的经典示例,它通过Socket接收实时数据流,对数据流中的单词进行计数,并将结果实时输出。这个示例虽然简单,但包含了Flink流处理的核心要素:数据源连接、数据转换、并行处理和结果输出。

2. 完整代码实现

package com.cn.daimajiangxin.flink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class SocketWordCount {
public static void main(String[] args) throws Exception {
// 1. 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用检查点,确保容错性
env.enableCheckpointing(5000); // 每5秒创建一次检查点
// 设置并行度
env.setParallelism(2);
// 2. 从Socket读取数据
String hostname = "localhost";
int port = 9999;
// 支持命令行参数传入
if (args.length > 0) {
hostname = args[0];
}
if (args.length > 1) {
port = Integer.parseInt(args[1]);
}
DataStream<String> text = env.socketTextStream(hostname,port,"\n", // 行分隔符0);   // 最大重试次数// 3. 数据转换DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new Tokenizer()).keyBy(value -> value.f0)//添加基于处理时间的滚动窗口计算.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))// 使用sum聚合算子.sum(1);// 4. 输出结果wordCounts.print("Word Count");// 5. 启动作业env.execute("Socket Word Count");}// 可选:使用传统的FlatMapFunction实现方式public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {private static final long serialVersionUID = 1L;@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {String[] words = value.toLowerCase().split("\\W+");for (String word : words) {if (word.length() > 0) {out.collect(Tuple2.of(word, 1));}}}}}

3. 代码解析

3.1 执行环境创建
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

这段代码创建了Flink的执行环境,并设置了并行度为2。执行环境是所有Flink程序的入口点,它负责管理作业的执行。

3.2 数据源连接
DataStream<String> text = env.socketTextStream(hostname, port);

这里使用socketTextStream方法从Socket连接中读取文本数据。这是Flink提供的一种内置数据源连接器,适用于测试和演示。

3.3 数据转换
DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new Tokenizer()).keyBy(value -> value.f0)  // 按单词分组.sum(1);  // 累加计数

数据转换包含三个关键步骤:

  • 分词: 使用flatMap操作将每行文本分割成单词,并为每个单词生成(word, 1)的元组
  • 分组: 使用keyBy操作按单词进行分组
  • 聚合: 使用sum操作对每个单词的计数进行累加
3.4 结果输出
wordCounts.print("Word Count");

使用print方法将结果输出到控制台,这是一种内置的输出方式,非常适合调试和演示。

3.5 作业启动
env.execute("Socket Word Count");

最后,调用execute方法启动作业。注意,Flink程序是惰性执行的,只有调用execute方法才会真正触发计算。

四、Flink并行流处理机制

1. 并行度概念

并行度是指Flink程序中每个算子可以同时执行的任务数量。在SocketWordCount示例中,我们设置了全局并行度为2,这意味着每个算子都会有2个并行实例。

2. 数据流分区策略

Flink支持多种数据流分区策略,包括:

  • Forward Partitioning: 保持数据分区,一个输入分区对应一个输出分区
  • Shuffle Partitioning: 随机将数据分发到下游算子的分区
  • Rebalance Partitioning: 轮询将数据分发到下游算子的分区
  • Rescale Partitioning: 类似于rebalance,但只在本地节点内轮询
  • Broadcast Partitioning: 将数据广播到所有下游分区
  • Key Group Partitioning: 基于键的哈希值确定分区

在SocketWordCount中,keyBy操作使用了Key Group Partitioning策略,确保相同单词的数据被发送到同一个分区进行处理。

3. 并行执行图解

sadmermaid-diagram

这个图清晰地展示了Flink并行执行的流程,包括:

  1. Socket数据源连接
  2. FlatMap操作(并行度为2)及其两个子任务
  3. KeyBy/Sum操作(并行度为2)及其两个子任务
  4. Print输出操作(并行度为2)

五、运行SocketWordCount

1. 准备Socket服务器

在运行SocketWordCount程序之前,我们需要先启动一个Socket服务器作为数据源。以下是几种常用的Socket服务器搭建方法:

1.1 使用netcat工具

Linux/Mac系统

nc -lk 9999

参数说明:

  • -l: 表示监听模式,等待连接
  • -k: 表示保持连接,允许接受多个连接(对持续测试很有用)
  • 9999: 端口号

Windows系统

Windows有几种获取netcat的方式:

  1. 如果安装了Git,可以使用Git Bash:

    nc -l -p 9999
  2. 如果安装了Windows Subsystem for Linux (WSL):

    nc -lk 9999

参数说明:

  • -l: 表示监听模式,等待连接
  • -k: 表示保持连接,允许接受多个连接(对持续测试很有用)
  • 9999: 端口号
1.2 使用Java实现Socket服务端

如果你想使用Java代码来创建一个更可控的Socket服务器,可以参考以下示例:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class SimpleSocketServer {
public static void main(String[] args) {
int port = 9999;
try (ServerSocket serverSocket = new ServerSocket(port)) {
System.out.println("Socket服务器已启动,监听端口: " + port);
while (true) {
try (Socket clientSocket = serverSocket.accept();
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
System.out.println("客户端已连接,输入要发送的数据(输入'exit'退出):");
String inputLine;
while ((inputLine = in.readLine()) != null) {
if (inputLine.equalsIgnoreCase("exit")) {
break;
}
out.println(inputLine);
}
} catch (IOException e) {
System.err.println("客户端连接异常: " + e.getMessage());
}
}
} catch (IOException e) {
System.err.println("无法启动服务器: " + e.getMessage());
}
}
}

这个Java实现的Socket服务器具有以下特点:

  • 启动后持续监听9999端口
  • 接受客户端连接并允许发送数据
  • 支持通过输入’exit’退出当前客户端连接
  • 异常处理更加完善
1.3 测试Socket连接

在启动Socket服务器后,你可以使用以下方法测试连接是否正常:

  1. 使用telnet客户端测试:

    telnet localhost 9999
  2. 使用netcat作为客户端测试:

    nc localhost 9999
1.4 常见问题与解决方法
  1. 端口被占用

  2. 防火墙阻止

  3. 权限问题(Linux/Mac):

  4. Windows特殊情况

六、高级特性扩展

1. 添加窗口计算

添加基于处理时间的滚动窗口计算:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new Tokenizer()).keyBy(value -> value.f0).window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5))).sum(1);

sad20251007145023

七、常见问题与解决方案

1. 连接被拒绝错误

问题:程序抛出Connection refused错误。

解决方案:确保Socket服务器已启动,并且监听在正确的端口上。

2. 结果不符合预期

问题:输出的单词计数结果不符合预期。

解决方案:检查分词逻辑是否正确,确保单词的大小写处理和分隔符使用得当。

3. 性能问题

问题:程序处理速度较慢。

解决方案:调整并行度,增加资源配置,或优化数据转换逻辑。

八、最佳实践

1. 生产环境配置

2. 代码优化建议

  • 避免使用全局变量:确保函数是无状态的或正确管理状态
  • 合理设置并行度:避免过度并行化导致的资源浪费

九、总结与展望

SocketWordCount虽然是一个简单的示例,但它涵盖了Flink流处理的核心概念和基本流程。通过这个示例,我们学习了如何创建Flink执行环境、连接数据源、进行数据转换、设置并行处理以及输出结果。

在实际应用中,Flink可以处理更复杂的流处理场景,如实时数据分析、欺诈检测、推荐系统等。后续我们还将深入学习Flink的窗口计算、状态管理、Flink SQL等高级特性,帮助你构建更强大的实时数据处理应用。

通过本文的学习,相信你已经对Flink实时流处理有了更深入的理解。实践是掌握技术的最好方法,不妨尝试修改SocketWordCount示例,添加更多功能,如窗口计算、状态管理等,进一步提升你的Flink技能!


源文来自:http://blog.daimajiangxin.com.cn

源码地址:https://gitee.com/daimajiangxin/flink-learning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/959833.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【STM32方案开源】基于STM32的智能语音台灯框架

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

2025年实验室全钢通风橱订制厂家权威推荐榜单:实验室全钢排风柜/全钢结构步入式通风柜/全钢台式通风柜源头厂家精选

在现代实验室建设中,全钢通风橱作为保障实验人员安全的核心装备,其性能直接关系到实验室的运营安全与效率。据实验室设备行业数据显示,2025年中国实验室通风设备市场规模预计达到87亿元,年复合增长率稳定在10%-12%…

flask: 对Flask-SQLAlchemy查询得到的数据遍历处理

一,代码: @user.route("/list/") def user_list():meta = {"title": "用户列表","code": 200,"msg": ""}# 得到数据库中的数据users = User.query.all…

go 工作区(workspace)模式

使用go写一个处理数学运算的小通用包 go.mod文件如下 module gitee.com/demo_go/utils_mathgo 1.25.3utilsmath.go 文件内容如下 package utilsmathimport "fmt"func AddInt(a, b int) int {fmt.Printf(&quo…

# [NOIP 2016 提高组] 天天爱跑步 题解

简要题意 给定一个拥有 \(n\) 个节点的树和 \(m\) 条运动路径,求对于每个点 \(u\) , 在 \(w_i\) 时刻经过此点的玩家数量。 思路 暴力 首先暴力模拟每个玩家的运动路径来计算对每个节点 \(u\) 是否有贡献是不可取的,…

2025年搓管机全套管实力厂家权威推荐榜单:旋挖全套管/全回转钻机全套管/全回转全套管源头厂家精选

在基建工程持续发展的推动下,搓管机全套管作为桩基施工的关键设备,其性能直接关系到施工效率与工程质量。据基建行业数据显示,2025年中国桩工机械市场规模预计达到387亿元,年复合增长率保持在12%-15% 的区间。 搓管…

842318 - Frequently asked questions about validations and substitutions

Symptom This note deals with frequently asked questions about validation and substitution maintenance. Solution I. Creating, activating and transporting validations and substitutions 1. Which transacti…

jmter题目

一. 基础HTTP GET接口请求测试 (一). 打开JMeter,新建测试计划,右键添加“线程组”(线程数1、循环次数1)。(二). 线程组下添加“HTTP请求”,服务器名称/IP填写httpbin.org,端口80,请求方法选GET,路径填写…

提高组数学:扩展欧几里得

同余\({\Huge\equiv}\)是同余符号 \[a \equiv b \pmod{n} \]读作:\(a\)与\(b\)模\(n\)同余 定义:\(a\)除以\(n\)的余数等于\(b\)除以\(n\)的余数。 例 \[10 \equiv 6 \pmod{2} \]\(\because\) \[10 \% 2 = 0 \\ 6 \%…

2025广州人力资源服务推荐榜:精典人才领衔,派遣/外包靠谱公司精选3家

在企业用工需求不断升级的当下,广州人力资源服务市场愈发成熟,人力资源派遣、外包、劳务外包等服务成为企业降本增效的关键选择。本次精选 3 家口碑过硬的服务商,其中广州精典人才创新有限公司以全维度优势登顶,为…

51汇编--外部中断

51汇编--外部中断光二极管L0~L6)。外部中断1使P1.7翻转(P1.7连接到发光二极管。按键次数,并将计数结果显示在发光二极管上(P1.0~P1.6连接7个发。将外部中断0和外部中断1分别设置为高优先级和低优先级。将单脉冲信…

第182天委派与非委派约束

非约束委派 第一种做法域控管理员得有登陆过主机: 类似PTT横向的第三种手法 第二种手法的利用: 需要DC的版本在windows server 2012以上 在漏洞利用的时候需要注意修改host的内容约束委派

51汇编-跑马灯

51汇编-跑马灯P1口接八只发光二极管,编写程序使发光二极管逐个循环点亮,形成跑马灯效果,要求编写延时子程序。本文完全免费,非VIP文章,如果您发现变为VIP文章,请邮箱联系我:openwebsite@foxmail.comP1口接八只发…

51汇编--AD和DA

51汇编--AD和DA模/数转换的正确性。写入到DAC0832的数据来自内部RAM 30H单元(可在程序暂。2.升关状态为1时选择DAC0832的输出为ADC0809的模拟输入,验证数模和。1.开关状态为0时选择电位器输出为ADC0809的模拟输入,将…

flask:用Flask-SQLAlchemy访问mysql

一,安装第三方库 $ pip3 install Flask-SQLAlchemy 二,建立到数据库的连接 import os from dotenv import load_dotenvfrom flask import Flask,jsonifyfrom flask_sqlalchemy import SQLAlchemy# 加载变量 dotenv_p…

51汇编--定时器与计数器

51汇编--定时器与计数器2.用定时器1的方式2计数,T1脚接单脉冲发生器输出,用(P1.0~P1.6连。1.用定时器0的方式1定时,P1.7接发光二极管L7,使该发光二极管每秒钟。闪烁5次,要求采用中断方式。尝试改变闪烁频率,观…

2025年废棉开花机制造企业权威推荐榜单:化纤块开花机/废布专用开花机/纤维专用开花机源头厂家精选

在资源循环利用政策的推动下,废棉开花机已成为纺织废料再生行业的核心装备,其性能直接关系到纤维回收品质与生产效率。 废棉开花机作为纺织废料回收的关键设备,通过开松、分梳、清洁等工序,将废棉、废布和化纤块等…

2025年杭州工厂外贸代运营公司权威推荐榜单:海外社媒推广/海外社媒营销/外贸推广源头公司精选

在跨境电商高速发展与政策双重驱动下,杭州外贸代运营行业已形成专业化、精细化的服务生态。据行业报告显示,2024年中国电商代运营市场规模已突破2800亿元,年增长率达19.3%,其中长三角地区服务商占据全国40%以上市场…

51汇编--数码管显示

51汇编--数码管显示把LED_CS接到CS0,若接到其他位置,需要改段地址和位地址的D12~D14位。将内部RAM30H~32H单元中存储的6位十进制数显示在6个数码管上。要求编写将一个内存单元中的压缩BCD码转换为两个七段显示码的子…

深入解析:Isaac Lab 2.3深度解析:全身控制与增强遥操作如何重塑机器人学习

深入解析:Isaac Lab 2.3深度解析:全身控制与增强遥操作如何重塑机器人学习pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-fami…