一、统计socket单词数
统计从监听 TCP 套接字的数据服务器接收到的文本数据中每个单词出现的次数（运行前可先用 `nc -lk 8891` 在本机启动这样一个数据服务器）。
二、maven配置
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build for the Spark Streaming socket word-count demo (Java 8, Spark 2.4.4 on Scala 2.11).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mk</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0</version>

    <name>spark-test</name>
    <url>http://spark.mk.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Keep in line with the Scala release Spark 2.4.4 is built against (2.11.12). Because
             scala-library is declared directly below, Maven's nearest-wins rule would otherwise
             pin the whole classpath to whatever (older) 2.11.x is written here. -->
        <scala.version>2.11.12</scala.version>
        <!-- Suffix of the Spark artifact IDs; must match the major.minor of scala.version. -->
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.4</spark.version>
        <!-- Not referenced by any dependency in this POM at the moment. -->
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
        <!-- Scala runtime -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- Lombok is an annotation processor needed only at compile time. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
三、编程代码
public class SocketApp implements SparkConfInfo {public static void main(String[] args) throws InterruptedException {JavaStreamingContext streamingContext = new SocketApp().getStreamingContext("SocketApp", 5);JavaReceiverInputDStream<String> lines = streamingContext.socketTextStream("localhost", 8891);JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+")).stream().filter(v->v.length()>0).iterator());JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(Integer::sum);wordCounts.foreachRDD(v->{v.foreach(s-> System.out.println(s._1+":" + s._2));System.out.println("---------------------------");});streamingContext.start();streamingContext.awaitTermination();}
}public interface SparkConfInfo {default JavaStreamingContext getStreamingContext(String appName, int second){SparkConf sparkConf = getSparkConf();sparkConf.setAppName(appName);JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(second));return jssc;}default SparkSession getSparkSession(String appName){SparkConf sparkConf = getSparkConf();SparkSession session = SparkSession.builder().appName(appName).config(sparkConf).config(sparkConf).getOrCreate();return session;}default SparkConf getSparkConf() {SparkConf sparkConf = new SparkConf();if(System.getProperty("os.name").toLowerCase().contains("win")) {sparkConf.setMaster("local[4]");System.out.println("使用本地模拟是spark");}else{sparkConf.setMaster("spark://hadoop01:7077,hadoop02:7077,hadoop03:7077");sparkConf.set("spark.driver.host","192.168.150.1");//本地ip,必须与spark集群能够相互访问,如:同一个局域网sparkConf.setJars(new String[] {".\\out\\artifacts\\spark_test\\spark-test.jar"});//项目构建生成的路径}return sparkConf;}
}
输入内容
Tom        Lucy
Tom        Jack
Jone        Lucy
Jone        Jack
Lucy        Mary
Lucy        Ben
Jack        Alice
Jack        Jesse
Terry        Alice
Terry        Jesse
Philip        Terry
Philip        Alma
Mark        Terry
Mark        Alma
输出结果
Mark:2
Tom:2
Jesse:2
Philip:2
Alice:2
Jone:2
Terry:4
Alma:2
Ben:1
Lucy:4
Mary:1
Jack:4
---------------------------