Python异步编程:使用`asyncio`和`aiofiles`进行高效的文件批量写入

Python异步编程:使用`asyncio`和`aiofiles`进行高效的文件批量写入

    • 1. 异步编程基础
      • 1.1 `asyncio`和`await`
      • 1.2 `aiofiles`
    • 2. 异步文件批量写入示例
      • 2.1 代码结构
      • 2.2 代码实现
      • 2.3 代码解释
        • 2.3.1 `BatchWriter`类
        • 2.3.2 `main`函数
    • 3. 其他示例代码
      • 3.1 简单的异步文件写入
      • 3.2 异步文件读取
      • 3.3 使用`asyncio.Lock`保护共享资源
      • 3.4 使用`asyncio.wait_for`设置超时
    • 4. 总结

在现代编程中,异步编程已经成为提高程序性能的重要手段之一。特别是在处理I/O密集型任务时,异步编程可以显著提高程序的效率。Python的asyncio库和第三方库aiofiles为我们提供了强大的工具,使得异步文件操作变得简单而高效。

本文将通过一个具体的示例,介绍如何使用asyncioaiofiles进行异步文件批量写入,并详细解释其中的关键技术点。

1. 异步编程基础

1.1 asyncioawait

asyncio是Python的标准库,用于编写异步代码。async关键字用于定义一个异步函数,而await关键字用于等待一个协程(coroutine)完成。

import asyncioasync def hello_world():print("Hello")await asyncio.sleep(1)print("World")asyncio.run(hello_world())

1.2 aiofiles

aiofiles是一个第三方库,提供了异步文件操作的功能。通过aiofiles.open可以异步打开文件,并通过await f.write进行异步写入。

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():await write_to_file('example.txt', 'Hello, World!')print("Data written to file")asyncio.run(main())

2. 异步文件批量写入示例

2.1 代码结构

我们将实现一个BatchWriter类,用于批量写入数据到文件中。该类的主要功能包括:

  • 将数据添加到缓冲区。
  • 当缓冲区达到一定大小时,将缓冲区中的数据批量写入文件。
  • 使用asyncio.Lock保护共享资源,避免竞态条件。
  • 使用asyncio.wait_for设置写入操作的超时时间。

2.2 代码实现

import asyncio
import json
import time
from collections import deque
import aiofilesclass BatchWriter:def __init__(self, filename: str, batch_size: int = 100):self.filename = filenameself.batch_size = batch_sizeself.buffer = deque()self.lock = asyncio.Lock()async def add(self, data: dict):print(f"Adding data: {data}")async with self.lock:print("Lock acquired in add")self.buffer.append(data)print(f"Buffer size after adding: {len(self.buffer)}")should_flush = len(self.buffer) >= self.batch_sizeprint("Releasing lock in add")if should_flush:print("Buffer size exceeded batch_size, calling flush")await self.flush()async def flush(self):print("Starting flush")buffer_to_write = []async with self.lock:if not self.buffer:print("Buffer is empty, exiting flush")return# 将缓冲区内容复制到临时列表并清空缓冲区buffer_to_write = list(self.buffer)self.buffer.clear()print(f"Flushing buffer of size: {len(buffer_to_write)}")# 在锁外执行文件写入if buffer_to_write:try:print(f"Writing {len(buffer_to_write)} items to file")await asyncio.wait_for(self._write_to_file(buffer_to_write), timeout=10)print("Finished writing to file")except asyncio.TimeoutError:print("Timeout while writing to file")# 写入失败时,将数据放回缓冲区async with self.lock:self.buffer.extendleft(reversed(buffer_to_write))except Exception as e:print(f"Error writing to file: {e}")# 写入失败时,将数据放回缓冲区async with self.lock:self.buffer.extendleft(reversed(buffer_to_write))print("Flush complete")async def _write_to_file(self, buffer_to_write):async with aiofiles.open(self.filename, 'a') as f:for data in buffer_to_write:await f.write(json.dumps(data, ensure_ascii=False) + '\n')async def main():start_time = time.time()writer = BatchWriter(filename="output.jsonl", batch_size=5)async def generate_data():for i in range(10):data = {"id": i, "value": f"data_{i}"}print(f"Generating data: {data}")await writer.add(data)await asyncio.sleep(0.1)print("Starting data generation")await generate_data()print("Finished data generation")print("Calling final flush")await writer.flush()  # 最终刷新缓冲区end_time = time.time()print(f"Total time: {end_time - start_time} seconds")if __name__ == "__main__":asyncio.run(main())

2.3 代码解释

2.3.1 BatchWriter
  • __init__方法:初始化文件名、批量大小、缓冲区和锁。
  • add方法:将数据添加到缓冲区,并在缓冲区达到批量大小时调用flush方法。
  • flush方法:将缓冲区中的数据批量写入文件。如果写入失败(如超时),将数据重新放回缓冲区。
  • _write_to_file方法:异步写入数据到文件。
2.3.2 main函数
  • generate_data协程:生成数据并调用add方法将数据添加到缓冲区。
  • main函数:启动数据生成,并在数据生成完成后调用flush方法进行最终的缓冲区刷新。

3. 其他示例代码

3.1 简单的异步文件写入

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():await write_to_file('example.txt', 'Hello, World!')print("Data written to file")asyncio.run(main())

3.2 异步文件读取

import asyncio
import aiofilesasync def read_from_file(filename):async with aiofiles.open(filename, 'r') as f:content = await f.read()return contentasync def main():content = await read_from_file('example.txt')print(f"File content: {content}")asyncio.run(main())

3.3 使用asyncio.Lock保护共享资源

import asyncio
import aiofilesclass FileWriter:def __init__(self, filename):self.filename = filenameself.lock = asyncio.Lock()async def write(self, content):async with self.lock:async with aiofiles.open(self.filename, 'a') as f:await f.write(content + '\n')async def main():writer = FileWriter('example.txt')async def write_data(data):await writer.write(data)await asyncio.gather(write_data('Data 1'),write_data('Data 2'),write_data('Data 3'))print("All data written to file")asyncio.run(main())

3.4 使用asyncio.wait_for设置超时

import asyncio
import aiofilesasync def write_to_file(filename, content):async with aiofiles.open(filename, 'a') as f:await f.write(content + '\n')async def main():try:await asyncio.wait_for(write_to_file('example.txt', 'Hello, World!'), timeout=1)print("Data written to file")except asyncio.TimeoutError:print("Operation timed out")asyncio.run(main())

4. 总结

通过本文的介绍和示例代码,我们了解了如何使用asyncioaiofiles进行异步文件操作。异步编程可以显著提高I/O密集型任务的效率,特别是在处理大量并发任务时。使用asyncio.Lock可以保护共享资源,避免竞态条件。使用asyncio.wait_for可以设置操作的超时时间,防止无限期等待。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/57545.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

婚纱相册必须去摄影店吗?其实自己会拍照就能实现,性价比更高

一直以来,婚纱照都是新人们婚礼筹备中不可或缺的部分。然而,高昂的摄影店价格让不少新人望而却步。其实,只要掌握一些拍照技巧,自己在家就能制作出独一无二的婚纱相册,不仅性价比超高,还能留下更多珍贵的回…

Android 中的串口开发

一:背景 本文着重讲安卓下的串口。 由于开源的Android在各种智能设备上的使用越来越多,如车载系统等。在我们的认识中,Android OS的物理接口一般只有usb host接口和耳机接口,但其实安卓支持各种各样的工业接口,如HDM…

条码检测系统——基于MATLAB的一维条码识别

摘 要:条码技术是如今应用最广泛的识别和输入技术之一,由于其包含的信息量大,识别错误率低而在各个方面得到很大的重视。它发展迅速并被广泛应用于于工业、商业、图书出版、医疗卫生等各行各业。由我国目前发展现状来看,条码的正…

人工智能:重塑未来生活与工作的科技力量

方向一:介绍人工智能技术的发展历程和现状,指出它的应用领域和前景 一、人工智能技术的发展历程 人工智能(Artificial Intelligence, AI)作为一门学科,其起源可以追溯到20世纪50年代。最初,AI的研究主要集…

Mytatis-plus使用sl4j日志打印SQL

以下是关于使用 Spring Boot 起始器替换 slf4j-api 和 logback 依赖的详细步骤和注意事项&#xff0c;包括 MyBatis-Plus 的默认日志级别信息。 1、依赖项配置 在 pom.xml 中添加以下依赖项&#xff1a; <dependency><groupId>org.springframework.boot</gro…

字符串使用方法:

字符串: -- 拼接字符串 SELECT CONCAT(糯米,啊啊啊撒,删掉); -- 字符长度 SELECT LENGTH(asssssssggg); -- 转大写 SELECT UPPER(asdf); -- 转小写 SELECT LOWER(ASDFG); -- 去除左边空格 SELECT LTRIM( aaaasdrf ); -- 去除右边空格 SELECT RTRIM( aaaasdff ); -- 去除两端…

攻坚金融关键业务系统,OceanBase亮相2024金融科技大会

10月15-16日&#xff0c;第六届中新数字金融应用博览会与2024金融科技大会&#xff08;简称“金博会”&#xff09;在苏州工业园区联合举办。此次大会融合了国家级重要金融科技资源——“中国金融科技大会”&#xff0c;围绕“赋能金融高质量发展&#xff0c;金融科技创新前行”…

【C++指南】运算符重载详解

引言 C 提供了运算符重载这一特性&#xff0c;允许程序员为自定义类型&#xff08;如类和结构体&#xff09;定义运算符的行为。 通过运算符重载&#xff0c;可以使自定义类型对象像内置类型一样使用运算符&#xff0c;从而提高代码的可读性和易用性。 本文将详细介绍 C 中运算…

【状态机DP】力扣2786. 访问数组中的位置使分数最大

给你一个下标从 0 开始的整数数组 nums 和一个正整数 x 。 你 一开始 在数组的位置 0 处&#xff0c;你可以按照下述规则访问数组中的其他位置&#xff1a; 如果你当前在位置 i &#xff0c;那么你可以移动到满足 i < j 的 任意 位置 j 。 对于你访问的位置 i &#xff0c…

若依微服务15 - RuoYi-Vue3 实现前端独立运行

正文开始&#xff1a; RuoYi-Vue3 使用 Vue3 Element Plus Vite 技术栈。 GitHub 开源地址&#xff1a;https://github.com/yangzongzhuan/RuoYi-Vue3 本文介绍使用若依提供的在线后端接口&#xff0c;仅启动前端项目并进行界面开发&#xff0c;而无需启动后端服务。 一、克隆…

AI视听新体验!浙大阿里提出视频到音乐生成模型MuVi:可解决语义对齐和节奏同步问题

MuVi旨在解决视频到音乐生成(V2M)中的语义对齐和节奏同步问题。 MuVi通过专门设计的视觉适配器分析视频内容,以提取上下文 和时间相关的特征,这些特征用于生成与视频的情感、主题及其节奏和节拍相匹配的音乐。MuVi在音频质量和时间同步方面表现优于现有基线方法,并展示了其在风…

Clickhouse集群_Zookeeper配置的dataDir目录磁盘占有率接近100%时,该dataDir目录是否可以清理及如何清理的脚本

官方文档https://zookeeper.apache.org/doc/r3.1.2/zookeeperAdmin.html#OngoingDataDirectoryCleanup 监控报警发现clickhouse集群环境的数据库节点磁盘报警&#xff0c;检查下来发现/chdata/zookeeper/data/version-2/目录特别大&#xff0c;里面包含了log.*文件和snapshot.…

【前端】--- ES6上篇(带你深入了解ES6语法)

前言&#xff1a;ECMAScript是 JavaScript 的标准化版本&#xff0c;由 ECMA 国际组织制定。ECMAScript 定义了 JavaScript 的语法、类型、语句、关键字、保留字等。 ES6 是 ECMAScript 的第六个版本&#xff0c;于 2015 年发布&#xff0c;引入了许多重要的新特性&#xff0c;…

实现vuex源码,手写

实现vuex源码&#xff0c;手写 Vuex 是专门为 Vue.js 应用程序开发的状态管理模式 库&#xff0c;它采用集中式存储管理应用的所有组件的状态&#xff0c;并以相应的规则保证状态以一种可预测的方式发生变化。 第一步&#xff1a;定义初始化Store类 创建文件夹store/vuex.js 1…

C++ 20 Concept

concept主要用来定义模板参数的约束&#xff0c;最明显的作用就是在模板参数不满足类型的约束时编译器不再给出几千行奇奇怪怪的错误。当然还有其它的作用&#xff0c;比如说concepts可以用来实现函数的重载、新的concepts可以基于已有的concepts定义从而进行扩展等等下面以实现…

k8s 部署 nexus3 详解

创建命名空间 nexus3-namespace.yaml apiVersion: v1 kind: Namespace metadata:name: nexus-ns创建pv&pvc nexus3-pv-pvc.yaml apiVersion: v1 kind: PersistentVolume metadata:name: nfs-pvnamespace: nexus-ns spec:capacity:storage: 3GiaccessModes:- ReadWriteM…

Redis的6.0以上为啥又支持多线程

Redis 在 6.0 版本之前一直采用单线程架构&#xff0c;这是因为 Redis 主要是内存操作&#xff0c;单线程模型足以应对大部分高性能场景。而单线程模型的优势在于避免了多线程带来的上下文切换和锁的开销&#xff0c;使得 Redis 保持极高的性能和简单性。 然而&#xff0c;随着…

C++:模板的特化与分离编译

之前我们在介绍模板的时候仅仅是简单的介绍了模板的用法&#xff0c;本篇文章我们来详细的介绍下模板中比较重要的几个点。 一&#xff0c;非类型模板参数 我们之前的c中&#xff0c;会将经常使用的而又确保在我们程序的运行过程中值不会改变的值进行#define&#xff1a; #d…

初入编程之路,启航代码海

#1024程序员节|征文# 前言 今天又是1024程序员节了&#xff0c;第一次听说这个节日是在我在23年刚刚上大一的时候听学长他们说的&#xff0c;如今已经是24年了&#xff0c;虽然只学习了一年的编程但我已经了解到了这条路上的不易。希望能够在这条路上面一路坚持下去&#xff0…

力扣_斐波那契数列

本题目本质和爬楼梯是一样的&#xff0c;主要运用的是递归来解题。 class Solution:my_dict {}def fib(self, n: int) -> int:if self.my_dict.get(n) is not None: # 先判断有没有计算过这个值return self.my_dict.get(n)tempResult 0if n > 2:tempResult self.fib…