一、Single-Table Join
Given a child-parent table, produce the corresponding grandchild-grandparent table.
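The trick is a self-join on the middle generation: keep the table once keyed by child and once keyed by parent, then join on the shared person. For example, the rows (Tom, Lucy) and (Lucy, Mary) meet at Lucy, which yields the grandchild-grandparent pair (Tom, Mary).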
二、Maven Setup
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mk</groupId>
    <artifactId>spark-test</artifactId>
    <version>1.0</version>
    <name>spark-test</name>
    <url>http://spark.mk.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.1</scala.version>
        <spark.version>2.4.4</spark.version>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
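Note: when submitting to a real cluster, the driver code below ships the application jar via setJars, pointing at an IDE artifact path (.\out\artifacts\spark_test\spark-test.jar). If you build with mvn clean package instead, Maven's default naming puts the jar at target\spark-test-1.0.jar, and the path passed to setJars must be adjusted to match.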
三、Code
// SingleTableJoinApp.java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class SingleTableJoinApp implements SparkConfInfo {

    public static void main(String[] args) {
        String filePath = "E:\\spark\\childParent.txt";
        SparkSession sparkSession = new SingleTableJoinApp().getSparkConf("childParent");

        // Parse the file into (child, parent) pairs, skipping the header and malformed lines
        JavaPairRDD<String, String> childParent = sparkSession.sparkContext()
                .textFile(filePath, 4)
                .toJavaRDD()
                // textFile already yields one record per line; this flatMap is a defensive no-op
                .flatMap(v -> Arrays.asList(v.split("\n")).iterator())
                .mapToPair(v -> {
                    if (v.matches("\\s*child\\s+parent\\s*")) {
                        return null; // header line
                    }
                    String[] data = v.split("\\s+");
                    if (data.length != 2) {
                        return null; // malformed line
                    }
                    return new Tuple2<>(data[0], data[1]);
                })
                .filter(v -> v != null)
                .cache();

        // Invert to (parent, child) so the join key is the "middle" generation
        JavaPairRDD<String, String> parentChild =
                childParent.mapToPair(v -> new Tuple2<>(v._2, v._1));

        // Join on the shared person P: (P, (child-of-P, parent-of-P))
        JavaPairRDD<String, Tuple2<String, String>> joinRdd = parentChild.join(childParent);

        // Keep (grandchild, grandparent) and sort by grandchild
        List<Tuple2<String, String>> childGrand = joinRdd
                .mapToPair(v -> new Tuple2<>(v._2._1, v._2._2))
                .sortByKey(true)
                .collect();

        System.out.println("child\t\tgrand");
        childGrand.forEach(v -> System.out.println(v._1 + "\t\t" + v._2));
        sparkSession.stop();
    }
}

// SparkConfInfo.java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public interface SparkConfInfo {

    default SparkSession getSparkConf(String appName) {
        SparkConf sparkConf = new SparkConf();
        if (System.getProperty("os.name").toLowerCase().contains("win")) {
            sparkConf.setMaster("local[4]");
            System.out.println("Running Spark in local mode");
        } else {
            sparkConf.setMaster("spark://hadoop01:7077,hadoop02:7077,hadoop03:7077");
            // Driver host: must be reachable from the cluster, e.g. on the same LAN
            sparkConf.set("spark.driver.host", "192.168.150.1");
            // Jar produced by the project build
            sparkConf.setJars(new String[]{".\\out\\artifacts\\spark_test\\spark-test.jar"});
        }
        return SparkSession.builder()
                .appName(appName)
                .config(sparkConf)
                .getOrCreate();
    }
}
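For comparison, since spark-sql_2.11 is already declared in the pom, the same result can also be obtained declaratively with a SQL self-join. This is a minimal sketch, not part of the original program: the class name SingleTableJoinSqlApp and the hard-coded local master are assumptions for illustration.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SingleTableJoinSqlApp {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("childParentSql").master("local[4]").getOrCreate();

        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("child", DataTypes.StringType, false),
                DataTypes.createStructField("parent", DataTypes.StringType, false)});

        // Parse the whitespace-separated file, dropping the header line
        JavaRDD<Row> rows = spark.read().textFile("E:\\spark\\childParent.txt")
                .javaRDD()
                .map(line -> line.trim().split("\\s+"))
                .filter(f -> f.length == 2 && !f[0].equals("child"))
                .map(f -> RowFactory.create(f[0], f[1]));
        spark.createDataFrame(rows, schema).createOrReplaceTempView("cp");

        // Self-join: a.parent is b.child, so a.child is a grandchild of b.parent
        spark.sql("SELECT a.child AS child, b.parent AS grand "
                + "FROM cp a JOIN cp b ON a.parent = b.child ORDER BY child").show();

        spark.stop();
    }
}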
Contents of childParent.txt:
child        parent
Tom        Lucy
Tom        Jack
Jone        Lucy
Jone        Jack
Lucy        Mary
Lucy        Ben
Jack        Alice
Jack        Jesse
Terry        Alice
Terry        Jesse
Philip        Terry
Philip        Alma
Mark        Terry
Mark        Alma

Output:
child		grand
Jone		Mary
Jone		Ben
Jone		Alice
Jone		Jesse
Mark		Alice
Mark		Jesse
Philip		Alice
Philip		Jesse
Tom		Mary
Tom		Ben
Tom		Alice
Tom		Jesse
四、The join Method
<W> JavaPairRDD<K, Tuple2<V, W>> join(JavaPairRDD<K, W> other) performs an inner join: for every key present in both RDDs, it returns a (key, (v, w)) pair for each combination of that key's values.
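A tiny self-contained illustration of these join semantics (the data and class name here are made up for demonstration):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JoinDemo {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("joinDemo").setMaster("local[2]"));

        JavaPairRDD<String, String> left = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("Lucy", "Tom"), new Tuple2<>("Jack", "Tom")));
        JavaPairRDD<String, String> right = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("Lucy", "Mary"), new Tuple2<>("Lucy", "Ben")));

        // Inner join on the key: only "Lucy" appears in both RDDs, so "Jack" is dropped.
        // Prints (Lucy,(Tom,Mary)) and (Lucy,(Tom,Ben)); order may vary.
        left.join(right).collect().forEach(System.out::println);

        sc.stop();
    }
}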