locust高级特性详解

news/2025/11/12 15:51:56/文章来源:https://www.cnblogs.com/yjh1995/p/19214333

事件系统深度应用

全局事件监听

Locust的事件系统就像是一个"消息广播站",可以在特定时机执行自定义逻辑:

  • @events.test_start.add_listener
  • @events.test_stop.add_listener
  • @events.user_error.add_listener
from locust import HttpUser, task, events
import logging# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)@events.test_start.add_listener
def on_test_start(environment, **kwargs):"""测试开始时执行"""logger.info("🚀 压测开始!准备发射...")# 可以在这里做一些初始化工作# 比如清理测试数据、发送通知等@events.test_stop.add_listener  
def on_test_stop(environment, **kwargs):"""测试结束时执行"""logger.info("🏁 压测结束!开始分析结果...")# 可以在这里做清理工作# 比如生成报告、发送邮件等@events.user_error.add_listener
def on_user_error(user_instance, exception, tb, **kwargs):"""用户出错时执行"""logger.error(f"❌ 用户出错: {exception}")# 可以记录详细错误信息,便于调试class AdvancedUser(HttpUser):wait_time = between(1, 2)host = "https://httpbin.org"@taskdef test_api(self):self.client.get("/get")

自定义统计指标

有时候默认的统计指标不够用,我们可以自定义:

  • @events.request_success.add_listener
  • @events.request_failure.add_listener
from locust import HttpUser, task, events
from locust.stats import stats_history
import time# 自定义统计数据存储
custom_stats = {"business_success_count": 0,"business_error_count": 0,"total_processing_time": 0
}@events.request_success.add_listener
def on_request_success(request_type, name, response_time, response_length, **kwargs):"""请求成功时的自定义统计"""if "business_api" in name:custom_stats["business_success_count"] += 1custom_stats["total_processing_time"] += response_time@events.request_failure.add_listener  
def on_request_failure(request_type, name, response_time, response_length, exception, **kwargs):"""请求失败时的自定义统计"""if "business_api" in name:custom_stats["business_error_count"] += 1class BusinessUser(HttpUser):wait_time = between(1, 2)host = "https://httpbin.org"@taskdef business_api_call(self):"""业务API调用"""start_time = time.time()with self.client.get("/json", name="business_api", catch_response=True) as response:# 业务逻辑验证if response.status_code == 200:data = response.json()if "slideshow" in data:response.success()else:response.failure("业务数据格式错误")else:response.failure(f"HTTP错误: {response.status_code}")

集合点功能实现

集合点是性能测试中的重要概念,让所有用户在某个时刻同时执行操作

  • @events.spawning_complete.add_listener
from locust import HttpUser, task, events
from gevent._semaphore import BoundedSemaphore
import time# 创建集合点信号量
rendezvous_point = BoundedSemaphore()
rendezvous_point.acquire()  # 初始状态为锁定@events.spawning_complete.add_listener
def on_spawning_complete(**kwargs):"""所有用户启动完成后释放集合点"""print("🚦 所有用户已就位,释放集合点!")rendezvous_point.release()class RendezvousUser(HttpUser):wait_time = between(1, 2)host = "https://httpbin.org"@taskdef synchronized_task(self):"""需要同步执行的任务"""print(f"用户 {self.user_id} 到达集合点,等待其他用户...")# 等待集合点释放rendezvous_point.acquire()rendezvous_point.release()  # 立即释放,让其他用户也能通过print(f"用户 {self.user_id} 开始同步执行任务")# 同时执行的关键操作start_time = time.time()response = self.client.get("/get")end_time = time.time()print(f"用户 {self.user_id} 完成任务,耗时: {end_time - start_time:.2f}秒")def on_start(self):"""为每个用户分配唯一ID"""import uuidself.user_id = str(uuid.uuid4())[:8]

自定义负债模型 LoadTestShape

阶梯式负载

from locust import LoadTestShapeclass StepLoadShape(LoadTestShape):"""阶梯式负载模型每60秒增加10个用户,最多100个用户"""step_time = 60  # 每步持续时间step_load = 10  # 每步增加的用户数spawn_rate = 2  # 用户启动速率time_limit = 600  # 总测试时间def tick(self):run_time = self.get_run_time()if run_time > self.time_limit:return None  # 停止测试current_step = run_time // self.step_timeuser_count = (current_step + 1) * self.step_load# 限制最大用户数user_count = min(user_count, 100)return (user_count, self.spawn_rate)class MyUser(HttpUser):wait_time = between(1, 2)host = "https://httpbin.org"@taskdef test_endpoint(self):self.client.get("/get")

波浪式负载

import mathclass WaveLoadShape(LoadTestShape):"""波浪式负载模型用户数量呈正弦波变化"""min_users = 10max_users = 100wave_period = 300  # 波浪周期(秒)def tick(self):run_time = self.get_run_time()if run_time > 600:  # 10分钟后停止return None# 计算当前用户数(正弦波)wave_progress = (run_time % self.wave_period) / self.wave_perioduser_count = self.min_users + (self.max_users - self.min_users) * \(math.sin(2 * math.pi * wave_progress) + 1) / 2return (int(user_count), 5)

HTTP客户端自定义

自定义认证AuthBase

from requests.auth import AuthBase
import hmac
import hashlib
import timeclass CustomAuth(AuthBase):"""自定义认证类"""def __init__(self, access_key, secret_key):self.access_key = access_keyself.secret_key = secret_keydef __call__(self, request):# 添加时间戳timestamp = str(int(time.time()))# 生成签名string_to_sign = f"{request.method}\n{request.url}\n{timestamp}"signature = hmac.new(self.secret_key.encode(),string_to_sign.encode(),hashlib.sha256).hexdigest()# 添加认证头request.headers['X-Access-Key'] = self.access_keyrequest.headers['X-Timestamp'] = timestamprequest.headers['X-Signature'] = signaturereturn requestclass AuthenticatedUser(HttpUser):wait_time = between(1, 2)host = "https://api.example.com"def on_start(self):"""设置自定义认证"""self.client.auth = CustomAuth("your_access_key", "your_secret_key")@taskdef protected_api(self):"""调用需要认证的API"""self.client.get("/protected/resource")

连接池优化

from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retryclass OptimizedUser(HttpUser):wait_time = between(1, 2)host = "https://httpbin.org"def on_start(self):"""优化HTTP连接"""# 配置重试策略retry_strategy = Retry(total=3,backoff_factor=1,status_forcelist=[429, 500, 502, 503, 504],)# 配置连接适配器adapter = HTTPAdapter(pool_connections=10,  # 连接池大小pool_maxsize=20,      # 最大连接数max_retries=retry_strategy)# 应用到客户端self.client.mount("http://", adapter)self.client.mount("https://", adapter)# 设置超时self.client.timeout = (5, 30)  # 连接超时5秒,读取超时30秒@taskdef optimized_request(self):"""优化后的请求"""self.client.get("/get")

实时监控和告警

自定义监控指标

from locust import HttpUser, task, events
import psutil
import timeclass SystemMonitor:"""系统资源监控"""def __init__(self):self.start_time = time.time()self.cpu_samples = []self.memory_samples = []def collect_metrics(self):"""收集系统指标"""cpu_percent = psutil.cpu_percent()memory_percent = psutil.virtual_memory().percentself.cpu_samples.append(cpu_percent)self.memory_samples.append(memory_percent)# 检查告警条件if cpu_percent > 80:print(f"⚠️  CPU使用率过高: {cpu_percent}%")if memory_percent > 80:print(f"⚠️  内存使用率过高: {memory_percent}%")# 全局监控实例
monitor = SystemMonitor()@events.test_start.add_listener
def start_monitoring(environment, **kwargs):"""开始监控"""print("📊 开始系统监控...")@events.request_success.add_listener
def on_success(request_type, name, response_time, response_length, **kwargs):"""请求成功时收集指标"""monitor.collect_metrics()class MonitoredUser(HttpUser):wait_time = between(1, 2)host = "https://httpbin.org"@taskdef monitored_request(self):"""被监控的请求"""self.client.get("/get")

测试报告定制

生成自定义报告

from locust import events
import json
import os
from datetime import datetimeclass CustomReporter:"""自定义报告生成器"""def __init__(self):self.test_start_time = Noneself.test_end_time = Noneself.custom_data = {"test_info": {},"performance_metrics": {},"error_summary": []}def generate_report(self, environment):"""生成自定义报告"""stats = environment.stats# 收集基本信息self.custom_data["test_info"] = {"start_time": self.test_start_time.isoformat(),"end_time": self.test_end_time.isoformat(),"duration": (self.test_end_time - self.test_start_time).total_seconds(),"total_users": environment.runner.user_count}# 收集性能指标self.custom_data["performance_metrics"] = {"total_requests": stats.total.num_requests,"total_failures": stats.total.num_failures,"average_response_time": stats.total.avg_response_time,"rps": stats.total.current_rps,"failure_rate": stats.total.fail_ratio}# 保存报告report_file = f"custom_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"with open(report_file, 'w', encoding='utf-8') as f:json.dump(self.custom_data, f, ensure_ascii=False, indent=2)print(f"📋 自定义报告已生成: {report_file}")# 全局报告器实例
reporter = CustomReporter()@events.test_start.add_listener
def on_test_start(environment, **kwargs):reporter.test_start_time = datetime.now()@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):reporter.test_end_time = datetime.now()reporter.generate_report(environment)

高级技巧总结

1. 性能调优建议

  • 合理设置用户数:不要一味追求高并发,要根据实际业务场景
  • 监控客户端资源:确保压测机器不成为瓶颈
  • 使用连接池:减少连接建立开销
  • 适当的等待时间:模拟真实用户行为

2.调优技巧

# 开发时使用调试模式
import logging
logging.basicConfig(level=logging.DEBUG)# 单用户调试
if __name__ == "__main__":from locust.env import Environmentenv = Environment(user_classes=[MyUser])env.create_local_runner()env.runner.start(1, spawn_rate=1)import timetime.sleep(10)env.runner.quit()

3. 最佳实践

  • 模块化设计:将公共功能抽取为基类

  • 数据驱动:使用外部数据文件驱动测试

  • 环境隔离:不同环境使用不同配置

  • 持续集成:集成到CI/CD流水线

参考 https://www.coder-ljx.cn:8825/stressStabilityTest/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/963513.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Aoao Round 2 比赛总结

分数: \(100 + 25 + 20 + 0 = 145\) 好一个神秘 seq 赛。 T1 不难发现,一个符合要求的序列需要是连续的,且其中比 \(b\) 大的数和比 \(b\) 小的数数量相等。 因此,我们可以以 \(b\) 为起点,分别向两侧扫描,把比 …

基于遗传算法的PID控制器参数整定方法详解

基于遗传算法的PID控制器参数整定方法详解一、算法原理与核心流程 1. 遗传算法优化框架2. 关键参数编码染色体结构:采用实数编码直接表示Kp、Ki、Kd参数取值范围: Kp ∈ [0.1, 100], Ki ∈ [0, 50], Kd ∈ [0, 20] …

QT项目复盘:如何在有限资源下把桌面端做成‘高端应用’?

QT项目复盘:如何在有限资源下把桌面端做成‘高端应用’?项目背景:资源困境与“高端”诉求的矛盾 兰亭妙微曾接手某工业数据分析桌面应用开发,核心诉求是“高端化”——既要满足工程师对数据处理的高性能需求,又要…

11.12 联考总结

11.10 联考总结 前三题都很简单,但我二三题都调了较长的时间,很不好。 留给T4的时间不是很多。发现可以分解质因数,转化为网格图上不选相邻格的计数。理论复杂度是 \(O(\sqrt{N}\log_2N\log_3N2^{\log_3N})\) 似乎只…

揭开时序数据库的秘密:为何它是数据存储的未来?

在万物互联的时代,数据正以指数级速度增长。从智能工厂的传感器到金融市场的实时交易,从能源网络的监控到车联网的轨迹追踪,这些场景产生的数据都有一个共同特征——时间戳驱动。传统关系型数据库在处理这类高频、海…

Java中将String字符串转换为算术表达式并计算

在Java中,将字符串表示的算术表达式进行计算,可以借助现有的库,如Apache Commons JEXL或使用Java的内置脚本引擎JSR 223进行。但是,如果要自己解析和计算算术表达式,就需要零起步实现一个表达式解析器。这涉及到对…

按钮固定在底部

按钮固定在底部1 wxml<view class="fixed-bottom-btn"><view class="button" bindtap="goYuding">马上预定</view> </view> 2 wxss/* 固定在底部的按钮容器 *…

locust基础

它采用纯 Python 实现,是一个分布式用户负载测试的工具。 使用基于 Requests 库的客户端发起请求,使编写脚本大大简化; 在模拟并发方面摒弃进程和线程,完全基于时间驱动,采用协程(gevent)提供的非阻塞 IO 和 cor…

基于HSMS通信标准的SECS通讯程序

HSMS(High-Speed SECS Message Services)通信标准的SECS通讯程序开发 结合SEMI E37标准核心规范 一、HSMS协议核心机制 1. 连接模式与状态机模式 角色 关键流程被动模式 设备端(服务端) 监听端口(默认5000)→ 接…

设置fdfs自动启动

fdfs设置服务,且自启动很久以前的fdfs服务一直在跑,只是没有设置为服务,且无自启动。使用deepseek,指导我完成了整个设置,几乎无脑。 1、问deepseek,提示词如下执行ps -ef| grep fdfs显示如下,该如何创建服务?…

完整教程:Redis GEO 模块深度解析:从原理到高可用架构实践

pre { white-space: pre !important; word-wrap: normal !important; overflow-x: auto !important; display: block !important; font-family: "Consolas", "Monaco", "Courier New", …

办公楼设计多少钱一平?广州办公楼设计收费标准

办公楼设计费的收费标准因设计公司资质、项目规模、设计复杂程度以及地区差异等因素而有所不同。那么,办公楼设计多少钱一平? 一、办公楼设计多少钱一平 1.普通设计师:按建筑平米收费,设计费用在70-130元/㎡。 2.资深…

macOS 下载汇总 (系统、应用和教程) - macOS Tahoe 26

macOS Tahoe 26, macOS Sequoia 15, macOS Sonoma 14, macOS Ventura 13, macOS Monterey 12, macOS Big Sur 11macOS 下载汇总 (系统、应用和教程) macOS Tahoe 26, macOS Sequoia 15, macOS Sonoma 14, macOS Ventur…

使用page-meta为u-popup的遮罩层添加穿透屏蔽

根节点下添加page-meta <page-meta :page-style="overflow:${showAll ? hidden : visible};"></page-meta>在打开u-popup时将变量showAll 设置为true即可 关闭时记得解开

2025年广州到吉尔吉斯斯坦海运公司权威推荐榜单:广州到吉尔吉斯斯坦运输/广州到吉尔吉斯斯坦双清门到门/广州到吉尔吉斯斯坦双清源头公司精选

在"一带一路"倡议深入发展的背景下,广州作为中国南大门与吉尔吉斯斯坦间的贸易往来日益密切,选择一家可靠的海运公司已成为众多出口企业的关键决策。 本文将基于企业实力、运输网络、清关能力、数字化服务…

AI人力资源管理系统如何让HR的工作更高效、更有判断力

随着人工智能技术的快速发展,企业数字化的重心正从流程自动化转向智能化决策。在这一过程中,人力资源管理系统的角色也在悄然改变。传统HR系统更多承担的是事务管理职能,例如考勤、薪酬、审批和档案维护,它让人力工…

etcd 参数调整

B. 参数调优(增强抗抖动能力) 以 static-pod 方式启动的 etcd,修改 /etc/kubernetes/manifests/etcd.yaml:yaml 复制# 在 command 数组里追加或修改 - --heartbeat-interval=300 # 默认 100ms → 300ms -…