上一节中通过如下命令启动服务摸来模拟Socket流。
现在我们写一个ServerSocket来模拟让流自动写入不用手动操作。
pom.xml和上一节一致不需要修改
编写代码
同样适用Socket流
// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符
DataStreamSource<
String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n"
)
;
FlinkServer
继承Thread启动线程
package org.example.snow.demo3
;
import org.apache.flink.api.common.functions.FlatMapFunction
;
import org.apache.flink.api.java.functions.KeySelector
;
import org.apache.flink.api.java.tuple.Tuple2
;
import org.apache.flink.streaming.api.datastream.DataStreamSource
;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
;
import org.apache.flink.streaming.api.windowing.time.Time
;
import org.apache.flink.util.Collector
;
/**
* @author snowsong
*/
public
class FlinkServer
extends Thread{
@Override
public
void run(
) {
String ip = "0.0.0.0"
;
int port = 8886
;
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(
)
;
// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符
DataStreamSource<
String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n"
)
;
SingleOutputStreamOperator<
Tuple2<
String
, Long>
> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(
new FlatMapFunction<
String
, Tuple2<
String
, Long>
>(
) {
@Override
public
void flatMap(String s, Collector<
Tuple2<
String
, Long>
> collector)
throws Exception {
String[] splits = s.split("\\s"
)
;
for (String word : splits) {
collector.collect(Tuple2.of(word, 1L
)
)
;
}
}
}
)
;
SingleOutputStreamOperator<
Tuple2<
String
, Long>
> word = tuple2SingleOutputStreamOperator
.keyBy(
new KeySelector<
Tuple2<
String
, Long>
, Object>(
) {
@Override
public Object getKey(Tuple2<
String
, Long> stringLongTuple2)
throws Exception {
return stringLongTuple2.f0;
}
}
)
.window(SlidingProcessingTimeWindows.of(Time.seconds(5
)
, Time.seconds(1
)
)
)
.sum(1
)
;
word.print(
)
;
try {
executionEnvironment.execute("stream!"
)
;
}
catch (Exception e) {
throw
new RuntimeException(e)
;
}
}
}
NumRandom
使用ServerSocket实现一个持续的流输出
package org.example.snow.demo3
;
import java.io.OutputStream
;
import java.io.PrintWriter
;
import java.net.InetSocketAddress
;
import java.net.ServerSocket
;
import java.net.Socket
;
import java.util.Random
;
/**
* @author snowsong
*/
public
class RandomNumClient
extends Thread {
@Override
public
void run(
) {
// 随机生成数字
String ip = "0.0.0.0"
;
int port = 8886
;
try {
ServerSocket serverSocket =
new ServerSocket(
)
;
InetSocketAddress address =
new InetSocketAddress(ip, port)
;
// 灵活绑定服务器地址
serverSocket.bind(address)
;
// 监听并接收客户端的连接请求,有阻塞特性,当调用该方法的时候,线程会暂停执行,直到有客户端连接上来
Socket accept = serverSocket.accept(
)
;
// 获取输入流,读取客户端发送的数据
OutputStream outputStream = accept.getOutputStream(
)
;
// 包装成打印流,方便写入数据 true 自动刷新缓冲区
PrintWriter printWriter =
new PrintWriter(outputStream, true
)
;
Random random =
new Random(
)
;
// 遍历
for (
int i = 0
; i <
10
; i++
) {
// 生成随机数
int num = random.nextInt(10
) + 1
;
printWriter.println("随机数:" + num)
;
System.out.println("send to flink:" + num)
;
Thread.sleep(100
)
;
}
}
catch (Exception e) {
throw
new RuntimeException(e)
;
}
super.run(
)
;
}
}
启动类
package org.example.snow.demo3
;
/**
* @author snowsong
*/
public
class StartApp {
public
static
void main(String[] args)
throws Exception {
RandomNumClient randomNumClient =
new RandomNumClient(
)
;
FlinkServer flinkServer =
new FlinkServer(
)
;
flinkServer.start(
)
;
randomNumClient.start(
)
;
}
}