| Course | Data Collection |
|---|---|
| Repository | https://gitee.com/lisu6/data_collect/tree/master/4 |
| Assignment 1: East Money (Selenium) | https://gitee.com/lisu6/data_collect/tree/master/4/ |
| Assignment 2: icourse163 MOOC (Selenium) | https://gitee.com/lisu6/data_collect/tree/master/4/eastmoney_scrapy |
| Student ID / Name | 102302141 易敏亮 |
I. Assignment 1
Goal: become proficient with Selenium for locating HTML elements, scraping Ajax-loaded pages, and waiting for elements to appear.
Task: use Selenium plus MySQL storage to scrape stock data from three East Money boards: 沪深A股 (SSE & SZSE A-shares), 上证A股 (SSE A-shares), and 深证A股 (SZSE A-shares).
1. Identify the scraping targets
Pages: the three board pages reachable from the home page:
- 沪深A股: http://quote.eastmoney.com/center/gridlist.html#hs_a_board
- 上证A股: http://quote.eastmoney.com/center/gridlist.html#sh_a_board
- 深证A股: http://quote.eastmoney.com/center/gridlist.html#sz_a_board
Data:
Open DevTools with F12; the fields we need all live inside a table element.
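A quick sanity check for this structure, as a minimal sketch: it assumes driver is an already-initialized WebDriver, and reuses the selector the full script below uses.

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# Wait for the Ajax-rendered quote table, then grab its rows.
rows = WebDriverWait(driver, 10).until(
    EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".quotetable tbody tr")))
print(f"{len(rows)} rows rendered on this page")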

2. Database setup
Write the CREATE statements from the fields above. stock_data.stock_info stores the 13 data fields plus an auto-increment id and the insertion timestamp.
CREATE DATABASE IF NOT EXISTS stock_data DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE stock_data;
CREATE TABLE IF NOT EXISTS stock_info (
    id INT AUTO_INCREMENT PRIMARY KEY COMMENT 'row id',
    board_name VARCHAR(20) COMMENT 'board (沪深/上证/深证)',
    stock_code VARCHAR(10) COMMENT 'stock code',
    stock_name VARCHAR(20) COMMENT 'stock name',
    current_price DECIMAL(10, 2) COMMENT 'latest price',
    change_percent VARCHAR(10) COMMENT 'change (%)',
    change_amount DECIMAL(10, 2) COMMENT 'change amount',
    volume VARCHAR(20) COMMENT 'volume',
    turnover VARCHAR(20) COMMENT 'turnover',
    amplitude VARCHAR(10) COMMENT 'amplitude (%)',
    high_price DECIMAL(10, 2) COMMENT 'day high',
    low_price DECIMAL(10, 2) COMMENT 'day low',
    open_price DECIMAL(10, 2) COMMENT 'open',
    pre_close DECIMAL(10, 2) COMMENT 'previous close',
    crawl_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT 'crawl time'
);
3. Flow design
Pagination logic:
All three boards load their data via Ajax, so for Selenium the whole job of turning the page reduces to finding and clicking the "next page" a element.

Locating the button:
Use Selenium's built-in XPath support and match on the title attribute:
next_btn = self.driver.find_element(By.XPATH, "//a[@title='下一页']")
Clicking:
Click through JavaScript to avoid the click being intercepted by overlapping elements:
self.driver.execute_script("arguments[0].click();", next_btn)
4. Cap the page count to avoid the anti-scraping shield
MAX_PAGES is set to 2 and can be adjusted.
for page in range(1, MAX_PAGES + 1):
    print(f"Crawling {board_name} - page {page}...")
    data = self.parse_page()
    if data:
        self.save_to_mysql(data, board_name)
    if page < MAX_PAGES:
        next_btn = self.driver.find_element(By.XPATH, "//a[@title='下一页']")
        self.driver.execute_script("arguments[0].click();", next_btn)
        time.sleep(2)
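A note on the fixed time.sleep(2): it can either over-wait or under-wait. A hedged alternative, not in the original script and assuming the site re-renders the table rows when the page changes, is to wait for the old first row to go stale after the click:

# Grab a reference to a row from the current page before clicking.
old_row = self.driver.find_element(By.CSS_SELECTOR, ".quotetable tbody tr")
self.driver.execute_script("arguments[0].click();", next_btn)
# Block until the old row is detached from the DOM, i.e. the Ajax refresh happened.
WebDriverWait(self.driver, 10).until(EC.staleness_of(old_row))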
5. Full script
import time
import pymysql
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

CHROMEDRIVER_PATH = r"C:\Users\24048\.wdm\drivers\chromedriver\win64\142.0.7444.162\chromedriver-win32\chromedriver.exe"

DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'xxxxxxx',
    'database': 'stock_data',
    'charset': 'utf8mb4'
}

URLS = {
    "沪深A股": "http://quote.eastmoney.com/center/gridlist.html#hs_a_board",
    "上证A股": "http://quote.eastmoney.com/center/gridlist.html#sh_a_board",
    "深证A股": "http://quote.eastmoney.com/center/gridlist.html#sz_a_board"
}

MAX_PAGES = 2


class StockScraper:
    def __init__(self):
        # Database connection and browser setup.
        self.conn = pymysql.connect(**DB_CONFIG)
        self.cursor = self.conn.cursor()
        options = webdriver.ChromeOptions()
        options.add_argument('--disable-gpu')
        service = Service(executable_path=CHROMEDRIVER_PATH)
        self.driver = webdriver.Chrome(service=service, options=options)
        self.driver.maximize_window()
        print("Browser driver loaded")

    def save_to_mysql(self, data_list, board_name):
        sql = """INSERT INTO stock_info (board_name, stock_code, stock_name,
                     current_price, change_percent, change_amount, volume, turnover,
                     amplitude, high_price, low_price, open_price, pre_close)
                 VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"""
        params = []
        for item in data_list:
            row = [board_name] + list(item)
            # The site shows '-' for missing values; store those as 0.
            processed_row = [0 if x == '-' else x for x in row]
            params.append(processed_row)
        self.cursor.executemany(sql, params)
        self.conn.commit()
        print(f"[{board_name}] saved {len(data_list)} rows")

    def parse_page(self):
        # Wait for the Ajax-rendered table before reading it.
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, ".quotetable tbody tr")))
        rows = self.driver.find_elements(By.CSS_SELECTOR, ".quotetable tbody tr")
        page_data = []
        for row in rows:
            cols = row.find_elements(By.TAG_NAME, "td")
            stock_code = cols[1].text.strip()
            stock_name = cols[2].text.strip()
            current_price = cols[4].text.strip()
            change_percent = cols[5].text.strip()
            change_amount = cols[6].text.strip()
            volume = cols[7].text.strip()
            turnover = cols[8].text.strip()
            amplitude = cols[9].text.strip()
            high = cols[10].text.strip()
            low = cols[11].text.strip()
            open_p = cols[12].text.strip()
            pre_close = cols[13].text.strip()
            print(f"Scraped: {stock_code}, {stock_name}, {current_price}, {change_percent}, "
                  f"{change_amount}, {volume}, {turnover}, {amplitude}, "
                  f"{high}, {low}, {open_p}, {pre_close}")
            page_data.append((stock_code, stock_name, current_price, change_percent,
                              change_amount, volume, turnover, amplitude,
                              high, low, open_p, pre_close))
        return page_data

    def crawl_board(self, board_name, url):
        print(f"\nCrawling board: {board_name}")
        self.driver.get(url)
        time.sleep(2)
        for page in range(1, MAX_PAGES + 1):
            print(f"Crawling {board_name} - page {page}...")
            data = self.parse_page()
            if data:
                self.save_to_mysql(data, board_name)
            if page < MAX_PAGES:
                next_btn = self.driver.find_element(By.XPATH, "//a[@title='下一页']")
                self.driver.execute_script("arguments[0].click();", next_btn)
                time.sleep(2)

    def run(self):
        for name, url in URLS.items():
            self.crawl_board(name, url)
        self.close()

    def close(self):
        if hasattr(self, 'driver'):
            self.driver.quit()
        if hasattr(self, 'cursor'):
            self.cursor.close()
        if hasattr(self, 'conn'):
            self.conn.close()
        print("Scraper finished.")


if __name__ == "__main__":
    scraper = StockScraper()
    scraper.run()
6. Reflections and summary
1. Reinforced the Selenium workflow and the element-location methods: wait for an element to load, with an upper bound on the wait time.
WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.CSS_SELECTOR, ".quotetable tbody tr")))
2. Encapsulation: writing this as a class feels quite different from a flat script. With Selenium, the biggest win of the object-oriented style is having self.driver available in every method, which is very convenient.
II. Assignment 2
Goal: become proficient with Selenium for locating HTML elements, simulating a user login, scraping Ajax-loaded pages, and waiting for elements to appear.
Task: use Selenium plus MySQL to scrape course information from the Chinese MOOC site icourse163 (course id, course name, school, lead teacher, team members, enrollment count, course schedule, course description).
1. Identify the scraping targets
Site: https://www.icourse163.org/ (the "My Courses" page)
The course list is paginated, and each course must be opened to collect the detailed fields.

The concrete fields to scrape:

2. Database design
CREATE DATABASE IF NOT EXISTS mooc_spider DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_general_ci;
USE mooc_spider;
CREATE TABLE IF NOT EXISTS course_info (
    Id INT AUTO_INCREMENT PRIMARY KEY,
    cCourse VARCHAR(255) COMMENT 'course name',
    cCollege VARCHAR(255) COMMENT 'school',
    cTeacher VARCHAR(255) COMMENT 'lead teacher',
    cTeam TEXT COMMENT 'team members',
    cCount VARCHAR(50) COMMENT 'enrollment count',
    cProcess VARCHAR(100) COMMENT 'course schedule',
    cBrief TEXT COMMENT 'course description'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
3. Flow design
1. User login and navigating to the data page
Login flow:
1) Click the login button first:
login_btn = self.wait.until(EC.element_to_be_clickable(
    (By.XPATH, '//div[@class="_3uWA6" and text()="登录/注册"]')))
login_btn.click()

2) Fill in the form (the form sits inside an iframe, so switch into it first):
iframe = self.wait.until(EC.presence_of_element_located(
    (By.CSS_SELECTOR, 'div[id^="j-ursContainer"] iframe')))
self.driver.switch_to.frame(iframe)
phone_input = WebDriverWait(self.driver, 10).until(
    EC.visibility_of_element_located((By.ID, 'phoneipt')))
phone_input.send_keys(self.username)
password_input = WebDriverWait(self.driver, 10).until(
    EC.visibility_of_element_located((By.CSS_SELECTOR, ".j-inputtext.dlemail")))
password_input.send_keys(self.password)
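One easy-to-miss detail: after submitting the form, control has to return to the top-level document before anything outside the iframe can be located. The full script below does exactly this:

self.driver.switch_to.default_content()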

3) Click "My Courses"

4) Animation summarizing the steps to reach the page

2. Pagination logic
On the last page the "next page" button carries a different class than on earlier pages; use that to detect the end of the listing.
next_tag = self.wait.until(EC.element_to_be_clickable(
    (By.XPATH, "//a[contains(@class, 'th-bk-main-gh') and text()='下一页']")))
self.driver.execute_script("arguments[0].click();", next_tag)
Collect all the concrete course URLs first; parsing happens afterwards.


3. Collecting the course detail URLs
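The core of this step, excerpted from the full script below: every course card on the current page contributes one detail-page link, deduplicated into self.links.

def _collect_page_links(self):
    # One .course-card-wrapper per course; its 'menu' block holds the detail link.
    courses_tag = self.wait.until(EC.presence_of_all_elements_located(
        (By.CSS_SELECTOR, '.course-panel-body-wrapper .course-card-wrapper')))
    for tag in courses_tag:
        link = tag.find_element(By.CLASS_NAME, 'menu').find_element(
            By.TAG_NAME, 'a').get_attribute('href')
        if link not in self.links:
            self.links.append(link)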

4. Full implementation


import time
import re
import pymysql
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException


class IcourseScraper:
    def __init__(self):
        self.db_config = {
            'host': 'localhost',
            'port': 3306,
            'user': 'root',
            'password': 'xxxxxxx',
            'db': 'mooc',
            'charset': 'utf8mb4'
        }
        self.url = 'https://www.icourse163.org/'
        self.driver_path = r"C:\Users\24048\.wdm\drivers\chromedriver\win64\142.0.7444.162\chromedriver-win32\chromedriver.exe"
        self.username = '173xxxxxx'
        self.password = 'xxxxxxxx'
        self.links = []
        self.conn = None
        self.cursor = None
        self.driver = None
        self.wait = None
        self._init_db()
        self._init_driver()

    def _init_db(self):
        try:
            self.conn = pymysql.connect(**self.db_config)
            self.cursor = self.conn.cursor()
            sql = """CREATE TABLE IF NOT EXISTS courses (
                         id INT AUTO_INCREMENT PRIMARY KEY,
                         course_name VARCHAR(255),
                         school VARCHAR(100),
                         teachers TEXT,
                         team VARCHAR(100),
                         student_count VARCHAR(50),
                         course_time VARCHAR(100),
                         introduce TEXT,
                         create_time DATETIME DEFAULT CURRENT_TIMESTAMP
                     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"""
            self.cursor.execute(sql)
            self.conn.commit()
            print("Database connected and table ready")
        except Exception as e:
            print(f"Database init failed: {e}")

    def _init_driver(self):
        options = Options()
        options.add_argument('--disable-gpu')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_argument('--start-maximized')
        options.add_experimental_option('excludeSwitches', ['enable-automation'])
        options.add_experimental_option('useAutomationExtension', False)
        service = Service(executable_path=self.driver_path)
        self.driver = webdriver.Chrome(service=service, options=options)
        self.wait = WebDriverWait(self.driver, 10)
        self.driver.get(self.url)

    def login(self):
        try:
            login_btn = self.wait.until(EC.element_to_be_clickable(
                (By.XPATH, '//div[@class="_3uWA6" and text()="登录/注册"]')))
            login_btn.click()
            # The login form lives in its own iframe; switch into it first.
            iframe = self.wait.until(EC.presence_of_element_located(
                (By.CSS_SELECTOR, 'div[id^="j-ursContainer"] iframe')))
            self.driver.switch_to.frame(iframe)
            phone_input = WebDriverWait(self.driver, 10).until(
                EC.visibility_of_element_located((By.ID, 'phoneipt')))
            phone_input.send_keys(self.username)
            password_input = WebDriverWait(self.driver, 10).until(
                EC.visibility_of_element_located((By.CSS_SELECTOR, ".j-inputtext.dlemail")))
            password_input.send_keys(self.password)
            time.sleep(1)
            submit_btn = self.driver.find_element(By.ID, 'submitBtn')
            submit_btn.click()
            # Back to the top-level document before touching the rest of the page.
            self.driver.switch_to.default_content()
            print("Logged in")
        except Exception as e:
            print(f"Login error: {e}")
        try:
            my_courses_btn = self.wait.until(EC.element_to_be_clickable(
                (By.XPATH, "//div[contains(@class, '_3uWA6') and text()='我的课程']")))
            my_courses_btn.click()
            all_handles = self.driver.window_handles
            self.driver.switch_to.window(all_handles[-1])
            time.sleep(2)
        except Exception:
            print("'My Courses' entry not found")
        time.sleep(2)

    def _collect_page_links(self):
        try:
            courses_tag = self.wait.until(EC.presence_of_all_elements_located(
                (By.CSS_SELECTOR, '.course-panel-body-wrapper .course-card-wrapper')))
            time.sleep(1)
            for tag in courses_tag:
                link = tag.find_element(By.CLASS_NAME, 'menu').find_element(
                    By.TAG_NAME, 'a').get_attribute('href')
                if link not in self.links:
                    self.links.append(link)
        except TimeoutException:
            pass

    def scan_all_pages(self):
        while True:
            try:
                self._collect_page_links()
                next_tag = self.wait.until(EC.element_to_be_clickable(
                    (By.XPATH, "//a[contains(@class, 'th-bk-main-gh') and text()='下一页']")))
                self.driver.execute_script("arguments[0].click();", next_tag)
                time.sleep(2)
            except Exception:
                print("No more pages")
                break
        print(f'Collected {len(self.links)} course links')

    def parse_and_save(self):
        for link in self.links:
            self.driver.get(link)
            time.sleep(2)
            try:
                course = WebDriverWait(self.driver, 20).until(
                    EC.presence_of_element_located(
                        (By.CSS_SELECTOR, ".course-title.f-ib.f-vam"))).text
                school_tag = self.wait.until(EC.presence_of_element_located(
                    (By.XPATH, "//img[@class='u-img']")))
                school = school_tag.get_attribute('alt')
                teacher_tag = self.wait.until(EC.presence_of_all_elements_located(
                    (By.CSS_SELECTOR, ".um-list-slider_con h3.f-fc3")))
                teachers_list = [elem.text for elem in teacher_tag]
                teachers_str = ",".join(teachers_list)
                team = teachers_list[0] if teachers_list else ""
                count_tag = self.wait.until(EC.presence_of_element_located(
                    (By.CSS_SELECTOR, "span.count")))
                student_count = re.search(r'\d+', count_tag.text).group()
                time_tag = self.wait.until(EC.presence_of_element_located(
                    (By.CSS_SELECTOR, ".course-enroll-info_course-info_term-info_term-time")))
                course_time = time_tag.find_elements(By.TAG_NAME, 'span')[1].text
                introduce_tag = self.wait.until(EC.presence_of_element_located(
                    (By.CSS_SELECTOR, "#j-rectxt2")))
                introduce = introduce_tag.text
                print(f"Parsed: {course}")
                self._save_to_db(course, school, teachers_str, team,
                                 student_count, course_time, introduce)
            except Exception as e:
                print(f"Parse failed {link}: {e}")

    def _save_to_db(self, *args):
        if self.conn and self.cursor:
            try:
                sql = """INSERT INTO courses (course_name, school, teachers, team,
                             student_count, course_time, introduce)
                         VALUES (%s, %s, %s, %s, %s, %s, %s)"""
                self.cursor.execute(sql, args)
                self.conn.commit()
                print("Row saved")
            except Exception as e:
                self.conn.rollback()
                print(f"Insert failed: {e}")

    def close(self):
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        if self.driver:
            self.driver.quit()


if __name__ == "__main__":
    scraper = IcourseScraper()
    scraper.login()
    scraper.scan_all_pages()
    scraper.parse_and_save()
    scraper.close()
5. Takeaways
1. The fine points of the login flow taught me the importance of frame switching. At first the login form could not be located at all; debugging showed the whole login area is nested inside a separate iframe, and its elements can only be manipulated after driver.switch_to.frame() switches into that context. The lesson: when facing a modern, complex page, inspect the DOM hierarchy first, because frame isolation is a common encapsulation technique.
2. Combining explicit and implicit waits balances stability against speed. Critical steps such as page transitions and element loading use explicit WebDriverWait calls, while a few non-critical points keep a time.sleep() buffer. The mix ensures elements have actually loaded without wasting time on excessive waiting.
3. The two-phase crawl strategy noticeably improves reliability. First walk every listing page to collect the detail-page links, then visit and parse them one by one. It looks less efficient, but it avoids navigation failures caused by network hiccups and makes the crawl effectively resumable, which matters when processing a lot of data.
III. Assignment 3
Goal: get familiar with big-data cloud services and with using Xshell.
Task: complete the tasks in the manual 华为云_大数据实时分析处理实验手册-Flume日志采集实验(部分)v2.docx, i.e. the five tasks below; see the manual for the detailed steps.
1. Bind a public IP, provision the MapReduce Service (MRS), and log in to the master node

2. Generate test data with a Python script
autodatagen.py
# coding:utf-8
###########################################
# rowkey: two random digits + current timestamp; must be unique in the table.
# Columns: rowkey, user name, age, sex, goods ID, price, store ID,
#          shopping action, phone, email, purchase date
# Example rows:
# 421564974572,Sgxrp,20,woman,152121,297.64,313015,scan,15516056688,JbwLDQmzwP@qq.com,2019-08-01
# 601564974572,Lbeuo,43,man,220902,533.13,313016,pv,15368953106,ezfrJSluoR@163.com,2019-08-05
###########################################
import random
import string
import sys
import time

# Upper- and lower-case letters
alphabet_upper_list = string.ascii_uppercase
alphabet_lower_list = string.ascii_lowercase


# Build a random string of the given length from the given character set
def get_random(instr, length):
    # random.sample picks unique characters, e.g. ['a', 't', 'f', 'v', 'y']
    res = random.sample(instr, length)
    # Join the list into a string
    result = ''.join(res)
    return result


# Rowkeys generated so far (to guarantee uniqueness)
rowkey_tmp_list = []


# Build a rowkey
def get_random_rowkey():
    pre_rowkey = ""
    while True:
        # Two-digit number between 00 and 99 inclusive
        num = random.randint(0, 99)
        # Current 10-digit Unix timestamp
        timestamp = int(time.time())
        # zfill(2) left-pads the number with zeros to two digits
        pre_rowkey = str(num).zfill(2) + str(timestamp)
        if pre_rowkey not in rowkey_tmp_list:
            rowkey_tmp_list.append(pre_rowkey)
            break
    return pre_rowkey


# User name
def get_random_name(length):
    name = string.capwords(get_random(alphabet_lower_list, length))
    return name


# Age
def get_random_age():
    return str(random.randint(18, 60))


# Sex
def get_random_sex():
    return random.choice(["woman", "man"])


# Goods ID
def get_random_goods_no():
    goods_no_list = ["220902", "430031", "550012", "650012", "532120", "230121",
                     "250983", "480071", "580016", "950013", "152121", "230121"]
    return random.choice(goods_no_list)


# Goods price (float)
def get_random_goods_price():
    # Integer part: 1~999
    price_int = random.randint(1, 999)
    # Decimal part: 1~99
    price_decimal = random.randint(1, 99)
    goods_price = str(price_int) + "." + str(price_decimal)
    return goods_price


# Store ID
def get_random_store_id():
    store_id_list = ["313012", "313013", "313014", "313015", "313016", "313017",
                     "313018", "313019", "313020", "313021", "313022", "313023"]
    return random.choice(store_id_list)


# Shopping action type: pv / buy / cart / fav / scan
def get_random_goods_type():
    goods_type_list = ["pv", "buy", "cart", "fav", "scan"]
    return random.choice(goods_type_list)


# Phone number
def get_random_tel():
    pre_list = ["130", "131", "132", "133", "134", "135", "136", "137", "138", "139",
                "147", "150", "151", "152", "153", "155", "156", "157", "158", "159",
                "186", "187", "188"]
    return random.choice(pre_list) + ''.join(random.sample('0123456789', 8))


# Email address
def get_random_email(length):
    alphabet_list = alphabet_lower_list + alphabet_upper_list
    email_list = ["163.com", "126.com", "qq.com", "gmail.com", "huawei.com"]
    return get_random(alphabet_list, length) + "@" + random.choice(email_list)


# Purchase date (the last 7 days of the sample window)
def get_random_buy_time():
    buy_time_list = ["2019-08-01", "2019-08-02", "2019-08-03", "2019-08-04",
                     "2019-08-05", "2019-08-06", "2019-08-07"]
    return random.choice(buy_time_list)


# Assemble one CSV record
def get_random_record():
    return (get_random_rowkey() + "," + get_random_name(5) + "," + get_random_age()
            + "," + get_random_sex() + "," + get_random_goods_no() + ","
            + get_random_goods_price() + "," + get_random_store_id() + ","
            + get_random_goods_type() + "," + get_random_tel() + ","
            + get_random_email(10) + "," + get_random_buy_time())


# Random sleep interval in seconds
def get_random_sleep_time():
    return random.randint(5, 10)


# Write the records to the output file
def write_record_to_file():
    # Overwrite the file: sys.argv[1] is the output path, sys.argv[2] the record count
    f = open(sys.argv[1], 'w')
    i = 0
    while i < int(sys.argv[2]):
        record = get_random_record()
        f.write(record)
        f.write('\n')
        i += 1
    f.close()


if __name__ == "__main__":
    write_record_to_file()
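The script takes the output file path and the record count from the command line (sys.argv[1] and sys.argv[2]), so a run like python autodatagen.py /tmp/test_data.txt 100 writes 100 CSV records. The actual path and count follow the lab manual; the file name here is illustrative.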
3. Configure Kafka and install Flume
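Before wiring Flume to Kafka, the target topic has to exist. A hedged sketch of topic creation (the topic name and ZooKeeper address are placeholders, not the manual's values; newer Kafka versions use --bootstrap-server instead of --zookeeper):

kafka-topics.sh --create --zookeeper <zk-host>:2181/kafka --topic <topic-name> --partitions 1 --replication-factor 1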



4. Connect Kafka and Flume and confirm data is flowing
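The exact flume-kafka.conf comes from the lab manual and is not reproduced here; below is a minimal sketch of the shape such an agent configuration takes. The agent/source/channel/sink names, the spool directory, the broker address, and the topic are all assumptions:

# Agent components
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source: watch the local directory the Python generator writes into
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /tmp/flume_spool
a1.sources.r1.channels = c1
# Channel: buffer events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
# Sink: publish each event to a Kafka topic (property names per Flume 1.7+)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = <broker-host>:9092
a1.sinks.k1.kafka.topic = <topic-name>
a1.sinks.k1.channel = c1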

5. Takeaways
1. Deploying on cloud services drove home how convenient managed big-data infrastructure is. Provisioning an MRS (MapReduce Service) cluster from the Huawei Cloud console takes only ten-odd minutes; compared with installing the Hadoop ecosystem across several physical machines, the cloud gives an out-of-the-box environment and let me focus on building the data pipeline instead of tedious environment setup.
2. Watching the components cooperate revealed the design logic of a complete data pipeline. From the Python script generating logs as the data source, to Kafka buffering as the message queue, to Flume collecting into HDFS storage, the experiment made the standard big-data chain of "generate, transport, collect, store" concrete: each component has its own job yet links tightly to the next.
3. Writing the Flume configuration file is precise control over the data flow. Editing flume-kafka.conf to define a Source watching the local log directory, a memory-backed Channel, and a Sink connected to a Kafka topic showed me the flexibility of configuration-driven, declarative architecture: changing the data-routing logic only requires editing the config file.
4. Command-line verification is the key step for proving the pipeline is connected. The moment kafka-console-consumer.sh consumed the messages generated by Python and shipped by Flume, I realized every stage of a big-data system needs an explicit verification method; this end-to-end testing mindset is essential in real engineering (a sketch of the command follows this list).
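A hedged sketch of that verification command; the broker address and topic name are placeholders, not the manual's values:

kafka-console-consumer.sh --bootstrap-server <broker-host>:9092 --topic <topic-name> --from-beginning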