头歌实践教学平台 大数据编程 实训答案(三)

第一章 遍历日志数据

用 Spark 遍历日志数据

第1关:用 Spark 获得日志文件中记录总数

任务描述
本关任务:编写一个能用 Spark 操作日志文件并输出日志文件记录数的小程序。

相关知识
为了完成本关任务,你需要掌握:1.搜索查询日志的内容,2.如何用 Spark 获得日志文件,3.如何获得日志文件的记录数。

import org.apache.spark.{SparkConf, SparkContext}object Test1 {def main(args: Array[String]) {// SparkConf包含了Spark配置的各种参数,// local:设置为本地运行// *:使用本地的所有cpu核// setAppName:设置本应用程序的别名(自定义)val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")// 进入Spark操作的入口val sc = new SparkContext(sparkConf)// 获得文本文件内容val sou = sc.textFile("/data/workspace/myshixun/projectsou1_1/src/soulog.txt")//********** Begin **********println("搜索日志文件中共有%d条记录".format(sou.count())) //********** End **********}
}

第2关:用 Spark 获得日志文件中记录内容

任务描述
本关任务:编写一个能用 Spark 获得日志文件中记录内容的小程序。

相关知识
为了完成本关任务,你需要掌握:如何用 Spark 获得日志文件中记录内容。

import org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]) {// SparkConf包含了Spark配置的各种参数,// local:设置为本地运行// *:使用本地的所有cpu核// setAppName:设置本应用程序的别名(自定义)val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")// 进入Spark操作的入口val sc = new SparkContext(sparkConf)// 获得文本文件内容val sou = sc.textFile("/data/workspace/myshixun/projectsou1_1/src/soulog.txt")//********** Begin **********val rdd1 = sou.map {case log =>val logSplit = log.split("\\s")(logSplit(3),logSplit(4))}rdd1.collect.take(6).foreach(println(_)) //********** End **********}
}

第二章 过滤日志数据

用 Spark 过滤日志数据

第1关:掌握用 Spark 过滤日志数据

任务描述
本关任务:编写一个能用 Spark 过滤日志数据的小程序。

相关知识
为了完成本关任务,你需要掌握:如何用 Spark 过滤日志数据。

import org.apache.spark.{SparkConf, SparkContext}object Test1 {def main(args: Array[String]) {// SparkConf包含了Spark配置的各种参数,// local:设置为本地运行// *:使用本地的所有cpu核// setAppName:设置本应用程序的别名(自定义)val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")// 进入Spark操作的入口val sc = new SparkContext(sparkConf)// 获得文本文件内容val sou = sc.textFile("/data/workspace/myshixun/projectsou2_1/src/soulog.txt")//********** Begin **********val splitSou = sou.map(_.split("\\s"))val filterSou = splitSou  .filter(_ (3).toInt == 1).filter(_ (4).toInt == 1)print(filterSou.count())//********** End **********}
}

第三章 聚合、排序日志数据

用 Spark 对日志数据进行排序

第1关:用 Spark 对日志数据进行排序

任务描述
本关任务:编写一个能用 Spark 对日志数据进行排序的小程序。

相关知识
为了完成本关任务,你需要掌握:如何用 Spark 对日志数据进行排序。

排序操作
要对上节课的数据进行排序操作,才能从大到小输出排行榜,
比如上节课获得的数据是:

(222,1)
(111,3)
(333,2)

格式是(用户id,查询次数),

现在要将这些数据按照查询次数的从大到小进行排序,也就是降序排序,代码如下:

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")
    val sou = sc.textFile("src/soulog2.txt")
    val splitSou = sou.map(_.split("\\s"))
    val result=splitSou.map(x => (x(1),1))
      .reduceByKey(_+_)
    // 将之前的结果进行降序排序,输出用户查询次数的排行榜
      val sortResult=result
      .map(x => (x._2,x._1))
      .sortByKey(false)
      .map(x => (x._2,x._1))
    sortResult.collect().foreach(println(_)) 

import org.apache.spark.{SparkConf, SparkContext}object Test1 {def main(args: Array[String]) {// SparkConf包含了Spark配置的各种参数,// local:设置为本地运行// *:使用本地的所有cpu核// setAppName:设置本应用程序的别名(自定义)val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")// 进入Spark操作的入口val sc = new SparkContext(sparkConf)// 获得文本文件内容val sou = sc.textFile("/data/workspace/myshixun/projectsou4_1/src/soulog.txt")//\s代表正则表达式中的一个空白字符(可能是空格、制表符、其他空白)//分割后,输出总记录数val splitSou = sou.map(_.split("\\s"))// 日志文件中总共有10000条记录println(splitSou.count())//只查询在返回结果中的排名和用户点击的顺序号都为1的记录,// 之前已经分隔成6个部分的数据,// 现在我们要查询排名第1(也就是第4个部分数据)以及用户点击排名第1(也就是第5个数据)的数据// 可以用连续的filter方法来进行多次过滤,// 注意将排名值用toInt方法转换为整数val filterSou = splitSou.filter(_ (3).toInt == 1).filter(_ (4).toInt == 1)// 获得经过以上过滤的数据,并且,将每条记录的用户id取出来,并给每条记录加一个value值为1val result=filterSou.map(x => (x(1),1))// 将相同用户的查询次数统计出来.reduceByKey(_+_)//********** Begin **********val sortResult=result// 因为我们要按key进行排序,而之前的结果的key是用户id,value是次数// 所以我们将原来的key和value互换位置,// x._1就是(key,value)的第一个元素key,x._2就是(key,value)的第二个元素value// 所以我们用map方法互换了key和value的位置.map(x => (x._2,x._1))// 然后按照现在的key也就是查询次数来进行排序,因为是排行榜,从大到小,所以是降序排序.sortByKey(false)//排完序后,再将排完序的数据的key和value进行互换,.map(x => (x._2,x._1))// 输出用户查询次数sortResult.collect().take(10).foreach(println(_)) //********** End **********}
}

用 Spark 对日志数据进行聚合

第1关:用 Spark 对日志数据进行聚合

任务描述
本关任务:编写一个能用 Spark 对日志数据进行聚合的小程序。

相关知识
为了完成本关任务,你需要掌握:如何用 Spark 对日志数据进行聚合。

聚合操作
我们经常要对数据进行聚合操作,

比如对于以下数据:
时间       用户id   查询的词
00:00:00    111    [查询词1]
00:00:00    111    [查询词2]
00:00:00    333    [查询词3]
00:00:00    111    [查询词4]
00:00:00    222    [查询词5]
00:00:00    333    [查询词5]

我们要查询出每个用户查询的次数,
可以用以下代码来实现:

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")
    val sc = new SparkContext(sparkConf)
    val sou = sc.textFile("src/soulog2.txt")
    val splitSou = sou.map(_.split("\\s"))
    val result=splitSou.map(x => (x(1),1))
      .reduceByKey(_+_)
    result.collect().foreach(println(_)) 

import org.apache.spark.{SparkConf, SparkContext}object Test1 {def main(args: Array[String]) {// SparkConf包含了Spark配置的各种参数,  // local:设置为本地运行  // *:使用本地的所有cpu核  // setAppName:设置本应用程序的别名(自定义)  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sou")// 进入Spark操作的入口  val sc = new SparkContext(sparkConf)// 获得文本文件内容  val sou = sc.textFile("/data/workspace/myshixun/projectsou3_1/src/soulog.txt")//\s代表正则表达式中的一个空白字符(可能是空格、制表符、其他空白)  //分割后,输出总记录数  val splitSou = sou.map(_.split("\\s"))// 日志文件中总共有10000条记录  println(splitSou.count())//只查询在返回结果中的排名和用户点击的顺序号都为1的记录,  // 之前已经分隔成6个部分的数据,  // 现在我们要查询排名第1(也就是第4个部分数据)以及用户点击排名第1(也就是第5个数据)的数据  // 可以用连续的filter方法来进行多次过滤,  // 注意将排名值用toInt方法转换为整数  val filterSou = splitSou  .filter(_ (3).toInt == 1)  .filter(_ (4).toInt == 1)  //********** Begin **********val result=filterSou.map(x => (x(1),1))// 将相同用户的查询次数统计出来.reduceByKey(_+_)result.collect().take(5).foreach(println(_)) //********** End **********}
}

第一章 Spark 入门

Spark Standalone 模式的安装和部署

第1关: Standalone 分布式集群搭建

任务描述
掌握 Standalone 分布式集群搭建。

相关知识
我们已经掌握了 Spark 单机版安装,那么分布式集群怎么搭建呢? 接下来我们学习 Standalone 分布式集群搭建。

启动环境

cd /home
wrapdocker
ulimit -f 1024000
# 加载镜像
docker load -i hbase-ssh2_v1.0.tar
# 启动集群 启动失败则等一会,再次执行,直至成功
docker-compose up -d
# 新开一个命令行窗口 master         密码统一为 123456
ssh 172.18.0.2
ssh-keygen -t rsa #三下回车
# 新开一个命令行窗口 slave1
ssh 172.18.0.3
ssh-keygen -t rsa #三下回车
# 新开一个命令行窗口 slave2
ssh 172.18.0.4
ssh-keygen -t rsa #三下回车
#  在 master 复制 master、slave1、slave2 的公钥。
cat ~/.ssh/id_rsa.pub>> ~/.ssh/authorized_keys
ssh root@slave1 cat ~/.ssh/id_rsa.pub>> ~/.ssh/authorized_keys
ssh root@slave2 cat ~/.ssh/id_rsa.pub>> ~/.ssh/authorized_keys
# 分别在 slave1、slave2 复制 master 的 authorized_keys 文件。
ssh root@master cat ~/.ssh/authorized_keys>> ~/.ssh/authorized_keys
# 在第1个命令行窗口 密码为123456
scp -r /usr/local/spark-2.3.4-bin-hadoop2.7 root@172.18.0.2:/usr/local
# 在master(第2个命令行窗口) 修改配置 注意:未提示更换命令行则一直在master上执行
echo "export SPARK_HOME=/usr/local/spark-2.3.4-bin-hadoop2.7" >> /etc/profile
source /etc/profile
cd /usr/local/spark-2.3.4-bin-hadoop2.7/conf 
mv spark-env.sh.template spark-env.sh
echo "export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_111" >> spark-env.sh
echo "SPARK_MASTER_WEBUI_PORT=8888" >> spark-env.sh
echo "export SPARK_MASTER_IP=master" >> spark-env.sh
mv slaves.template slaves 
vi slaves
# 修改为以下内容
master
slave1
slave2
# 分发安装包
cd /usr/local
scp -r spark-2.3.4-bin-hadoop2.7/ root@slave1:/usr/local
scp -r spark-2.3.4-bin-hadoop2.7/ root@slave2:/usr/local
# 启动集群
cd /usr/local/spark-2.3.4-bin-hadoop2.7/sbin
./start-all.sh

Spark的安装与使用

第1关:Scala 环境的安装与部署

任务描述
本关任务:安装与配置Scala开发环境。

相关知识
Scala是一种函数式面向对象语言,它融汇了许多前所未有的特性,而同时又运行于JVM之上。随着开发者对Scala的兴趣日增,以及越来越多的工具支持,无疑Scala语言将成为你手上一件必不可少的工具。

而我们将要学习的大数据框架Spark底层是使用Scala开发的,使用scala写出的代码长度是使用java写出的代码长度的1/10左右,代码实现更加简练。

所以安装与配置Scala的环境是我们在开始学习Spark之前要完成的准备工作。

接下来我们开始安装,分为三个步骤:

下载解压;
配置环境;
校验。

启动环境

mkdir /app
cd /opt/
tar -zxvf  scala-2.12.7.tgz -C /app
vi /etc/profile
SCALA_HOME=/app/scala-2.12.7
export PATH=$PATH:$SCALA_HOME/bin
source /etc/profile

第2关:Spark 环境安装

任务描述
本关任务:安装与配置Spark开发环境。

相关知识
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab(加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

本关我们来配置一个伪分布式的Spark开发环境,与配置Hadoop类似分为三个步骤:

下载解压安装包;
配置环境变量;
配置Spark环境;
校验。

cd /opt/
tar -zxvf spark-2.2.2-bin-hadoop2.7.tgz -C /app
vim /etc/profile
SPARK_HOME=/app/spark-2.2.2-bin-hadooop2.7
export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile
cd /app/spark-2.2.2-bin-hadoop2.7/conf
cp spark-env.sh.template spark-env.sh
vi spark-env.sh
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_111
export SCALA_HOME=/app/scala-2.12.7
export HADOOP_HOME=/usr/local/hadoop/
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export SPARK_MASTER_IP=127.0.0.1
export SPARK_LOCAL_IP=127.0.0.1
cd /app/spark-2.2.2-bin-hadoop2.7
./sbin/start-all.sh

第三章 SparkSQL结构化数据分析与处理

军用大数据 - 结构化数据分析与处理

第1关:Spark SQL入门

任务描述
掌握 Spark SQL 相关基础知识,完成选择题任务。

相关知识
Spark SQL 是 Spark 用来处理结构化数据的一个模块。Spark SQL 为了支持结构化数据的处理,它提供了两个编程抽象分别叫做 DataFrame 和DataSet。

1、C;2、AB

第2关:使用Spark SQL统计战斗机飞行性能

任务描述
通过飞行速度统计出战斗机飞行性能排比。

相关知识
本关使用  mySQL 统计战斗机飞行性能。


# coding=utf-8from pyspark.sql import SparkSession#**********Begin**********##创建SparkSession
spark = SparkSession \.builder \.appName("Python Spark SQL basic example") \.config("spark.sql.crossJoin.enabled", "true") \.master("local") \.getOrCreate()#读取/root/jun.json中数据
df =spark.read.json("/root/jun.json")
#创建视图
df.createOrReplaceTempView("table1")
#统计出全球飞行速度排名前三的战斗机
sqlDF = spark.sql("select cast(regexp_replace(regexp_extract(`最大飞行速度`,'[\\\d,\\\.]+',0),',','') as float) as SPEED, `名称` from table1 order by SPEED desc LIMIT 3")
#保存结果
sqlDF.write.format("csv").save("/root/airspark")#**********End**********#
spark.stop()

第3关:使用Spark SQL统计各个研发单位研制战斗机占比

任务描述
统计出各个研发单位研制战斗机占比。

相关知识
使用 Spark SQL 统计各个研发单位研制战斗机占比。

# coding=utf-8from pyspark.sql import SparkSession#**********Begin**********##创建SparkSession
spark = SparkSession \.builder \.appName("Python Spark SQL basic example") \.config("spark.sql.crossJoin.enabled", "true") \.master("local") \.getOrCreate()#读取/root/jun.json中数据
df =spark.read.json("/root/jun.json").coalesce(1)
#创建视图
df.createOrReplaceTempView("table1")#统计出全球各研发单位研制的战斗机在全球所有战斗机中的占比
sqlDF = spark.sql("select concat(cast(round(count(`研发单位`)*100/(select count(`研发单位`) from table1 where `研发单位` is not null and `名称` is not null ),2) as float),'%'),`研发单位` from table1 where `研发单位` is not null and `名称` is not null group by `研发单位`")#保存结果
sqlDF.write.format("csv").save("/root/airspark")
#**********End**********#spark.stop()

第四章 Spark结构化流处理

军用大数据--结构化流式数据处理

第1关:Spark结构化流快速入门

任务描述
Spark Streaming 是一套优秀的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们很多的场景应用。本关结合我们的应用场景,介结我们如何使用 Spark Streaming 处理数据。

# -*- coding: utf-8
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
import time
# 请在此处编写代码
#********** Begin **********#
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
spark.sparkContext.setLogLevel("error")
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
# Split the lines into words
words = lines.select(explode(split(lines.value, " ")).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()# Start running the query that prints the running counts to the console
query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime='1 seconds').start()
time.sleep( 20 )
query.stop()
#********** End **********#

第2关:对飞机的点击次数实时统计

任务描述
Spark Streaming 是一套优秀的实时计算框架。其良好的可扩展性、高吞吐量以及容错机制能够满足我们很多的场景应用。上一关我们介绍了如何实时获取数据,并处理数据,本关结合上一关的场景,结合 Kafka 把分析结果读出来。

相关知识
Spark Streaming 其优秀的特点给我们带来很多的应用场景。本关中,将通过从 TCP 获取数据来进行介绍。

import time
from pyspark.sql import SparkSession
#********** Begin **********#
spark = SparkSession.builder.master("local[2]").appName("case2").getOrCreate()
spark.sparkContext.setLogLevel("error")
df = spark.readStream.format("socket").option("host", "localhost").option("port", 9998).load()
ds=df.selectExpr( "CAST(value AS STRING)")
ds.createOrReplaceTempView("planeNumber")
sql= spark.sql("select count(*) nums,value from planeNumber   group  by value order by nums desc");
query =  sql.writeStream.format("console").outputMode("complete").start()
time.sleep( 20 )
query.stop()
#********** End **********#

kafka-入门篇

第1关:kafka - 初体验

任务描述
本关任务:使用 Kafka 命令创建一个副本数量为1、分区数量为3的 Topic 。

相关知识
为了完成本关任务,你需要掌握:1.如何使用 Kafka 的常用命令。

#!/bin/bash
#1.创建一个名为demo的Topic
kafka-topics.sh -create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic demo
#2.查看所有Topic
kafka-topics.sh --list --zookeeper 127.0.0.1:2181
#3.查看名为demo的Topic的详情信息
kafka-topics.sh -topic demo --describe --zookeeper 127.0.0.1:2181

第2关:生产者 (Producer ) - 简单模式

任务描述
本关任务:编写一个 Kafka 的 Producer 进行数据生产。

相关知识
为了完成本关任务,你需要掌握:1.如何使用 Kafka 的 Producer API 进行数据生产。

package net.educoder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/*** kafka producer 简单模式*/
public class App {public static void main(String[] args) {/*** 1.创建配置文件对象,一般采用 Properties*//**----------------begin-----------------------*/Properties props = new Properties();/**-----------------end-------------------------*//*** 2.设置kafka的一些参数*          bootstrap.servers --> kafka的连接地址 127.0.0.1:9092*          key、value的序列化类 -->org.apache.kafka.common.serialization.StringSerializer*          acks:1,-1,0*//**-----------------begin-----------------------*/props.put("bootstrap.servers", "127.0.0.1:9092");props.put("acks", "1");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");/**-----------------end-------------------------*//*** 3.构建kafkaProducer对象*//**-----------------begin-----------------------*/Producer<String, String> producer = new KafkaProducer<>(props);/**-----------------end-------------------------*/for (int i = 0; i < 100; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("demo", i + "", i + "");/*** 4.发送消息*//**-----------------begin-----------------------*/producer.send(record);/**-----------------end-------------------------*/}producer.close();}
}

第3关:消费者( Consumer)- 自动提交偏移量

任务描述
本关任务:编写一个 Kafka 消费者并设置自动提交偏移量进行数据消费。

相关知识
为了完成本关任务,你需要掌握:1.如何编写 Kafka 消费者,2.如何使用自动提交偏移量。

package net.educoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class App {public static void main(String[] args) {Properties props = new Properties();/**--------------begin----------------*///1.设置kafka集群的地址props.put("bootstrap.servers", "127.0.0.1:9092");//2.设置消费者组,组名字自定义,组名字相同的消费者在一个组props.put("group.id", "g1");//3.开启offset自动提交props.put("enable.auto.commit", "true");//4.自动提交时间间隔props.put("auto.commit.interval.ms", "1000");//5.序列化器props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/**---------------end---------------*//**--------------begin----------------*///6.创建kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//7.订阅kafka的topicconsumer.subscribe(Arrays.asList("demo"));/**---------------end---------------*/int i = 1;while (true) {/**----------------------begin--------------------------------*///8.poll消息数据,返回的变量为crsConsumerRecords<String, String> crs = consumer.poll(100);for (ConsumerRecord<String, String> cr : crs) {System.out.println("consume data:" + i);i++;}/**----------------------end--------------------------------*/if (i > 10) {return;}}}
}

第4关:消费者( Consumer )- 手动提交偏移量

任务描述
本关任务:编写一个 Kafka 消费者并使用手动提交偏移量进行数据消费。

相关知识
为了完成本关任务,你需要掌握:1.如何编写 Kafka 消费者,2.如何手动提交偏移量。

Kafka 两种手动提交方式
异步提交( CommitAsync ):
   异步模式下,提交失败也不会尝试提交。消费者线程不会被阻塞,因为异步操作,可能在提交偏移量操作结果未返回时就开始下一次拉取操作。

同步提交( CommitSync ):
   同步模式下,提交失败时一直尝试提交,直到遇到无法重试才结束。同步方式下,消费者线程在拉取消息时会被阻塞,直到偏移量提交操作成功或者在提交过程中发生错误。

注意:实现手动提交前需要在创建消费者时关闭自动提交,设置enable.auto.commit=false

package net.educoder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class App {public static void main(String[] args){Properties props = new Properties();/**-----------------begin------------------------*///1.设置kafka集群的地址props.put("bootstrap.servers", "127.0.0.1:9092");//2.设置消费者组,组名字自定义,组名字相同的消费者在一个组props.put("group.id", "g1");//3.关闭offset自动提交props.put("enable.auto.commit", "false");//4.序列化器props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/**-----------------end------------------------*//**-----------------begin------------------------*///5.实例化一个消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//6.消费者订阅主题,订阅名为demo的主题consumer.subscribe(Arrays.asList("demo"));/**-----------------end------------------------*/final int minBatchSize = 10;List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {for (ConsumerRecord bf : buffer) {System.out.printf("offset = %d, key = %s, value = %s%n", bf.offset(), bf.key(), bf.value());}/**-----------------begin------------------------*///7.手动提交偏移量consumer.commitSync();/**-----------------end------------------------*/buffer.clear();return;}}}
}

第六章 Spark MLib机器学习

军用大数据 - Spark机器学习

第1关:Iris 分类

任务描述
本关任务:使用 pyspark ml 的LogisticRegression分类器完成 Iris 分类任务。

相关知识
观察数据集

我们本次使用的数据集是sklearn自带的数据集Iris。

接下来,我们来了解下Iris数据集的数据吧!

# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from sklearn.datasets import load_iris
import pandas
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.feature import RFormula# 训练模型
def trainingModel(spark):# ********** Begin ********** ## 1.加载sklearn的训练数据iris =

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/55698.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java生成二维码示例(带logo以及文字描述)

先看一下生成效果 普通二维码 普通带文本二维码 带logo二维码 带logo带文本二维码 直接上代码 这里主要是用的第三方工具生成二维码的&#xff0c;所以我们需要先引入 jar 包 <dependency><groupId>com.google.zxing</groupId><artifactId>core</…

2024诺贝尔生理学或医学奖:RNA技术将拯救人类世界

生信碱移 miRNA领域获得最新诺贝尔奖 “我好像接到了真的诺贝尔委员会的电话&#xff01;” 加里鲁夫坎 2024年诺贝尔医学奖得主 ▲ 两位诺贝尔奖获奖得主。来源:诺贝尔生理学或医学奖委员会。 就在今天&#xff0c;卡罗林斯卡学院的诺贝尔大会决定将2024年诺贝尔生理学或医学…

动手学深度学习(李沐)PyTorch 第 6 章 卷积神经网络

李宏毅-卷积神经网络CNN 如果使用全连接层&#xff1a;第一层的weight就有3*10^7个 观察 1&#xff1a;检测模式不需要整张图像 很多重要的pattern只要看小范围即可 简化1&#xff1a;感受野 根据观察1 可以做第1个简化&#xff0c;卷积神经网络会设定一个区域&#xff0c…

无人机之飞行算法篇

无人机的飞行算法是一个复杂而精细的系统&#xff0c;它涵盖了多个关键技术和算法&#xff0c;以确保无人机能够稳定、准确地执行飞行任务。 一、位置估计 无人机在空中飞行过程中需要实时获取其位置信息&#xff0c;以便进行路径规划和控制。这通常通过以下传感器实现&#…

工程师 - 如何配置DNS服务器

配置本地 DNS 服务器的步骤根据操作系统的不同会有所差异。下面提供了在常见操作系统&#xff08;如 Windows 和 Linux&#xff09;上配置本地 DNS 服务器的方法。 1. 在 Windows 系统中配置本地 DNS 服务器 步骤&#xff1a; 1. 打开网络连接设置 - 右键单击任务栏中的网络图标…

基于STM32的模拟舞台灯光控制系统设计

引言 本项目设计了一个基于STM32的模拟舞台灯光控制系统&#xff0c;可以通过调节灯光的亮度、颜色和模式&#xff0c;实现多种灯光效果模拟&#xff0c;如渐变、闪烁和跟随节奏的灯光变化。该系统结合了LED灯条、PWM控制和按钮输入等&#xff0c;实现了舞台灯光的多样化展示。…

【Linux的那些事】shell命名及Linux权限的理解

目录 一、shell命令以及运行原理 二、Linux权限的概念 三、Linux权限管理 3.1.文件访问者的分类&#xff08;人&#xff09; 3.2.文件类型和访问权限&#xff08;事物属性&#xff09; 3.3.文件权限值的表示方法 3.4.文件访问权限的相关设置方法 a)chmod b)chown c)…

5.错误处理在存储过程中的重要性(5/10)

错误处理在存储过程中的重要性 引言 在数据库编程中&#xff0c;存储过程是一种重要的组件&#xff0c;它允许用户将一系列SQL语句封装成一个单元&#xff0c;以便重用和简化数据库操作。然而&#xff0c;像任何编程任务一样&#xff0c;存储过程中的代码可能会遇到错误或异常…

使用MTVerseXR SDK实现VR串流

1、概述​ MTVerseXR SDK 是摩尔线程GPU加速的虚拟现实&#xff08;VR&#xff09;流媒体平台&#xff0c;专门用于从远程服务器流式传输基于标准OpenXR的应用程序。MTVerseXR可以通过Wi-Fi和USB流式将VR内容从Windows服务器流式传输到XR客户端设备, 使相对性能低的VR客户端可…

15分钟学 Python 第38天 :Python 爬虫入门(四)

Day38 : Python爬虫异常处理与反爬虫机制 章节1&#xff1a;异常处理的重要性 在爬虫开发过程中&#xff0c;网络请求和数据解析常常会遭遇各种异常。正确的异常处理可以提高程序的稳定性&#xff0c;避免崩溃&#xff0c;并帮助开发者快速定位问题。 章节2&#xff1a;常见…

18710 统计不同数字的个数(升级版)

### 思路 为了快速判断某个数字是否在之前出现过&#xff0c;我们可以使用一个布尔数组来记录每个数字是否已经出现过。由于题目中给出了数字的范围&#xff08;0 < ai < 200000&#xff09;&#xff0c;我们可以开一个大小为200001的布尔数组来记录每个数字的出现情况。…

网络编程(15)——服务器如何主动退出

十五、day15 服务器主动退出一直是服务器设计必须考虑的一个方向&#xff0c;旨在能通过捕获信号使服务器安全退出。我们可以通过asio提供的信号机制绑定回调函数即可实现优雅退出。 之前服务器的主函数如下 #include "CSession.h" #include "CServer.h"…

ASP.NetCore---I18n(internationalization)多语言版本的应用

文章目录 0.实现的效果如下1.创建新项目I18nBaseDemo2.添加页面中的下拉框3.在HomeController中添加ChangeLanguage方法4.在Progress.cs 文件中添加如下代码&#xff1a;5. 在progress.cs中添加code6.添加Resource资源文件7.在页面中引用i18n的变量8. 重启项目&#xff0c;应该…

力扣4. 寻找两个正序数组的中位数

给定两个大小分别为 m 和 n 的正序&#xff08;从小到大&#xff09;数组 nums1 和 nums2。请你找出并返回这两个正序数组的 中位数 。 算法的时间复杂度应该为 O(log (mn)) 。 示例 1&#xff1a; 输入&#xff1a;nums1 [1,3], nums2 [2] 输出&#xff1a;2.00000 解释&…

神经网络激活函数汇总

神经网络中的激活函数用于引入非线性&#xff0c;使模型能够学习和表示复杂的模式。不同的激活函数有各自的特性和用途。常见的激活函数包括以下几种&#xff1a; 1. Sigmoid&#xff08;S型函数&#xff09; 公式&#xff1a; σ ( x ) 1 1 e − x \sigma(x) \frac{1}{1…

录屏达人必备!四款神器助你轻松搞定一切

录屏&#xff0c;一个既简单又实用的技能&#xff0c;不仅能帮助我们记录下电脑上的精彩瞬间&#xff0c;还能在需要的时候进行演示。是不是觉得特别棒呢&#xff1f;今天&#xff0c;我就来给大家分享一下如何轻松地录屏&#xff0c;并推荐四款非常实用的录屏工具。 一、如何录…

暴力目前98%-----找数字

题目描述 给一个二维数组nums&#xff0c;对于每一个元素nums[i]&#xff0c;找出距离最近的且值相等的元素&#xff0c; 输出横纵坐标差值的绝对值之和&#xff0c;如果没有等值元素&#xff0c;则输出-1。 例如&#xff1a; 输入数组 nums 为 0 3 5 4 2 2 5 7 8 3 2 5 4 …

力扣hot100--链表

链表 1. 2. 两数相加 给你两个 非空 的链表&#xff0c;表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的&#xff0c;并且每个节点只能存储 一位 数字。 请你将两个数相加&#xff0c;并以相同形式返回一个表示和的链表。 你可以假设除了数字 0 之外&#xff…

网络学习第二篇

认识网关和路由器 这里大家先了解一下什么三层设备。 三层设备 三层设备是指在网络架构中能够工作在第三层&#xff08;网络层&#xff09;的设备&#xff0c;通常包括三层交换机和路由器。这些设备可以根据IP地址进行数据包的转发和路由选择&#xff0c;从而在不同的网络之间…

JVM Class类文件结构

国庆节快乐 2024年10月2日17:49:22 目录 前言 magic 数 文件版本 使用JClassLib观察class文件 一般信息 接口 常量池 字段 方法 常量池计数器 常量池 类型 CONSTANT_Methodref_info CONSTANT_Class_info 类型结构总表 访问标志 类索引, …