《数据采集与融合》第三次作业
学号: 102302128姓名: 吴建良
Gitee仓库地址: (请在这里填写您的Gitee仓库链接)
作业①:多线程爬取网站图片
一、核心思路与代码
- MiniCrawler (爬虫核心类)MiniCrawler 类封装了爬虫的核心逻辑和所有共享资源。
1.1. 方法: init
1.1.1 思路:学号约束: 根据学号 102302128 初始化页面限制 PAGE_LIMIT = 28 和图片限制 IMAGE_LIMIT = 128。共享资源: 初始化已下载计数器 downloaded_count 和已访问集合 visited_pages。线程安全: 实例化一个 threading.Lock(),它将是整个多线程爬虫的核心,用于保护所有共享资源(计数器、集合、队列)在被多个线程同时访问时的数据一致性。环境准备: 创建用于存储图片的目录 images_hw3。
1.1.2 相关代码块:
class MiniCrawler:
def __init__(self, page_limit, image_limit):# --- 根据学号 (102302128) 设置约束 ---self.PAGE_LIMIT = page_limit # 页面限制 (最后两位: 28)self.IMAGE_LIMIT = image_limit # 图片限制 (最后三位: 128)self.downloaded_count = 0 # 已下载图片数量self.visited_pages = set() # 已访问页面URL# 线程锁,用于在多线程中安全地更新计数和集合self.lock = threading.Lock()# 确保图片存储目录存在self.image_dir = 'images_hw3'os.makedirs(self.image_dir, exist_ok=True)print(f"--- 爬虫已初始化 ---")# ... (省略打印信息) ...
1.2. 方法: crawl_page (页面爬取与链接发现)1.2.1 思路:页面限制 (线程安全): 首先,加锁检查 visited_pages 集合,判断页面是否已访问或是否达到 PAGE_LIMIT (28) 上限。如果条件满足,则加锁将当前 url 添加到集合中,防止其他线程重复爬取。模拟请求: 使用 requests.get 并携带 User-Agent 伪装成浏览器,获取页面 HTML。图片解析: 使用 BeautifulSoup 和 lxml 解析器。关键是同时查找 img.get('data-src') (用于懒加载图片) 和 img.get('src') (用于普通图片),确保能抓取到所有图片。图片下载: 遍历找到的图片 URL,调用 self.save_image 方法进行下载。链接解析 (BFS): 查找所有 标签,使用 urllib.parse.urljoin 自动将相对路径(如 ../a.jpg)补全为绝对路径。链接过滤: 筛选出 weather.com.cn 站内的、且未被访问过的新链接,返回给调用者(single_thread 或 worker)将其加入队列。
1.2.2 相关代码块:
def crawl_page(self, url):
# --- 检查是否达到页面上限 ---
with self.lock:if url in self.visited_pages or len(self.visited_pages) >= self.PAGE_LIMIT:return []self.visited_pages.add(url)print(f"\n页面 {len(self.visited_pages)}/{self.PAGE_LIMIT}: 正在爬取 {url}")try:headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}r = requests.get(url, timeout=5, headers=headers)r.raise_for_status()soup = BeautifulSoup(r.content, 'lxml')# --- 1. 下载图片 ---image_urls_on_page = []for img in soup.find_all('img'):src = img.get('data-src') or img.get('src') # (处理懒加载)if src and not src.startswith('data:image/'): full_img_url = urljoin(url, src) # (处理相对路径)if any(full_img_url.lower().endswith(ext) for ext in ['.jpg', '.png', '.gif', '.jpeg']) and full_img_url not in image_urls_on_page:image_urls_on_page.append(full_img_url)print(f" -> 找到 {len(image_urls_on_page)} 张图片...")for img_url in image_urls_on_page:with self.lock:if self.downloaded_count >= self.IMAGE_LIMIT: # (检查图片上限)print(" -> 图片配额已满, 停止本页下载。")breakself.save_image(img_url)# --- 2. 收集链接 ---new_links = []for a in soup.find_all('a', href=True):full_url = urljoin(url, a['href'])if 'weather.com.cn' in full_url and full_url not in self.visited_pages:clean_url = full_url.split('#')[0].split('?')[0] # (清理URL)if clean_url not in new_links:new_links.append(clean_url)print(f" -> 找到 {len(new_links)} 个新链接。")return new_links[:10] except Exception as e:return []
1.3. 方法: save_image (图片下载与线程安全)1.3.1 思路:图片限制 (线程安全): 加锁检查 self.downloaded_count 是否达到 IMAGE_LIMIT (128)。这是为了防止多个线程在同一时间都通过了检查,导致下载超出限制。下载: 使用 requests.get(stream=True) 并配合 iter_content() 以流式写入文件,这样可以高效下载大文件(如高清图片)而不会占满内存。更新计数 (线程安全): 图片写入成功后,再次加锁,安全地将 self.downloaded_count += 1。
1.3.2 相关代码块:
def save_image(self, img_url):
"""
下载并保存单张图片。
这是一个线程安全的方法。
"""
with self.lock:if self.downloaded_count >= self.IMAGE_LIMIT:return False # 已达到上限,停止下载try:r = requests.get(img_url, timeout=5, stream=True)r.raise_for_status() filename = os.path.basename(img_url)if not filename or len(filename) > 50:filename = f"img_{self.downloaded_count + 1}.jpg" save_path = os.path.join(self.image_dir, filename)with open(save_path, 'wb') as f:for chunk in r.iter_content(1024): f.write(chunk)# --- 再次加锁,安全地更新计数器 ---with self.lock:if self.downloaded_count < self.IMAGE_LIMIT:self.downloaded_count += 1print(f"下载 {self.downloaded_count}/{self.IMAGE_LIMIT}: {img_url} -> {save_path}")return Trueelse:os.remove(save_path) # (下载晚了,名额已满,删除文件)return False
except Exception as e:return False
-
爬虫调度逻辑
2.1. 方法: single_thread2.1.1 思路:使用一个 Python list 作为 queue (队列)。在 while 循环中,通过 queue.pop(0) 从列表头部取出一个 URL(先进先出),然后调用 crawler.crawl_page() 爬取页面。最后,通过 queue.extend(new_links) 将新发现的链接添加到列表尾部。这实现了一个标准的广度优先搜索 (BFS),按层级爬取网站。
2.1.2 相关代码块:def single_thread(start_url, page_limit, image_limit):
print("--- 启动 [单线程] 爬虫 ---")
crawler = MiniCrawler(page_limit=page_limit, image_limit=image_limit)
queue = [start_url]
while queue:
if crawler.downloaded_count >= crawler.IMAGE_LIMIT or len(crawler.visited_pages) >= crawler.PAGE_LIMIT:
print("\n--- 任务限制已达到,停止爬取 ---")
break
url = queue.pop(0) # (从队列头部取出一个URL)
new_links = crawler.crawl_page(url)
queue.extend(new_links) # (将新找到的链接添加到队列尾部)print(f"\n--- [单线程] 爬虫结束 ---")
2.2. 方法: multi_thread 及 worker2.2.1 思路:共享资源: multi_thread 函数创建一个 crawler 实例和一个 queue 列表。创建线程: 创建 5 个 threading.Thread,所有线程都指向同一个 worker 函数。Worker 逻辑 (线程安全): worker 函数是多线程的核心。它在一个 while True 循环中运行。取任务: 加锁 (with crawler.lock:),然后从共享 queue 中 pop(0) 一个 URL。如果队列为空,break 退出线程。执行任务: 释放锁,调用 crawler.crawl_page(url)。这个过程是 I/O 密集型(网络等待),耗时最长,此时锁是释放的,允许其他线程去队列取任务,从而实现并发。放回任务: crawl_page 返回新链接后,再次加锁 (with crawler.lock:),将 new_links 添加回共享 queue。
2.2.2 相关代码块:
def multi_thread(start_url, page_limit, image_limit, thread_count=5):
print(f"--- 启动 [多线程] 爬虫 ({thread_count} 个线程) ---")
crawler = MiniCrawler(page_limit=page_limit, image_limit=image_limit)
queue = [start_url]
def worker():while True:url = None# (加锁:安全地从队列中取出一个URL)with crawler.lock:if queue:url = queue.pop(0)else:break # (如果队列为空,线程退出)if crawler.downloaded_count >= crawler.IMAGE_LIMIT or len(crawler.visited_pages) >= crawler.PAGE_LIMIT:break # 任务完成,线程退出if url:new_links = crawler.crawl_page(url)# (加锁:安全地将新链接添加回队列)with crawler.lock:queue.extend(new_links)time.sleep(0.1)threads = []
for _ in range(thread_count):t = threading.Thread(target=worker)threads.append(t)t.start()for t in threads:t.join()print(f"\n--- [多线程] 爬虫结束 ---")
# ... (省略打印信息) ...
二、代码与输出结果代码连接:
代码:https://gitee.com/wujianliang9/2025-data-collection/blob/master/第三次作业/1.py
输出:

三、心得体会
1.多线程效率的压倒性优势:
本次实验最直观的感受就是多线程 (threading) 带来的巨大性能提升。在相同的网络环境下,爬取 128 张图片,多线程 (5个线程) 仅用时 10.23 秒,而单线程用时 30.58 秒(根据我的单独测试),效率提升了近 3 倍。这清晰地展示了在 I/O 密集型任务(如网络请求)中,并发处理能如何有效利用等待时间。
2.线程安全 (Threading Lock) 是核心关键:
在编写多线程爬虫时,最大的挑战是处理共享资源。在 MiniCrawler 类中,downloaded_count (下载计数)、 visited_pages (已访问集合) 以及 queue (任务队列) 都是被所有线程共享的。如果不使用 threading.Lock 进行保护,多个线程会同时“读-改-写”这些变量,导致计数不准(例如下载了 130 张图片)或重复爬取同一页面。通过在所有关键的读/写操作(如 if self.downloaded_count >= ...、self.downloaded_count += 1、queue.pop(0))前后使用 with self.lock:,我确保了数据的一致性和准确性。
3.广度优先 (BFS) 爬取策略:
通过使用一个 list 作为队列,并坚持 pop(0) (从头部取) 和 extend() (在尾部加) 的原则,我实现了一个广度优先搜索 (BFS) 爬虫。这保证了爬虫会优先爬取完第一层深度的所有链接(如首页),再进入第二层,这种按层级爬取的方式比深度优先 (DFS) 更加可控,也更符合网站爬取的一般逻辑。
作业②:
Scrapy 爬取股票数据并存入 SQLite
2.1、实验要求要求:熟练掌握 Scrapy 中 Item、Pipeline 数据的序列化输出方法;Scrapy+Xpath+MySQL数据库存储技术路线爬取股票相关信息 [cite: 作业3_20251112.docx]。候选网站:东方财富网 (eastmoney.com)。输出信息:MySQL 数据库存储(实验修改:由于本地无 MySQL 环境,改为使用 SQLite)。
2.2、核心思路与代码
- items.py (数据模型)1.1.1 思路:根据作业要求 [cite: 作业3_20251112.docx] 和范例代码,定义一个 StockItem 类。此类继承自 scrapy.Item,并使用 scrapy.Field() 为每个数据字段(如 stock_code, stock_name 等)进行声明。这为 Scrapy 提供了统一的数据容器。我特别补全了作业要求中提到但范例中遗漏的 turnover (成交额) 字段。
1.1.2 相关代码块:
import scrapy
class StockItem(scrapy.Item):
# (仿照您提供的 Item,并补全作业要求的字段)
id = scrapy.Field() # 序号
stock_code = scrapy.Field() # 股票代码
stock_name = scrapy.Field() # 股票名称
current_price = scrapy.Field() # 最新报价
change_percent = scrapy.Field() # 涨跌幅
change_amount = scrapy.Field() # 涨跌额
volume = scrapy.Field() # 成交量
turnover = scrapy.Field() # 成交额 (作业②要求, 已为您补全)
amplitude = scrapy.Field() # 振幅
high = scrapy.Field() # 最高
low = scrapy.Field() # 最低
open = scrapy.Field() # 今开
previous_close = scrapy.Field() # 昨收
crawl_time = scrapy.Field() # 爬取时间
- eastmoney.py (爬虫逻辑)2.1.1 思路 (API分析):作业要求使用 Xpath,但 eastmoney.com 是一个 JS 动态渲染的网站,Scrapy 无法直接用 Xpath 抓取。因此,我采用了范例中提供的更优方案:抓取其数据API。通过 start_requests 构造一个包含所有查询参数(如 pz=100 获取100条数据)的 API URL,并伪装 User-Agent 和 Referer 来绕过反爬虫。
2.1.2 相关代码块:
import scrapy
import json
from stock_spider.items import StockItem
from datetime import datetime
import time
class EastmoneySpider(scrapy.Spider):
name = 'eastmoney'
allowed_domains = ['eastmoney.com']
def start_requests(self):base_url = "[http://82.push2.eastmoney.com/api/qt/clist/get](http://82.push2.eastmoney.com/api/qt/clist/get)"params = {'pn': '1','pz': '100', # (爬取 100 条数据)'po': '1','np': '1','ut': 'bd1d9ddb04089700cf9c27f6f7426281','fltt': '2','invt': '2','fid': 'f3','fs': 'm:0 t:6,m:0 t:80,m:1 t:2,m:1 t:23',# (已为您添加 f6=成交额)'fields': 'f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f12,f13,f14,f15,f16,f17,f18','_': str(int(time.time() * 1000))}url = base_url + '?' + '&'.join([f'{k}={v}' for k, v in params.items()])yield scrapy.Request(url=url,callback=self.parse_stock_data,headers={'Referer': '[http://quote.eastmoney.com/](http://quote.eastmoney.com/)','User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'})
2.2.1 思路 (数据解析):在 parse_stock_data 回调函数中,使用 json.loads() 解析 API 返回的 JSON 字符串。遍历 data['data']['diff'] 下的每条股票 stock,将其填充到 StockItem 中。关键点是使用 stock.get('f12', '') 这种安全的方式获取数据,避免因某个字段缺失 (None) 导致程序崩溃。同时,调用 format_volume 和 format_turnover 辅助函数将成交量(手)和成交额(元)格式化为易读的“万”或“亿”单位。2.2.2 相关代码块:def parse_stock_data(self, response):
try:
data = json.loads(response.text)
if data.get('data') and data['data'].get('diff'):
stocks = data['data']['diff']
for index, stock in enumerate(stocks, 1):item = StockItem()item['id'] = indexitem['stock_code'] = stock.get('f12', '')item['stock_name'] = stock.get('f14', '')item['current_price'] = stock.get('f2', 0)item['change_percent'] = stock.get('f3', 0)item['change_amount'] = stock.get('f4', 0)item['volume'] = self.format_volume(stock.get('f5', 0)) # 成交量item['turnover'] = self.format_turnover(stock.get('f6', 0)) # 成交额 (已补全)item['amplitude'] = stock.get('f7', 0)item['high'] = stock.get('f15', 0)item['low'] = stock.get('f16', 0)item['open'] = stock.get('f17', 0)item['previous_close'] = stock.get('f18', 0)item['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')yield item
except Exception as e:self.logger.error(f"解析数据失败: {e}")
def format_volume(self, volume):
"""格式化成交量 (手)"""
try:
volume = float(volume)
if volume >= 100000000:
return f"{volume/100000000:.2f}亿手"
elif volume >= 10000:
return f"{volume/10000:.2f}万手"
else:
return f"{volume}手"
except:
return "0手"
def format_turnover(self, turnover):
"""(新增) 格式化成交额 (元)"""
try:
turnover = float(turnover)
if turnover >= 100000000:
return f"{turnover/100000000:.2f}亿"
elif turnover >= 10000:
return f"{turnover/10000:.2f}万"
else:
return str(turnover)
except:
return "0"
3. pipelines.py (数据管道)
3.1.1 思路 (SQLitePipeline):由于我本地没有安装 MySQL,我将 MySQLPipeline 修改为了 SQLitePipeline,它使用 Python 内置的 sqlite3 库,无需安装。open_spider: 连接到本地的 stocks.db 文件,并执行 DROP TABLE IF EXISTS 来清空旧数据,确保每次运行都是全新的。create_table: 使用 SQLite 语法创建 stock_data 表,将 DECIMAL 替换为 REAL,VARCHAR 替换为 TEXT,并设置 UNIQUE (stock_code) 来防止重复。process_item: 使用 INSERT OR REPLACE INTO ... VALUES (?, ...) 这种 SQLite 特有的语法。它会自动处理数据重复:如果 stock_code 已存在,则更新该行数据;如果不存在,则插入新行。这比 INSERT ... ON DUPLICATE KEY UPDATE 更简洁。
3.1.2 相关代码块:import sqlite3 # 1. 将 pymysql 替换为 sqlite3
import json
class SQLitePipeline: # 2. 重命名为 SQLitePipeline
def init(self):
# (我们不再需要从 settings 读取配置)
self.db_name = 'stocks.db'
self.connection = None
self.cursor = None
@classmethod
def from_crawler(cls, crawler):return cls()def open_spider(self, spider):# 3. 连接到 SQLite 数据库文件self.connection = sqlite3.connect(self.db_name)self.cursor = self.connection.cursor()# 4. (仿照范例,先清空表,确保每次都是最新数据)self.cursor.execute("DROP TABLE IF EXISTS stock_data")self.create_table()def create_table(self):# 5. (使用 SQLite 语法创建表格)create_table_sql = """CREATE TABLE IF NOT EXISTS stock_data (id INTEGER PRIMARY KEY,stock_code TEXT NOT NULL,stock_name TEXT NOT NULL,current_price REAL,change_percent REAL,change_amount REAL,volume TEXT,turnover TEXT, amplitude REAL,high REAL,low REAL,open REAL,previous_close REAL,crawl_time TEXT,UNIQUE (stock_code))"""self.cursor.execute(create_table_sql)def close_spider(self, spider):self.connection.commit() # 6. 提交更改self.connection.close()def process_item(self, item, spider):# 7. (使用 SQLite 的 INSERT OR REPLACE 语法 和 ? 占位符)sql = """INSERT OR REPLACE INTO stock_data (id, stock_code, stock_name, current_price, change_percent, change_amount, volume, turnover, amplitude, high, low, open, previous_close, crawl_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""self.cursor.execute(sql, (item['id'], item['stock_code'], item['stock_name'],item['current_price'], item['change_percent'], item['change_amount'],item['volume'], item['turnover'], item['amplitude'],item['high'], item['low'], item['open'],item['previous_close'], item['crawl_time']))self.connection.commit() return item
class JsonWriterPipeline:
# (仿照范例,同时输出一个 JSON 文件)
def open_spider(self, spider):
self.file = open('stocks.json', 'w', encoding='utf-8')
self.file.write('[\n')
def close_spider(self, spider):self.file.write('\n]')self.file.close()def process_item(self, item, spider):line = json.dumps(dict(item), ensure_ascii=False) + ",\n"self.file.write(line)return item
2.3、代码与输出结果
-
代码连接: https://gitee.com/wujianliang9/2025-data-collection/tree/master/第三次作业
-
1. 终端运行
scrapy crawl eastmoney截图:

- 2.
stocks.json文件内容截图:

- 3.
stocks.db数据库内容截图:
![image]()
2.4、心得体会
-
API 优于 Xpath:
本次作业要求使用 Xpath,但我分析eastmoney.com时发现其数据是 JS 动态加载的。如果强行使用 Xpath,就必须上 Selenium,这违背了 Scrapy 的初衷。通过模仿范例代码,我学会了优先寻找数据 API (如push2.eastmoney.com)。这种方法更高效、更稳定,并且直接返回结构化的 JSON,完全绕过了复杂的 JS 渲染和 Xpath 解析,是爬取动态网站的最佳实践。 -
Scrapy 框架的模块化:
我深刻体会到了 Scrapy 框架的强大模块化设计。items.py像一个“数据合同”,定义了爬虫要抓什么。spiders/目录下的爬虫(如eastmoney.py)只负责“抓取”和“解析”。pipelines.py只负责“存储”和“清洗”。
当我发现没有 MySQL 时,我不需要修改任何爬虫逻辑代码 (eastmoney.py),只需要修改pipelines.py,将MySQLPipeline替换为SQLitePipeline,并在settings.py中切换一下配置,就能无缝切换数据库。这种低耦合的设计非常强大。
-
数据库的适配性:
在切换数据库时,我学到了不同数据库在 SQL 语法上的细微差别。例如,MySQL 使用的INSERT ... ON DUPLICATE KEY UPDATE在 SQLite 中并不完全支持,但 SQLite 提供了更简洁的INSERT OR REPLACE INTO来实现相同的功能(插入或替换)。同时,数据类型也需要适配(如VARCHAR->TEXT,DECIMAL->REAL)。 -
数据清洗的必要性:
API 返回的数据(如f5和f6)是原始的数字(如26130000),这对用户并不直观。通过在eastmoney.py中编写format_volume和format_turnover两个辅助函数,将数据在存入 Item 之前就转换为 "2.61亿" 或 "2613万",这是一种很好的数据预处理习惯,能极大提高最终数据的可读性。
作业③:Scrapy 爬取外汇数据并存入 SQLite
3.1、实验要求
- 要求:熟练掌握 Scrapy 中 Item、Pipeline 数据的序列化输出方法;使用
scrapy 框架+Xpath+MySQL数据库存储技术路线爬取外汇网站数据 - 候选网站:中国银行外汇牌价 (
https://www.boc.cn/sourcedb/whpj/) - 输出信息:数据库存储
3.2、核心思路与代码
1. items.py (数据模型)
- 1.1.1 思路:
根据作业要求 中提供的表格样式,定义一个ForexItem类。此类继承自scrapy.Item,并为表格中的Currency(货币名称),TBP(现汇买入价),CBP(现钞买入价),TSP(现汇卖出价),CSP(现钞卖出价), 和Time(发布时间) 声明scrapy.Field()。 - 1.1.2 相关代码块:
import scrapyclass ForexItem(scrapy.Item):# (根据作业③要求的表格结构)Currency = scrapy.Field() # 货币名称TBP = scrapy.Field() # 现汇买入价CBP = scrapy.Field() # 现钞买入价TSP = scrapy.Field() # 现汇卖出价CSP = scrapy.Field() # 现钞卖出价Time = scrapy.Field() # 发布时间crawl_time = scrapy.Field() # 爬取时间
2. boc_forex.py (爬虫逻辑)
-
2.1.1 思路 (Xpath 定位):
作业要求使用Xpath。通过分析同学的成功日志 [cite: image_f0ff57.png-da474584-77a9-4fce-9f2c-29d85e89de2a] 和 F12 调试,我推翻了“页面是动态”的错误判断,确认了数据是静态的。- 定位表格: 页面上有多个表格,通过
response.xpath('//table')获取所有表格,发现数据在第二个表格 (tables[1]) 中。 - 定位行: 使用
target_table.xpath('.//tr[position()>1]')选取第二个表格中的所有<tr>行,并跳过第1行(表头)。
- 定位表格: 页面上有多个表格,通过
-
2.1.2 相关代码块:
import scrapy from forex_crawler.items import ForexItem from datetime import datetimeclass BocForexSpider(scrapy.Spider):name = 'boc_forex'allowed_domains = ['boc.cn']# 1. 使用作业指定的、已被证明可用的 URLstart_urls = ['[https://www.boc.cn/sourcedb/whpj/](https://www.boc.cn/sourcedb/whpj/)']custom_settings = {'USER_AGENT': 'Mozilla/5.0 ...' # (伪装 User-Agent)}def parse(self, response):self.logger.info("--- 启动 [中国银行] 爬虫 (同学的 URL 方案) ---")# 2. 查找页面上的所有 <table>tables = response.xpath('//table')self.logger.info(f"--- 页面上共找到 {len(tables)} 个表格 ---")# 3. 目标数据在第二个表格 (索引 [1])target_table = tables[1] # 4. 提取所有行,跳过表头 (position()>1)rows = target_table.xpath('.//tr[position()>1]')self.logger.info(f"--- 找到 {len(rows)} 行外汇数据 ---")for row in rows:# ... (提取逻辑见 2.2.1) ...yield item -
2.2.1 思路 (Xpath 提取):
在遍历rows时,必须使用健壮的 Xpath 来提取数据。string(./td[1]): 我没有使用td[1]/text(),因为它很脆弱。我使用了string(./td[1])函数,它可以提取<td>标签下所有的文本(无论嵌套多深),并自动拼接。.get().strip(): 配合.get()安全地获取结果,并用.strip()清除空白符。- 精确索引:
td[1]到td[7]被用来精确对应“货币名称”、“现汇买入价”...“发布时间”等列。
-
2.2.2 相关代码块:
def parse(self, response):# ... (省略表格定位) ...for row in rows:item = ForexItem()# (使用 string() 函数并清理空白符)currency = row.xpath("string(./td[1])").get(default='').strip()tbp = row.xpath("string(./td[2])").get(default='0').strip() # 现汇买入价cbp = row.xpath("string(./td[3])").get(default='0').strip() # 现钞买入价tsp = row.xpath("string(./td[4])").get(default='0').strip() # 现汇卖出价csp = row.xpath("string(./td[5])").get(default='0').strip() # 现钞卖出价time_str = row.xpath("string(./td[7])").get(default='').strip() # 发布时间if currency:item['Currency'] = currencyitem['TBP'] = tbpitem['CBP'] = cbpitem['TSP'] = tspitem['CSP'] = cspitem['Time'] = time_stritem['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')yield itemself.logger.info(f"--- 数据提取完成, 共 {item_count} 条 ---")
3. pipelines.py (数据管道)
-
3.1.1 思路 (SQLitePipeline):
继续使用sqlite3替代 MySQL。- 创建表:
CREATE TABLE语句的字段名与ForexItem严格对应,并将Currency(货币名称) 设为PRIMARY KEY(主键)。 - 数据去重: 在
process_item中,使用INSERT OR REPLACE INTO ...语法。这利用了Currency主键:如果货币(如“美元”)已存在,则更新其汇率;如果不存在,则插入新行。
- 创建表:
-
3.1.2 相关代码块:
import sqlite3 import json from itemadapter import ItemAdapterclass SQLitePipeline:def __init__(self):self.db_name = 'forex.db' # (新数据库文件)self.connection = Noneself.cursor = None# ... (省略 open_spider 和 create_table) ...def create_table(self):create_table_sql = """CREATE TABLE IF NOT EXISTS forex_data (Currency TEXT PRIMARY KEY,TBP REAL, CBP REAL, TSP REAL, CSP REAL,Time TEXT, crawl_time TEXT)"""self.cursor.execute(create_table_sql)def close_spider(self, spider):self.connection.commit()self.connection.close()def _to_float(self, value):# ... (辅助函数见 3.2.1) ...def process_item(self, item, spider):sql = """INSERT OR REPLACE INTO forex_data (Currency, TBP, CBP, TSP, CSP, Time, crawl_time) VALUES (?, ?, ?, ?, ?, ?, ?)"""try:self.cursor.execute(sql, (item['Currency'],self._to_float(item.get('TBP')), # (使用辅助函数)# ... (省略) ...item['crawl_time']))self.connection.commit()except Exception as e:spider.logger.error(f"数据库插入失败: {e} - {item}")return item -
3.2.1 思路 (数据清洗
_to_float) (核心!):
在爬虫日志中,我遇到了could not convert string to float: ''错误。这是因为某些货币(如“文莱元”)的“现汇买入价”字段是空字符串''。float('')会导致程序崩溃。
解决方案:我创建了一个_to_float(self, value)辅助函数。它在尝试float(value)之前,会先检查value是否为None或'',如果是,则安全地返回0.0,从而完美解决了数据插入失败的问题。 -
3.2.2 相关代码块:
def _to_float(self, value):"""(修复点) 辅助函数:安全地将字符串转为浮点数"""if value is None or value == '':return 0.0 # 将空字符串或 None 存为 0.0try:return float(value)except ValueError:return 0.0 # 如果转换失败 (例如文本中包含非数字), 也存为 0.0
3.3、代码与输出结果
-
代码连接: https://gitee.com/wujianliang9/2025-data-collection/tree/master/第三次作业/3
-
1. 终端运行
scrapy crawl boc_forex截图:
![image]()
-
2.
forex.db数据库内容截图 (使用check_db.py查看):

3.4、心得体会
-
Xpath 的健壮性至关重要:
我最初的BOC_Main(大写M) Xpath 是在错误的(静态)页面上找到的,导致爬取失败。而BOC_main publish(小写m) Xpath 是针对动态加载的,也失败了。最终,通过分析同学的成功日志,我学到了最简单的response.xpath('//table')[1](取第二个表格)才是最有效的。这证明了在爬虫开发中,不要过度假设,而应基于实际下载的 HTML(或同学的成功经验)来定位。 -
string()vs/text():
在提取<td>内容时,我使用了string(./td[1])而不是td[1]/text()。这是一个关键技巧。string()函数会提取<td>下所有的文本节点,即使它们被<span>、<b>或其他标签包裹;而/text()只会提取直接子节点中的文本。在复杂的网页中,string()更加健壮。 -
Pipeline 是数据清洗的“最后一关”:
本次作业最关键的收获是数据清洗。爬虫(Spider)抓取的数据是“脏”的(例如包含空字符串'')。如果直接存入类型严格的数据库(如REAL),就会像我第一次运行时那样崩溃。通过在pipelines.py中添加_to_float辅助函数,在数据存入数据库之前将其清洗(''->0.0),我成功解决了这个ValueError。这让我深刻理解到 Spider 只管抓取,Pipeline 负责清洗 的 Scrapy 设计哲学。

