Flink03-学习-套接字分词流自动写入工具 - 实践

news/2025/10/8 17:58:56/文章来源:https://www.cnblogs.com/wzzkaifa/p/19129920

上一节中通过如下命令启动服务摸来模拟Socket流。请添加图片描述
现在我们写一个ServerSocket来模拟让流自动写入不用手动操作。

pom.xml和上一节一致不需要修改

编写代码

同样适用Socket流

// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符
DataStreamSource<
String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n"
)
;

FlinkServer
继承Thread启动线程

package org.example.snow.demo3
;
import org.apache.flink.api.common.functions.FlatMapFunction
;
import org.apache.flink.api.java.functions.KeySelector
;
import org.apache.flink.api.java.tuple.Tuple2
;
import org.apache.flink.streaming.api.datastream.DataStreamSource
;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
;
import org.apache.flink.streaming.api.windowing.time.Time
;
import org.apache.flink.util.Collector
;
/**
* @author snowsong
*/
public
class FlinkServer
extends Thread{
@Override
public
void run(
) {
String ip = "0.0.0.0"
;
int port = 8886
;
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(
)
;
// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符
DataStreamSource<
String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n"
)
;
SingleOutputStreamOperator<
Tuple2<
String
, Long>
> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(
new FlatMapFunction<
String
, Tuple2<
String
, Long>
>(
) {
@Override
public
void flatMap(String s, Collector<
Tuple2<
String
, Long>
> collector)
throws Exception {
String[] splits = s.split("\\s"
)
;
for (String word : splits) {
collector.collect(Tuple2.of(word, 1L
)
)
;
}
}
}
)
;
SingleOutputStreamOperator<
Tuple2<
String
, Long>
> word = tuple2SingleOutputStreamOperator
.keyBy(
new KeySelector<
Tuple2<
String
, Long>
, Object>(
) {
@Override
public Object getKey(Tuple2<
String
, Long> stringLongTuple2)
throws Exception {
return stringLongTuple2.f0;
}
}
)
.window(SlidingProcessingTimeWindows.of(Time.seconds(5
)
, Time.seconds(1
)
)
)
.sum(1
)
;
word.print(
)
;
try {
executionEnvironment.execute("stream!"
)
;
}
catch (Exception e) {
throw
new RuntimeException(e)
;
}
}
}

NumRandom
使用ServerSocket实现一个持续的流输出

package org.example.snow.demo3
;
import java.io.OutputStream
;
import java.io.PrintWriter
;
import java.net.InetSocketAddress
;
import java.net.ServerSocket
;
import java.net.Socket
;
import java.util.Random
;
/**
* @author snowsong
*/
public
class RandomNumClient
extends Thread {
@Override
public
void run(
) {
// 随机生成数字
String ip = "0.0.0.0"
;
int port = 8886
;
try {
ServerSocket serverSocket =
new ServerSocket(
)
;
InetSocketAddress address =
new InetSocketAddress(ip, port)
;
// 灵活绑定服务器地址
serverSocket.bind(address)
;
// 监听并接收客户端的连接请求,有阻塞特性,当调用该方法的时候,线程会暂停执行,直到有客户端连接上来
Socket accept = serverSocket.accept(
)
;
// 获取输入流,读取客户端发送的数据
OutputStream outputStream = accept.getOutputStream(
)
;
// 包装成打印流,方便写入数据 true 自动刷新缓冲区
PrintWriter printWriter =
new PrintWriter(outputStream, true
)
;
Random random =
new Random(
)
;
// 遍历
for (
int i = 0
; i <
10
; i++
) {
// 生成随机数
int num = random.nextInt(10
) + 1
;
printWriter.println("随机数:" + num)
;
System.out.println("send to flink:" + num)
;
Thread.sleep(100
)
;
}
}
catch (Exception e) {
throw
new RuntimeException(e)
;
}
super.run(
)
;
}
}

启动类

package org.example.snow.demo3
;
/**
* @author snowsong
*/
public
class StartApp {
public
static
void main(String[] args)
throws Exception {
RandomNumClient randomNumClient =
new RandomNumClient(
)
;
FlinkServer flinkServer =
new FlinkServer(
)
;
flinkServer.start(
)
;
randomNumClient.start(
)
;
}
}

运行结果

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/931784.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[笔记]树论笔记+做题记录

树的性质树上任意两点间恰有一条简单路径。树上所有节点度数和为 \(O(n)\) 的。树上 \(m\) 个点两两产生的 LCA 去重后不超过 \(m-1\) 个。Proof:考虑找 LCA 的过程,两个点向上跳,重合时合并成一个点。最后剩下 \(1…

云服务器部署大数据组件

大数据集群规划hw101 hw102 hw103HDFS NameNodeDataNode DataNode SecondaryNameNodeDataNodeYARN NodeManager ResourceManagerNodeManager NodeManagerZookeeper QuorumPeerMain QuorumPeerMain QuorumPeerMainHive …

做a视频网站有哪些要制作自己的网站需要什么

目录 LNMP部署--nginx 搭建mysql数据库 安装mysql的过程&#xff1a; 部署PHP&#xff1a; ​编辑​编辑php的配置文件在哪 wordpress程序安装 LNMP部署--nginx 纯净--联网状态 环境变量中没有nginx 安装形式的选择&#xff1a; yum安装&#xff1a;自动下载安装包及…

wordpress金融网站模板正能量网站免费入口有限公司

华为云与伙伴共同打造联合解决方案 已成为更多企业的数字化转型利器 1月恒驰上云规划实施解决方案 完成上市宣讲并正式上架华为云官网 恒驰上云规划实施解决方案能力全景图&#xff1a;融合厂商云服务能力&#xff0c;一站式高效云迁移 从深入了解企业的本地IT环境、业务特点…

网页制作与网站建设技术大全 pdf公司形象vi设计

目录 一.基本查询回顾 二. 多表查询 三.自连接 四.子查询 1.单行子查询 2.多行子查询 3.多列子查询 4.在from子句中使用子查询 5.合并查询 一.基本查询回顾 准备数据库&#xff1a; 查询工资高于500或岗位为MANAGER的雇员&#xff0c;同时还要满足他们的姓名首字母为…

网课一

网课一下载安装了anaconda

购物网站开发分工都匀经济开发区建设局网站

今天小编为大家送上几条新闻了&#xff1a;1 微软给Edge浏览器添加离线小游戏谷歌浏览器在不联网的情况下&#xff0c;会有一个小恐龙跳一跳的游戏&#xff0c;而最近换上chromium内核的edge&#xff0c;也添加了自己的离线小游戏。这是一款冲浪游戏&#xff0c;非常类似于微软…

规模化网站SSL证书终极方案

对于拥有大量域名、子域名或需要频繁签发证书的企业和个人,单个购买商业证书成本极高,管理更是噩梦。最具性价比的方案核心在于:自动化 + 免费证书 + 集中管理。 一、 核心原则:为何这是最佳性价比方案? 成本为零…

极智项目 | 基于PyQT+Whisper实现的语音识别软件设计 - 指南

极智项目 | 基于PyQT+Whisper实现的语音识别软件设计 - 指南2025-10-08 17:39 tlnshuju 阅读(0) 评论(0) 收藏 举报pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !importa…

详细介绍:saveOrUpdate 有个缺点,不会把值赋值为null,解决办法

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

市场策划网站如何利用个人nas做网站

Unity 接入 Facebook SDK 的过程中遇到这个问题&#xff0c;查了很多帖子&#xff0c;不太直观&#xff0c;记录下来方便需要的同学参考 报上面错误的原因是在https://developers.facebook.com/apps/ 设置里没有填入有效的密钥 怎么填入这个密钥呢&#xff0c;其实很简单&…

国外网站顶部菜单设计谈谈网站建设会有哪些问题

美多商城完整教程&#xff08;附代码资料&#xff09;主要内容讲述&#xff1a;欢迎来到美多商城&#xff01;&#xff0c;项目准备。展示用户注册页面&#xff0c;创建用户模块子应用。用户注册业务实现&#xff0c;用户注册前端逻辑。图形验证码&#xff0c;图形验证码接口设…

详细介绍:录制mp4

详细介绍:录制mp4pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Co…

网站联盟如何实现深圳定制网站公司

LeetCode 118 生成杨辉三角&#xff08;Pascal’s Triangle&#xff09; 小白渣翻译 给定一个非负整数 numRows&#xff0c;生成杨辉三角的前 numRows 行。 在杨辉三角中&#xff0c;每个数是它左上方和右上方的数的和。 例子 这里是小白理解 那么这种题目一上来看&#xf…

【OpenGL ES】光栅化插值原理和射线拾取原理

1 前言 ​ 最近在推导光栅化插值公式和射线拾取公式,发现计算过程中有很多共同点,因此将它们放在一篇文章里介绍。具体共同点如下。都引入了四面体模型 都以四面体的三条边作为基向量构建坐标系(非直角坐标系) …

网站移动端推广佛山网站建设熊掌号

只针对不正常的情况才使用异常 异常只应该被用于不正常的条件&#xff0c;它们永远不应该被用于正常的控制流。《阿里手册》中&#xff1a;【强制】Java 类库中定义的可以通过预检查方式规避的RuntimeException异常不应该通过catch 的方式来处理&#xff0c;比如&#xff1a;Nu…

网站建设职业描述郑州网站免费制作

概述&#xff1a; el-switch 表示两种相互对立的状态间的切换&#xff0c;多用于触发「开/关」。 常见用法&#xff1a; 1、绑定v-model到一个Boolean类型的变量。可以使用active-color属性与inactive-color属性来设置开关的背景色。 2、使用active-text属性与inactive-tex…

塑料机械怎么做网站网站建设协议 模板

尽管 Hexo 支持 MarkDown&#xff0c;但是我们却不能像写单独的 MarkDown 文档时那样肆无忌惮。由于我们所写的文档是需要被解析为静态网页文件的&#xff0c;所以我们必须严格遵从 Hexo 的规范&#xff0c;这样才能解析出条理清晰的静态网页文件。新建文档 假设我们的文章名为…

HTML 速查列表 - 教程

HTML 速查列表 - 教程pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "…

Exp1

Experiment 1 实验任务1 1.1 #include <stdio.h> int main() {printf(" O \n");printf("<H>\n");printf("I I\n");printf(" O \n");printf("<H>\n&qu…