1. os 和 sys(系统编程基础)
这两个模块是进行系统层面操作(如文件管理、路径处理、环境变量访问等)必不可少的工具。
os 模块
os 主要是用于与操作系统交互的,比如:
-  文件和目录操作 
-  获取系统信息 
-  运行系统命令 
-  管理环境变量 
可以理解为:让你的 Python 代码能够操作"文件系统"、"目录结构"、"系统指令"等资源。
路径操作:
import osprint(os.getcwd())         # 获取当前工作目录
os.chdir('./tmp')            # 改变当前工作目录
print(os.listdir('.'))      # 列出当前目录下的所有文件和文件夹 当前工作目录已经是 ./tmp
文件/文件夹操作:
os.mkdir('testdir')         # 创建文件夹
os.rmdir('testdir')         # 删除空文件夹
os.remove('file.txt')       # 删除文件
os.rename('old.txt', 'new.txt')  # 重命名文件或文件夹
路径拼接与判断:
path = os.path.join('folder', 'file.txt')  # 拼接路径(自动处理/或\问题)
print(os.path.exists(path))                # 检查路径是否存在
print(os.path.isfile(path))                # 是否是文件
print(os.path.isdir(path))                 # 是否是目录
执行系统命令:
# 获取环境变量
home = os.environ.get('HOME')
print(home)os.system('ls -l')           # 运行一个系统命令(同步阻塞)
(注:更推荐用 subprocess,因为更安全、灵活)
sys 模块
sys 主要是用于跟 Python 解释器交互,比如:
-  读取命令行参数 
-  退出程序 
-  获取 Python 环境信息 
-  修改模块搜索路径 
可以理解为:让你的程序能控制/了解运行时自身的状态。
命令行参数:
import sysprint(sys.argv)             # 获取命令行参数,列表,第一个是程序本身# 例如命令行运行:
# python test.py arg1 arg2
# sys.argv 结果就是:
# ['test.py', 'arg1', 'arg2']
退出程序:
sys.exit(0)                 # 正常退出
sys.exit(1)                 # 异常退出
Python 版本信息:
print(sys.version)          # 打印Python版本
print(sys.platform)         # 当前系统平台:'win32', 'linux', 'darwin'(Mac)
修改模块搜索路径:
import sys# 加了之后,Python在**import 模块**时,除了默认的系统路径,还会去你指定的新路径里找模块。
sys.path.append('/path/to/your/module')    # 动态添加模块搜索路径(比如项目中的一些自定义模块,需要临时导入)
实际综合例子
写一个 Python 脚本,它可以根据命令行传参,创建一个目录并进入:
import os
import sysdef create_and_enter_dir(dirname):if not os.path.exists(dirname):os.mkdir(dirname)print(f"Directory '{dirname}' created.")else:print(f"Directory '{dirname}' already exists.")os.chdir(dirname)print(f"Now in directory: {os.getcwd()}")if __name__ == "__main__":if len(sys.argv) != 2:print("Usage: python script.py <dirname>")sys.exit(1)create_and_enter_dir(sys.argv[1])
这样可以:
python script.py myfolder
自动创建并进入 myfolder。
2. datetime(时间处理)
用于处理日期和时间的模块,常用于日志、时间戳生成、时间间隔计算等。
获取当前时间:
import datetimenow = datetime.datetime.now()
print(now)  # 比如输出:2025-04-29 15:30:42.123456
如果只要日期
today = datetime.date.today()
print(today)  # 比如:2025-04-29
创建指定时间:
d = datetime.datetime(2025, 5, 1, 12, 30, 45)  # 年, 月, 日, 时, 分, 秒
print(d)  # 2025-05-01 12:30:45
如果只要日期对象
d = datetime.date(2025, 5, 1)
print(d)  # 2025-05-01
时间加减(使用 timedelta),比如1天后、2小时前、10分钟后:
timedelta 支持的参数有:days, seconds, microseconds, milliseconds, minutes, hours, weeks
from datetime import datetime, timedeltanow = datetime.now()print(now + timedelta(days=1))      # 一天后
print(now - timedelta(hours=2))     # 两小时前
print(now + timedelta(minutes=10))  # 十分钟后
时间格式化输出(变成字符串)
比如格式化成 "2025-04-29 15:30:42" 这种:
now = datetime.datetime.now()print(now.strftime("%Y-%m-%d %H:%M:%S"))
# 输出:2025-04-29 15:30:42
字符串转时间对象(解析时间)
如果拿到的是字符串,比如 "2025-04-29 15:30:42",想转成 datetime 对象:
(注意 strptime 和 strftime 名字很像,p 是 parse(解析))
s = "2025-04-29 15:30:42"
dt = datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
print(dt)
# 输出:2025-04-29 15:30:42
时间比较
可以直接比较两个 datetime 对象:
t1 = datetime.datetime(2025, 5, 1, 12, 0, 0)
t2 = datetime.datetime(2025, 5, 2, 12, 0, 0)print(t1 < t2)  # True
print(t2 - t1)  # 1 day, 0:00:00 (是一个 timedelta)
循环生成时间段(进阶常用)
比如想生成从 2025-04-29 00:00 开始,每隔1小时一个点:
start = datetime.datetime(2025, 4, 29, 0, 0, 0)
end = datetime.datetime(2025, 4, 29, 5, 0, 0)current = start
while current <= end:print(current.strftime("%Y-%m-%d %H:%M:%S"))current += datetime.timedelta(hours=1)
要处理时区,就要用到 tzinfo,不过一般推荐配合第三方库 pytz 或 zoneinfo 来做。
from datetime import datetime, timezone, timedelta# 设置时区:UTC+8
dt = datetime(2025, 4, 29, 12, 0, 0, tzinfo=timezone(timedelta(hours=8)))
print(dt)
3. re(正则表达式)
正则表达式(Regular Expression)是一种字符串匹配规则的描述语言。
它能非常灵活地搜索、提取、替换字符串中的特定内容,比如:
-  验证手机号 
-  提取网页上的邮箱 
-  替换文本中的敏感词 
-  分割复杂字符串 
-  检测密码复杂度 
Python 处理正则,靠的就是内置的 re 模块。
re模块常用函数
 
导入:
import re
常用函数有:
| 函数 | 作用 | 
|---|---|
| re.match(pattern, string) | 从字符串开头匹配 | 
| re.search(pattern, string) | 搜索整个字符串,找到第一个匹配 | 
| re.findall(pattern, string) | 找到所有匹配,返回列表 | 
| re.finditer(pattern, string) | 找到所有匹配,返回迭代器(内存更省) | 
| re.sub(pattern, repl, string) | 替换匹配到的内容 | 
| re.split(pattern, string) | 按正则分割字符串 | 
| re.compile(pattern) | 编译成正则对象,提高效率 | 
正则表达式语法基础
字面意义:
普通字符,自己代表自己,比如 abc 就匹配 abc。
元字符(特殊意义的字符):
| 字符 | 含义 | 
|---|---|
| . | 任意一个字符(除了换行) | 
| ^ | 开头匹配 | 
| $ | 结尾匹配 | 
| * | 重复0次或更多次 | 
| + | 重复1次或更多次 | 
| ? | 重复0次或1次 | 
| {n} | 恰好n次 | 
| {n,} | 至少n次 | 
| {n,m} | n到m次 | 
| [...] | 匹配字符集中的任意一个字符 | 
| ` | ` | 
| () | 分组(提取、捕获) | 
转义字符(\)
 
如果你想匹配特殊字符,比如   . ,必须用   \. 。
比如:
re.match(r"a\.b", "a.b")  # 匹配成功
注意:正则表达式字符串一般加 r"",表示原生字符串,不转义。
常用字符集
| 表达式 | 含义 | 
|---|---|
| \d | 数字,等价于 [0-9] | 
| \D | 非数字 | 
| \w | 字母数字下划线,等价于 [a-zA-Z0-9_] | 
| \W | 非字母数字下划线 | 
| \s | 空白字符(空格、tab、换行) | 
| \S | 非空白字符 | 
贪婪 vs 非贪婪
-  正则默认是贪婪匹配(能多就多) 
-  在量词后加 ?,变成非贪婪匹配(能少就少)
例子:
import retext = "<p>Title</p><p>Content</p>"print(re.findall(r"<p>.*</p>", text))    # 贪婪,匹配到整个 "<p>Title</p><p>Content</p>"
print(re.findall(r"<p>.*?</p>", text))   # 非贪婪,分别匹配 "<p>Title</p>" 和 "<p>Content</p>"
典型应用案例
匹配手机号
import rephone = "我的号码是13812345678,你的呢?"pattern = r"1[3-9]\d{9}"result = re.search(pattern, phone)
print(result.group())  # 输出:13812345678
提取网页中的所有邮箱地址
text = "联系:user1@gmail.com, admin@company.cn, feedback@yahoo.com"emails = re.findall(r"[a-zA-Z0-9._%-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", text)
print(emails)
# 输出:['user1@gmail.com', 'admin@company.cn', 'feedback@yahoo.com']
替换敏感词
text = "这是一段包含badword1和badword2的文字"pattern = r"badword1|badword2"
clean_text = re.sub(pattern, "***", text)
print(clean_text)
# 输出:这是一段包含***和***的文字
分割复杂字符串
比如有一段文字,逗号、分号、空格都可能作为分隔:
text = "apple,banana;orange  pear"words = re.split(r"[,;\s]+", text)
print(words)
# 输出:['apple', 'banana', 'orange', 'pear']
分组提取
-  在正则表达式里,用小括号 ()把一部分内容包起来。
-  这样就可以单独提取出括号里的子匹配。 
-  每个括号对应一个组(group),编号从 1 开始。 
.group(0) 是整个匹配到的字符串;.group(1), .group(2) 分别是第1个、第2个括号匹配到的内容。
import retext = '姓名:张三,电话:13812345678'pattern = r'姓名:(.*),电话:(1[3-9]\d{9})'result = re.search(pattern, text)print(result.group(0))  # 整个匹配:姓名:张三,电话:13812345678
print(result.group(1))  # 第1组:(.*) -> 张三
print(result.group(2))  # 第2组:(1[3-9]\d{9}) -> 13812345678
compile优化
如果一个正则要重复用多次,可以预编译成对象:
好处:效率更高,尤其是循环里用的时候。
pattern = re.compile(r"\d+")
print(pattern.findall("123 abc 456"))
总结:re模块让你可以用一套灵活的规则(正则表达式),在字符串中快速定位、提取、替换内容,非常强大。
4. json(数据序列化)
JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,特点是:
-  结构简单(键值对、数组) 
-  可读性好(人和机器都能轻松理解) 
-  跨语言通用(不只是 JS,Python/Java/Go/后端接口全用) 
Python对象和JSON格式互转,非常适合与前后端接口对接、配置文件读写。
样例 JSON:
{"name": "小明","age": 24,"skills": ["Python", "Java", "Hacking"],"has_job": false
}
常用于:
-  后端接口返回(API) 
-  配置文件(如 .json结尾)
-  本地保存数据(缓存) 
-  网络通信 
核心函数用法
Python 提供了内置的 json 模块,主要做两件事:
| 操作 | 函数 | 作用 | 
|---|---|---|
| Python 对象 → JSON 字符串 | json.dumps() | 序列化(dump string) | 
| JSON 字符串 → Python 对象 | json.loads() | 反序列化(load string) | 
| Python 对象 → 写入 JSON 文件 | json.dump() | 序列化并写入文件 | 
| JSON 文件 → Python 对象 | json.load() | 反序列化并读取文件 | 
json.dumps()
 
把 Python 对象转换成 JSON 字符串。
import jsondata = {"name": "小明", "age": 24, "skills": ["Python", "Hacking"]}json_str = json.dumps(data)
print(json_str)
# 输出:{"name": "\u5c0f\u660e", "age": 24, "skills": ["Python", "Hacking"]}
默认中文会转成 Unicode 编码!
 想让中文正常显示,加参数:
json_str = json.dumps(data, ensure_ascii=False)
print(json_str)
# 输出:{"name": "小明", "age": 24, "skills": ["Python", "Hacking"]}
美化输出(缩进):
json_str = json.dumps(data, ensure_ascii=False, indent=2)
print(json_str)
输出:
{"name": "小明","age": 24,"skills": ["Python","Hacking"]
}
json.loads()
 
把 JSON 字符串转换成 Python 对象。
json_str = '{"name": "小明", "age": 24, "skills": ["Python", "Hacking"]}'data = json.loads(json_str)
print(data)
# 输出:{'name': '小明', 'age': 24, 'skills': ['Python', 'Hacking']}
json.dump()
把 Python 对象写入到文件中(保存成 .json 文件)。
data = {"name": "小明", "age": 24}with open('data.json', 'w', encoding='utf-8') as f:json.dump(data, f, ensure_ascii=False, indent=2)
写文件时注意 encoding='utf-8',否则中文可能乱码。
json.load()
 
从文件中读取 JSON 数据,变成 Python 对象。
with open('data.json', 'r', encoding='utf-8') as f:data = json.load(f)
print(data)
Python类型和JSON类型的对应关系
| Python类型 | JSON类型 | 
|---|---|
| dict | 对象(Object) | 
| list、tuple | 数组(Array) | 
| str | 字符串(String) | 
| int、float | 数字(Number) | 
| True、False | 布尔值(true/false) | 
| None | 空(null) | 
注意:Python 的 None 会变成 JSON 的 null。
常见应用场景示例
处理 API 接口返回的 JSON
import requests
import jsonresp = requests.get('https://api.example.com/data')
data = json.loads(resp.text)
print(data)
保存爬虫数据
results = [{"title": "新闻1"}, {"title": "新闻2"}]with open('results.json', 'w', encoding='utf-8') as f:json.dump(results, f, ensure_ascii=False, indent=2)
配置文件读取
with open('config.json', 'r', encoding='utf-8') as f:config = json.load(f)print(config['api_key'])
总结:json模块就是把Python数据和JSON字符串互相转换的桥梁,非常适合用来做接口交互、数据保存、配置管理。
5. collections(高级数据结构)
collections 是 Python 标准库里的一个模块。
 专门提供比内置数据结构(如 list、dict)更强大、更高效的“容器类”工具。
比如:
-  更智能的字典( defaultdict、OrderedDict)
-  可统计的计数器( Counter)
-  支持队列操作的双端队列( deque)
-  可以给元组加字段名( namedtuple)
-  专业的堆栈/链表( ChainMap)
常用的有这几个:
| 名称 | 简单理解 | 作用 | 
|---|---|---|
| namedtuple | 有名字的元组 | 类似轻量版对象(只有属性没有方法) | 
| deque | 双端队列 | 高效插入/删除元素(两边都快) | 
| Counter | 计数器 | 统计元素出现的次数 | 
| OrderedDict | 有序字典(老版本用) | 按插入顺序记住键值对(Python 3.7+内置dict已有序) | 
| defaultdict | 带默认值的字典 | 取不存在的key不会报错,返回默认值 | 
| ChainMap | 多字典组合 | 多个字典逻辑上合并查询 | 
namedtuple(有名字的元组)
 
像 tuple,但可以用名字访问元素,更直观。
用法场景:临时定义小对象,不想写复杂 class
from collections import namedtuple# 定义一个点
Point = namedtuple('Point', ['x', 'y'])p = Point(1, 2)
print(p.x, p.y)  # 1 2
print(p)         # Point(x=1, y=2)
特点:
-  占用内存小 
-  代码简洁 
-  支持索引访问,也支持属性名访问 
deque(双端队列)
 
比 list 更高效,两边都能快速添加/删除元素。
用法场景:需要频繁在头部/尾部插入或删除,比如 BFS、滑动窗口。
from collections import dequeq = deque([1, 2, 3])q.append(4)       # 末尾加
q.appendleft(0)   # 开头加
print(q)          # deque([0, 1, 2, 3, 4])q.pop()           # 删除末尾
q.popleft()       # 删除开头
print(q)          # deque([1, 2, 3])
特性:
-  头尾操作都是 O(1) 复杂度 
-  比 list.insert(0, x) 高效得多 
Counter(计数器)
 
用来统计元素出现的次数。
用法场景:数据统计、文本词频分析。
from collections import Counterc = Counter('abracadabra')
print(c)
# Counter({'a': 5, 'b': 2, 'r': 2, 'c': 1, 'd': 1})print(c.most_common(2))  # 出现最多的2个元素
# [('a', 5), ('b', 2)]
特性:
-  可以直接加减集合 
-  支持最常见元素提取 
OrderedDict(有序字典)
 
普通 dict 在 Python 3.7+ 默认已经有序,但在老版本(3.6以前)中,OrderedDict很重要。
用法场景:需要维护元素插入顺序。
from collections import OrderedDictod = OrderedDict()
od['a'] = 1
od['b'] = 2
od['c'] = 3print(od)  # OrderedDict([('a', 1), ('b', 2), ('c', 3)])
特性:
-  遍历时,顺序和插入顺序一致。 
defaultdict(带默认值的字典)
 
普通 dict 访问不存在的 key 会报错。defaultdict 自动给一个默认值。
用法场景:构建复杂字典(比如分组统计、字典嵌套)
from collections import defaultdictd = defaultdict(int)  # 默认值是 int(), 即0
d['a'] += 1
print(d['a'])  # 1
print(d['b'])  # 0 (不会报错)# 嵌套使用
tree = defaultdict(lambda: defaultdict(list))
tree['python']['versions'].append(3.9)
print(tree)
特性:
-  自动初始化新key 
-  很适合做分组/聚合操作 
ChainMap(字典链)
 
多个字典组合在一起,查询时像一个整体。
用法场景:多层配置覆盖,比如环境变量优先级。
from collections import ChainMapdefaults = {'theme': 'Default', 'language': 'English'}
overrides = {'theme': 'Dark'}settings = ChainMap(overrides, defaults)
print(settings['theme'])    # Dark(优先查overrides)
print(settings['language']) # English(fallback到defaults)
特性:
-  查询时,优先从前面的字典查 
-  不是真的合并,只是逻辑组合,非常高效 
常见使用场景总结
| 场景 | 用什么 | 
|---|---|
| 快速建小对象(无方法的) | namedtuple | 
| 队列、栈、滑动窗口 | deque | 
| 统计元素出现次数 | Counter | 
| 需要顺序的字典 | OrderedDict | 
| 字典默认值处理 | defaultdict | 
| 多层配置管理 | ChainMap | 
6. logging(日志模块)
logging是 Python 的标准日志模块,主要功能是:
把程序运行时的关键信息输出到:
-  控制台(屏幕) 
-  文件(保存起来) 
-  网络、邮件、数据库等(更高级) 
让你:
-  不再乱用 print
-  统一管理日志 
-  区分不同重要程度的日志(比如正常信息 vs 错误信息) 
-  定制输出格式、保存位置、保存数量 
logging 是专业的 "程序黑匣子",记录程序的一举一动,出问题能迅速回溯!
logging内部有四大核心对象:
| 名称 | 作用 | 
|---|---|
| Logger | 产生日志的对象(你用的入口) | 
| Handler | 决定日志去哪儿(控制台?文件?邮箱?) | 
| Formatter | 决定日志长什么样(格式) | 
| Filter | 决定日志过不过滤(可选,不常用) | 
Logger(负责发日志)↓
Handler(负责把日志发到地方)↓
Formatter(控制日志格式)
每个 Logger 可以有多个 Handler,
 每个 Handler 都可以指定自己的 Formatter。
比如同时打印到屏幕 + 保存到文件,但格式可以不一样!
logging常用功能
你常用的基本是这几种功能:
-  logging.debug()—— 调试信息
-  logging.info()—— 普通运行信息
-  logging.warning()—— 警告(小问题)
-  logging.error()—— 错误(程序还能跑)
-  logging.critical()—— 致命错误(要崩)
还能做:
-  设置日志等级(控制打印多少) 
-  保存到日志文件 
-  定时轮换日志(比如每天生成一个新日志) 
-  按大小轮换日志(比如100MB一个文件) 
-  设置日志格式(时间、模块、行号等信息) 
logging的五个等级
| 等级 | 函数 | 用途 | 默认级别 | 
|---|---|---|---|
| DEBUG | logging.debug | 最详细的调试信息 | |
| INFO | logging.info | 普通的信息,比如程序开始了、结束了 | |
| WARNING | logging.warning | 警告,程序还能继续运行 | ✅ | 
| ERROR | logging.error | 错误,影响功能 | |
| CRITICAL | logging.critical | 严重错误,程序可能崩溃 | 
注意!
 默认的日志级别是 WARNING,所以低于 WARNING(比如 DEBUG、INFO)不会显示,除非你自己改级别!
常见配置方式(两种)
基本配置
import logginglogging.basicConfig(level=logging.INFO,format="%(asctime)s [%(levelname)s] %(message)s",
)logging.info("程序启动了")
logging.warning("小心,有点不对劲")
logging.error("程序出错了!")
输出示例:
2025-04-29 20:35:00 [INFO] 程序启动了
2025-04-29 20:35:01 [WARNING] 小心,有点不对劲
2025-04-29 20:35:02 [ERROR] 程序出错了!
basicConfig 可以快速设置:
-  level:日志等级
-  format:日志格式
-  filename:写到哪个文件
-  filemode:写入模式('w'覆盖,'a'追加)
更复杂的手动配置
import logginglogger = logging.getLogger('my_logger')  # 创建一个Logger
logger.setLevel(logging.DEBUG)           # 设置日志级别# 创建Handler
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('my_log.log', encoding='utf-8')# 创建Formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')# 绑定Formatter到Handler
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)# 绑定Handler到Logger
logger.addHandler(console_handler)
logger.addHandler(file_handler)# 开始使用
logger.info('这是一条info日志')
logger.error('这是一条error日志')
这样可以:
-  控制台有一份 
-  文件保存一份 
-  格式一致 
实战例子
| 目标 | 写法示例 | 
|---|---|
| 设置日志保存到文件 | filename='xxx.log' | 
| 保存中文日志 | encoding='utf-8' | 
| 每天生成一个新日志 | 用 TimedRotatingFileHandler | 
| 文件太大自动切分 | 用 RotatingFileHandler | 
| 改变日志输出样式 | 自定义 format | 
日志按大小切割:
from logging.handlers import RotatingFileHandler
import logginghandler = RotatingFileHandler('big.log', maxBytes=1024*1024*5, backupCount=3)
logging.basicConfig(handlers=[handler], level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s')logging.info("这是一个测试日志")
-  单个日志最大5MB 
-  最多保留3个历史文件(big.log.1, big.log.2, big.log.3) 
总结:logging = 专业版print,程序运行过程的黑匣子,调试、追错、归档必备神器!
7. subprocess(执行系统命令)
更安全、更强大、更灵活的执行外部命令方法。
简单来说:subprocess = 帮你在Python里开一个子进程(子命令行),执行命令,拿到输出。
流程:
Python程序↓
subprocess模块↓
操作系统(开子进程,执行系统命令)↓
拿到返回值、输出结果
subprocess.run() 基础用法
 
基本格式:
import subprocesssubprocess.run(["命令", "参数1", "参数2", ...])
例子:
import subprocess# 在Windows上执行 dir
subprocess.run(["dir"], shell=True)# 在Linux上执行 ls -l
subprocess.run(["ls", "-l"])
注意:
-  shell=True:让命令像直接在终端里那样执行。
-  默认 shell=False,更安全。
捕获输出(stdout / stderr)
想拿到命令输出?用 capture_output=True:
result = subprocess.run(["ls", "-l"], capture_output=True, text=True)
print(result.stdout)  # 正常输出
print(result.stderr)  # 错误输出参数解释:
-  capture_output=True:自动收集输出。
-  text=True:让输出是字符串(默认是字节bytes)。
传入参数(列表 vs 字符串)
| 写法 | 说明 | 
|---|---|
| 列表形式 | 推荐!安全!每个参数独立 | 
| 字符串形式 | 需要加 shell=True,容易命令注入(危险) | 
列表写法(推荐):
subprocess.run(["ping", "www.baidu.com"])
字符串写法(小心):
subprocess.run("ping www.baidu.com", shell=True)
如果要执行复杂管道命令(比如grep、|管道),必须用 shell=True。
更底层的用法(Popen)
如果想实时读输出、或者同时操作输入输出,用 Popen。
简单例子:
import subprocessp = subprocess.Popen(["ping", "www.baidu.com"], stdout=subprocess.PIPE, text=True)for line in p.stdout:print(line.strip())
这样可以一行行读输出,非常适合:
-  实时监控命令输出 
-  做管道(pipe)连接多个命令 
常见错误和注意事项
| 错误 | 解释 | 
|---|---|
| FileNotFoundError | 命令打错了,找不到 | 
| PermissionError | 没权限执行(尤其是Linux) | 
| UnicodeDecodeError | 输出是二进制,不是文本,加 text=True | 
| 命令执行失败 | 查看 result.returncode!= 0,说明出错了 | 
好习惯:
result = subprocess.run(["ls", "-l"], capture_output=True, text=True)if result.returncode != 0:print("命令执行失败!")print(result.stderr)
else:print(result.stdout)
实战例子(批量执行 / 调用shell脚本)
批量执行一堆命令:
commands = [["echo", "hello"],["ls", "-l"],["pwd"],
]for cmd in commands:subprocess.run(cmd)
调用本地shell脚本:
比如有一个 test.sh 文件:
#!/bin/bash
echo "Hello from script"
Python执行:
subprocess.run(["bash", "test.sh"])
或者:
subprocess.run("./test.sh", shell=True)
注意脚本要有执行权限(chmod +x test.sh)。
总结
subprocess.run() - 快速执行,拿返回值
subprocess.Popen()- 高级控制,实时操作,读写管道
shell=True - 字符串形式(有风险)
capture_output=True- 捕获输出
text=True- 输出是字符串
8. 并发与并行(concurrent.futures、threading、multiprocessing)
| 概念 | 解释 | 打比方 | 
|---|---|---|
| 并发(Concurrency) | 同一时间段内处理多个任务(假装同时做,其实轮着做) | 一个人炒两个菜,轮流翻锅 | 
| 并行(Parallelism) | 真正同时处理多个任务(多核心同时进行) | 两个人同时炒两个菜 | 
并发是逻辑上的同时(比如单核CPU上下文切换)
 并行是物理上的同时(比如多核CPU各自干活)
Python中的并发并行手段
| 模块 | 适用场景 | 特点 | 
|---|---|---|
| threading | I/O密集型任务(网络、磁盘读写) | 多线程,轻量,但受GIL限制 | 
| multiprocessing | CPU密集型任务(复杂计算) | 多进程,真正并行,开销大 | 
| concurrent.futures | 更高级的线程池/进程池 | 简单易用,高层封装 | 
GIL(全局解释器锁)让多线程不能真正并行执行Python代码,但遇到I/O会释放锁。
 多进程可以真正利用多核CPU,每个进程都有独立的Python解释器。
threading(多线程模块)
主要用于 I/O密集型任务:比如抓取网页、文件读写、网络请求。
基本用法
import threadingdef worker():print("我是子线程")# 创建线程
t = threading.Thread(target=worker)# 启动线程
t.start()# 等待线程结束
t.join()
-  start():启动线程(真正开始执行)
-  join():主线程等待子线程结束
多线程执行示例
for i in range(5):threading.Thread(target=worker).start()
多线程是共享内存的,可以共享数据,但要小心数据竞争!
注意事项
-  GIL导致多线程在Python解释器层面不是并行的(CPU密集型反而慢) 
-  线程适合用来等待I/O,而不是计算密集的工作 
multiprocessing(多进程模块)
主要用于 CPU密集型任务:比如数据处理、图像处理、大型计算。
4.1 基本用法
import multiprocessingdef worker():print("我是子进程")if __name__ == '__main__':p = multiprocessing.Process(target=worker)p.start()p.join()
-  start():启动进程
-  join():等待进程结束
多进程执行示例
if __name__ == '__main__':for i in range(5):multiprocessing.Process(target=worker).start()
每个子进程是完全独立的,不共享内存!
注意事项
-  多进程开销大(复制一份整个程序到子进程) 
-  需要用 multiprocessing.Queue等方式进行进程间通信
concurrent.futures(高级封装库)
Python3之后出的一个超好用模块,
 封装了线程池 (ThreadPoolExecutor) 和 进程池 (ProcessPoolExecutor)。让并发变得超级简单!
基本用法(线程池)
from concurrent.futures import ThreadPoolExecutordef worker(x):return x * xwith ThreadPoolExecutor(max_workers=5) as executor:results = executor.map(worker, [1, 2, 3, 4, 5])for r in results:print(r)
-  max_workers:同时跑几个线程
-  map:批量提交任务
-  自动管理 start、join,不用手动操心!
基本用法(进程池)
from concurrent.futures import ProcessPoolExecutordef worker(x):return x * xwith ProcessPoolExecutor(max_workers=5) as executor:results = executor.map(worker, [1, 2, 3, 4, 5])for r in results:print(r)
一行切换线程池/进程池,爽爆!
三者的区别与选择
| 特性 | threading | multiprocessing | concurrent.futures | 
|---|---|---|---|
| 本质 | 线程 | 进程 | 封装线程池/进程池 | 
| 适合 | I/O密集 | CPU密集 | 两者皆可 | 
| 是否并行 | 否(受GIL影响) | 是(独立进程) | 看用的Executor | 
| 编程复杂度 | 中 | 高(通信麻烦) | 低(傻瓜式) | 
| 内存开销 | 小 | 大 | 中等 | 
简单记忆:
-  网络爬虫、文件下载 → threading或ThreadPoolExecutor
-  复杂计算、图像处理 → multiprocessing或ProcessPoolExecutor
-  不想操心细节 → concurrent.futures
实战示例(小项目案例)
用线程池并发抓取网页
import requests
from concurrent.futures import ThreadPoolExecutorurls = ['http://example.com','http://example.org','http://example.net',
]def fetch(url):r = requests.get(url)print(f"{url}: {r.status_code}")with ThreadPoolExecutor(max_workers=3) as executor:executor.map(fetch, urls)
多线程抓网页,飞快!
用进程池并行计算
from concurrent.futures import ProcessPoolExecutordef calc(n):total = 0for i in range(10**6):total += i * nreturn totalif __name__ == '__main__':with ProcessPoolExecutor() as executor:results = executor.map(calc, range(5))print(list(results))
多核CPU全开,处理大计算量!
总结
如果做的是:
-  抓网页、爬虫、IO读写、等待数据库 → 用线程( threading/ThreadPoolExecutor)
-  大量数据运算、模型训练、复杂数学计算 → 用进程( multiprocessing/ProcessPoolExecutor)
-  不想自己管理启动关闭,想一行搞定 → 用 concurrent.futures
注意:
-  threading快但是受GIL限制,适合I/O密集
-  multiprocessing真正并行,但开销更大
-  concurrent.futures非常适合90%的并发场景
IO密集用线程,CPU密集用进程,怕麻烦就用concurrent.futures。