一、技术选型与功能设计
使用minio服务,进行文件的中转与存储。用户提交文件到doc-llm-controller,控制面将文件转存到minio中,关联此次任务id。然后doc-llm-worker轮询redis发现有需要执行的任务,拿到id后,根据id从minio拿取文件,然后将文件解析成结构化信息,再提交到大模型,进行文档测试。
那么此部分功能流程图大致如下:

相对应的,在整体业务流程中补充文件存取步骤,最后如下:

二、minio配置与使用
minio安装部署:我们使用docker镜像来部署minio服务,暴露9000端口提供给我们自己服务使用:
docker run -d --name doc-llm-minio -p 9000:9000 -p 9001:9001 --restart=always -e MINIO_ROOT_USER=root -e MINIO_ROOT_PASSWORD=password -v /home/workspace/minio:/data minio/minio:latest server /data --console-address ":9001"
通过python来调用minio服务:
# minio下载 pip install minio
from minio import Minio from minio.error import S3Error import io# 配置minio client = Minio("localhost:9000",access_key="root",secret_key="xiao1234",secure=False, )bucket_name = "doc-llm-bucket" try:if not client.bucket_exists(bucket_name):client.make_bucket(bucket_name)else:print(f"Bucket '{bucket_name}' already exists.") except S3Error as e:print(f"Error occurred: {e}")# 通过python上传文件到minio def upload_file(local_file_path, object_name):try:client.fput_object(bucket_name, object_name, local_file_path)print(f"'{local_file_path}' is successfully uploaded as '{object_name}' to bucket '{bucket_name}'.")except S3Error as e:print(f"Error occurred while uploading: {e}")# 文件下载 def download_file(object_name, local_file_path):try:client.fget_object(bucket_name, object_name, local_file_path)print(f"'{object_name}' is successfully downloaded to '{local_file_path}'.")except S3Error as e:print(f"Error occurred while downloading: {e}")# 列出所有文件 def list_files():try:objects = client.list_objects(bucket_name)print(f"Objects in bucket '{bucket_name}':")for obj in objects:print(f"- {obj.object_name} (size: {obj.size} bytes)")except S3Error as e:print(f"Error occurred while listing objects: {e}")# 删除指定文件 def delete_file(object_name):try:client.remove_object(bucket_name, object_name)print(f"'{object_name}' is successfully deleted from bucket '{bucket_name}'.")except S3Error as e:print(f"Error occurred while deleting: {e}")
测试效果如下:

三、控制面doc-llm-controller服务适配
总体思路:接口层接收到带文件的创建任务请求,先新增一条任务数据到mysql,其中doc字段为__PENDING_FILE__。然后拿到任务id后,调用推送文件服务将文件关联任务id一起推送到minio,结束后更新任务信息doc字段为:f"minio://{MINIO_BUCKET}/{object_name}"。
至此控制面业务结束。
services层:新增file_service.py,提供minio服务的调用
# 代码样例 def _ensure_bucket():"""确保 bucket 存在"""if not _minio_client.bucket_exists(MINIO_BUCKET):_minio_client.make_bucket(MINIO_BUCKET)def save_task_file(task_id: int, file_obj: FileStorage) -> str:"""把用户上传的文件存到 MinIO,文件名格式:{task_id}_{orig_filename}返回存入数据库的 doc 字段值,例如:minio://doc-llm-bucket/123_xxx.docx...doc_path = f"minio://{MINIO_BUCKET}/{object_name}"return doc_path
给doc_check_service, task_service 增加更新doc方法
# doc_check_service def update_task_doc(task_id: int, doc: str) -> None:"""更新任务的 doc 字段"""task = task_service.get_task_by_id(task_id)if not task:raise TaskNotFoundError(f"任务 {task_id} 不存在")task_service.update_task_doc(task_id, doc)# task_service def update_task_doc(task_id: int, doc: str) -> None:"""更新任务的 doc 字段"""with get_session() as session:task = session.scalar(select(TaskDocLLM).where(TaskDocLLM.task_id == task_id))if not task:raise ValueError(f"任务 {task_id} 不存在")task.doc = doc
更新接口函数,兼容传文本信息、文本文件两种方式:
@bp.route("/tasks/", methods=["POST"]) def create_doc_task():# 判断是不是文件上传if request.content_type and "multipart/form-data" in request.content_type:return _create_task_with_file()# 默认走老的 JSON 逻辑return _create_task_with_json()def _create_task_with_json():...task_id = doc_check_service.submit_doc_task(task_name, doc, product, feature)...def _create_task_with_file():....try:# 1. 先写一条任务,doc 用占位符,保证非空placeholder_doc = "__PENDING_FILE__"task_id = doc_check_service.submit_doc_task(task_name=task_name,doc=placeholder_doc,product=product,feature=feature,)doc_path = file_service.save_task_file(task_id, file_obj)# 3. 回写 doc 字段 doc_check_service.update_task_doc(task_id, doc_path)...
用postman测试下接口效果,大致是OK的:
接口请求:

flask这边日志、数据库、minio表现都OK,数据一致性有保障:
