sb-flink1.13.1-jdk8-分隔字符串 20260125
1、pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.15</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.flink</groupId>
<artifactId>db-flink</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>db-flink</name>
<description>db-flink</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.13.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
<!--flink客户端-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--scala版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--java版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!--streaming的scala版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--streaming的java版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--日志输出-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<!--json依赖包-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、DbFlinkApplicatipackage com.flink.dbflink
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class DbFlinkApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(DbFlinkApplication.class, args);
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置并行度
env.setParallelism(1);
//相同类型元素的数据流 source
DataStreamSource<String> stringDS = env.fromElements("java,SpringBoot", "spring cloud,redis",
"kafka,课堂");
stringDS.print("处理前");
DataStream<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> collector) throws Exception {
String [] arr = value.split(",");
for(String str : arr){
collector.collect(str);
}
}
});
//输出 sink
flatMapDS.print("处理后");
//DataStream需要调用execute,可以取个名称
env.execute("flat map job");
}
// public static void runFlinkJob(String[] args) throws Exception {
// //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// //设置并行度
// env.setParallelism(1);
// //相同类型元素的数据流 source
// DataStreamSource<String> stringDS = env.fromElements("java,SpringBoot", "spring cloud,redis",
// "kafka,课堂");
// stringDS.print("处理前");
// DataStream<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() {
// @Override
// public void flatMap(String value, Collector<String> collector) throws Exception {
// String [] arr = value.split(",");
// for(String str : arr){
// collector.collect(str);
// }
// }
// });
// //输出 sink
// flatMapDS.print("处理后");
//
// //DataStream需要调用execute,可以取个名称
// env.execute("flat map job");
// }
}
3、输出结果
处理前> java,SpringBoot
处理后> java
处理后> SpringBoot
处理前> spring cloud,redis
处理后> spring cloud
处理后> redis
处理前> kafka,课堂
处理后> kafka
处理后> 课堂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1215497.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!