批量造数据
- 连接Mysql的信息
-
1 import pymysql -
2 # 数据库连接信息 -
3 # 多个库要有多个conn -
4 conn = pymysql.connect( -
5 host="主机", -
6 user="用户名", -
7 password="密码", -
8 database="库名" -
9 ) -
10 conn1 = pymysql.connect( -
11 host="主机", -
12 user="用户名", -
13 password="密码", -
14 database="库名" -
15 ) -
16 -
17 # 创建游标对象 -
18 cursor = conn.cursor() -
19 cursor1 = conn1.cursor() -
20 -
21 # 执行对应的SQL -
22 cursor.execute -
23 # 获取执行结果 -
24 Result=cursor.fetchall()
-
场景一:基于已有的csv文件,分批次读取csv文件中的字段值作为变量填充到执行的SQL语句
- 分批读取csv文件中的值
-
1 csv_file_path = 'csv文件目录' -
2 with open(csv_file_path, 'r',encoding='utf-8') as file: -
3 reader = csv.reader(file) -
4 next(reader) # Skip the header row -
5 -
6 batch_size = 100 # 每批处理的数量 -
7 total_items = 3100 # 总共需要处理的数量 -
8 -
9 for i in range(0, total_items, batch_size): -
10 # 在每次循环中处理 batch_size 个项目 -
11 # 可以在循环体内部使用 i 作为起始索引 -
12 -
13 for j in range(i, min(i + batch_size, total_items)): -
14 row = next(reader) -
15 # 打印这一行的数据 -
16 print(row)
场景一:基于已有的csv文件,分批次读取csv文件中的字段值作为变量填充到执行的SQL语句
- 分批读取csv文件中的值
-
1 csv_file_path = 'csv文件目录' -
2 with open(csv_file_path, 'r',encoding='utf-8') as file: -
3 reader = csv.reader(file) -
4 next(reader) # Skip the header row -
5 -
6 batch_size = 100 # 每批处理的数量 -
7 total_items = 3100 # 总共需要处理的数量 -
8 -
9 for i in range(0, total_items, batch_size): -
10 # 在每次循环中处理 batch_size 个项目 -
11 # 可以在循环体内部使用 i 作为起始索引 -
12 -
13 for j in range(i, min(i + batch_size, total_items)): -
14 row = next(reader) -
15 # 打印这一行的数据 -
16 print(row)
场景二:随机生成特殊字段的值,作为变量填充到Insert语句中
- 随机生成统代
-
1 import random -
2 import string -
3 def generate_credit_code(): -
4 # 生成第1位登记管理部门代码 -
5 管理部门代码 = ['1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D'] -
6 register_department = random.choice(管理部门代码) -
7 # print('管理部门代码为',register_department) -
8 -
9 # 生成2-9位组织机构代码 -
10 organizations_code = [] -
11 for _ in range(8): -
12 org_code = '' -
13 for _ in range(8): -
14 org_code += random.choice(string.ascii_uppercase + string.digits) -
15 organizations_code.append(org_code) -
16 organizations_code=random.choice(organizations_code) -
17 # print('组织机构代码为',organizations_code) -
18 -
19 -
20 # 生成10-17位统一社会信用代码 -
21 unification_credit_code = '' -
22 for _ in range(8): -
23 unification_credit_code += random.choice(string.ascii_uppercase + string.digits) -
24 # print('统一社会信用代码为',unification_credit_code) -
25 -
26 # 组合统一社会信用代码 -
27 credit_code = f"{register_department}{''.join(organizations_code)}{unification_credit_code}" -
28 return credit_code
-
随机生成注册号
-
1 mport random -
2 -
3 #这个注册号是由15个随机数字组成的,使用random.choice方法从0-9中随机选择数字。这个方法会被调用15次,每次都会生成一个随机数字,然后通过字符串的join方法将这15个数字拼接在一起。 -
4 def generate_reg_code(): -
5 # 15位注册号,以0开头 -
6 reg_code = ''.join(random.choice('0123456789') for i in range(15)) -
7 return reg_code
结合python+pytest+fixture 实现定时任务接口调用
目录结构
(有些乱。。。

-- config.ini 存放的是系统固定的url之类的

-- conftest.py 一般用于放登录接口,用户返回token,利用fixture被其他接口使用
-
1 import pytest -
2 import requests -
3 import pymysql -
4 from config import readconfig -
5 readcon = readconfig.Read() -
6 -
7 -
8 @pytest.fixture(scope="session") -
9 # 这个方法是pytest封装公共方法的一个文件,文件名必须是(conftest.py) -
10 # 作用: 其他地方在使用这个方法时就不用from XX import cc 然后也不用实例化了 -
11 -
12 -
13 def test_login(): -
14 msg = { -
15 "username": '用户名', -
16 "password": '加密后的密码' -
17 } -
18 -
19 url =readcon.get_URL("baseurl") -
20 cc = requests.post(url+"api/uxxxxxxr/login", params=msg) -
21 getjson = cc.json() -
22 -
23 # 获取token -
24 tok = getjson['data']['token'] -
25 userid = getjson['data']['userId'] -
26 return tok, userid
定时任务
-
import pytest -
import requests -
from config import readconfig -
read = readconfig.Read() -
class TestCase1: -
global url, tim # 全局变量,便于其他地方调用 -
url = read.get_URL("baseurl") -
tim = read.get_URL("timeout") -
def test_case1(self, test_login): -
head = {'Content-Type': 'application/json', 'Authorization': test_login[0]} # test_login[0]为token -
NewtestCreditCodeList = [] -
SelectNewtestGs = "select 字段1,字段2,字段3 from 数据表 order by id desc limit 100" -
cursor.execute(SelectNewtestGs) -
SelectNewtestResult = cursor.fetchall() -
-
for tuple in SelectNewtestResult: -
NewtestCreditCodeList.append(tuple[2]) -
NewtestGsCreditCodeListResult = ', '.join('"' + i + '"' for i in NewtestCreditCodeList) -
print('结果为', NewtestGsCreditCodeListResult) -
r = requests.get(url + 'api/exxxxxh/txxx/xxxxx?入参='+NewtestGsEidListResult, headers=head) -
print(r.json())
总结:
感谢每一个认真阅读我文章的人!!!
作为一位过来人也是希望大家少走一些弯路,如果你不想再体验一次学习时找不到资料,没人解答问题,坚持几天便放弃的感受的话,在这里我给大家分享一些自动化测试的学习资源,希望能给你前进的路上带来帮助。

-
文档获取方式: -
加入我的软件测试交流群:680748947免费获取~(同行大佬一起学术交流,每晚都有大佬直播分享技术知识点)
这份文档,对于想从事【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴我走过了最艰难的路程,希望也能帮助到你!
以上均可以分享,只需要你搜索vx公众号:程序员雨果,即可免费领取