A quick tour of basic pymysql operations: insert, update, delete, and query (增改删查).
Insert
First create the table, then write data into it.
Except for queries, insert/update/delete operations all need an explicit commit; see ref. 1 for the underlying reason.
import pandas as pd
import pymysql
import time
import warnings
warnings.filterwarnings("ignore")
 
Create the table
con = pymysql.connect(host='localhost',port=3306,user='root',password='12345',db='ai',charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
create_sql = """
create table user(
    id int NOT NULL AUTO_INCREMENT,
    `name` varchar(50) NOT NULL,
    `age` int NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
"""
try:
    # Execute the SQL statement
    cur.execute(create_sql)
    # Commit the change
    con.commit()
except Exception:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
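Incidentally, the try/commit/except/rollback pattern above can be written a little more compactly: pymysql cursors support the context-manager protocol and close themselves on exit, while commit and rollback stay explicit. A minimal sketch, reusing create_sql and the same connection parameters as above:

con = pymysql.connect(host='localhost', port=3306, user='root', password='12345', db='ai', charset="utf8")
try:
    # The cursor is closed automatically when the with-block exits
    with con.cursor(cursor=pymysql.cursors.DictCursor) as cur:
        cur.execute(create_sql)
    con.commit()
except Exception:
    print("Error occurred, rolling back")
    con.rollback()
finally:
    con.close()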
 
con = pymysql.connect(host='localhost',port=3306,user='root',password='12345',db='ai',charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
desc user;
"""
try:
    # Execute the SQL statement; a plain query needs no commit
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
except Exception:
    # Roll back on error
    con.rollback()
# Close the cursor
cur.close()
# Close the connection
con.close()
 
  Field         Type Null  Key Default           Extra
0    id          int   NO  PRI    None  auto_increment
1  name  varchar(50)   NO         None                
2   age          int   NO         None                
 
Insert rows
con = pymysql.connect(host='localhost',port=3306,user='root',password='12345',db='ai',charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
row_nums = 500000
sql = "insert into user(name, age) values('小明', 14)"
try:
    t1 = time.time()
    # Execute the same INSERT once per row
    for i in range(row_nums):
        cur.execute(sql)
    con.commit()  # Commit once at the end
    t2 = time.time()
    print(f"Row-by-row insert took: {t2 - t1}")
except Exception:
    # Roll back on error
    con.rollback()
# Close the cursor
cur.close()
# Close the connection
con.close()
 
Row-by-row insert took: 39.632535457611084
 
Batch insert
con = pymysql.connect(host='localhost',port=3306,user='root',password='12345',db='ai',charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
row_nums = 500000
sql = "insert into user(name, age) values(%s,%s)"
citys = [('小明', 14) for i in range(row_nums)]
try:
    t1 = time.time()
    # citys is the parameter sequence: each element supplies the fields of one
    # inserted row, and may be a tuple or a list
    cur.executemany(sql, citys)  # Batch execute
    con.commit()
    t2 = time.time()
    print(f"Batch insert took: {t2 - t1}")
except Exception:
    # Roll back on error
    con.rollback()
# Close the cursor
cur.close()
# Close the connection
con.close()
 
Batch insert took: 5.722973823547363
 
Batch insert has a clear speed advantage. Note the space before values in "insert into user(name, age) values(%s,%s)": executemany only applies its batch optimization when the statement matches its expected INSERT ... VALUES pattern; see ref. 2 for details.
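Roughly what happens under the hood: when the pattern matches, pymysql rewrites the whole batch into a single multi-row INSERT on the client side instead of sending one statement per row; without the space, it silently falls back to looping execute. The sketch below imitates that rewrite by hand (same connection parameters as above), and cur.mogrify previews the exact SQL string pymysql would send:

con = pymysql.connect(host='localhost', port=3306, user='root', password='12345', db='ai', charset="utf8")
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
rows = [('小明', 14), ('小红', 15)]
# Build "(%s,%s),(%s,%s)" and flatten the parameters to match
placeholders = ",".join(["(%s,%s)"] * len(rows))
flat_params = [v for row in rows for v in row]
cur.execute(f"insert into user(name, age) values {placeholders}", flat_params)
con.commit()
# mogrify returns the exact statement execute() would send for one row
print(cur.mogrify("insert into user(name, age) values(%s,%s)", ('小明', 14)))
cur.close()
con.close()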
Batch insert with pyspark
When the data volume is huge, you can combine this with Spark's foreachPartition operator and write partitions in parallel.
import pandas as pd
import time
import pymysql
import functools
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
 
def get_or_create_hudi(app_name):
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .config("spark.driver.maxResultSize", "10g") \
        .config("spark.sql.execution.arrow.enabled", "true") \
        .config("spark.dynamicAllocation.enabled", "false") \
        .config("spark.sql.crossJoin.enabled", "true") \
        .config("spark.kryoserializer.buffer.max", "512m") \
        .config("spark.io.compression.codec", "snappy") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.hadoop.dfs.namenode.acls.enabled", "false") \
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')
    return spark
 
def insert2mysql_partrdd(part, db_param="", value_cols=['name', 'age'], batch=40000):
    """
    @param part: partition iterator
    @param db_param: MySQL connection settings
    @param value_cols: column names for the INSERT
    @param batch: number of rows per batch insert
    @return:
    """
    con = pymysql.connect(host='localhost', port=3306, user='root',
                          password='12345', db='ai', charset="utf8")
    cur = con.cursor(cursor=pymysql.cursors.DictCursor)
    cnt = 0
    batch_list = []
    sql = "insert into user(name, age) values(%s,%s)"
    for row in part:
        # This per-row list building may be slow; is there a better way to optimize it?
        batch_list.append([row[i] for i in value_cols])
        cnt = cnt + 1
        if cnt > 0 and cnt % batch == 0:
            cur.executemany(sql, batch_list)
            con.commit()
            batch_list = []
            print(f"Rows {cnt - batch}-{cnt} inserted into MySQL!")
    # If the last chunk does not fill a whole batch, push it too
    if cnt % batch != 0:
        cur.executemany(sql, batch_list)
        con.commit()
        print(f"Rows {cnt - cnt % batch}-{cnt} inserted into MySQL!")
    if cnt > 0:
        print(f"Data sample - last row: {row}")
        print(f"cnt: {cnt}")
    else:
        print("This partition is empty")
    cur.close()
    con.close()
 
row_nums = 500000
df = pd.DataFrame({"name": ['小明'] * row_nums, 'age': [14] * row_nums})
spark = get_or_create_hudi("test")
spark_df = spark.createDataFrame(df).repartition(10)
 
t1 = time.time()
spark_df.rdd.foreachPartition(functools.partial(insert2mysql_partrdd, batch=50000))
t2 = time.time()
print(f"spark批写入耗时:{t2 - t1}")  # 1.2s
 
Spark batch insert took: 8.034992456436157
 
- Speed-wise it does not seem any faster here
- With a much larger data volume, the parallelism might start to pay off
- Also, running Spark on a single machine likely skews the comparison
 
Delete
We just loaded over a million rows; let's delete some.
con = pymysql.connect(host='localhost',port=3306,user='root',password='12345',db='ai',charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
delete from user where id > 10
"""
try:
    # Execute the SQL statement
    cur.execute(sql)
    # Commit the deletion
    con.commit()
except Exception:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
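A parameterized variant of the same DELETE; the value is passed separately so pymysql handles the quoting, and cur.rowcount reports how many rows were affected (a sketch, same connection parameters as above):

con = pymysql.connect(host='localhost', port=3306, user='root', password='12345', db='ai', charset="utf8")
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
try:
    cur.execute("delete from user where id > %s", (10,))
    con.commit()
    print(f"Deleted {cur.rowcount} rows")  # rows affected by the last execute
except Exception:
    con.rollback()
finally:
    cur.close()
    con.close()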
 
con = pymysql.connect(host='localhost',port=3306,user='root',password='12345',db='ai',charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select count(*) as cnt from user
"""
try:
    # Execute the SQL statement; a plain SELECT needs no commit
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
except Exception:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
 
   cnt
0   10
 
10 rows remain.
Query
With pandas, the fetched rows convert straight into a DataFrame.
con = pymysql.connect(host='localhost',port=3306,user='root',password='12345',db='ai',charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select * from user limit 100
"""
try:
    # Execute the SQL statement; a plain SELECT needs no commit
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
except Exception:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
 
   id name  age
0   1   小明   14
1   2   小明   14
2   3   小明   14
3   4   小明   14
4   5   小明   14
5   6   小明   14
6   7   小明   14
7   8   小明   14
8   9   小明   14
9  10   小明   14
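fetchall() pulls the whole result set into memory, which is fine for 10 rows but not for millions. For large results, an unbuffered cursor (pymysql.cursors.SSDictCursor) plus fetchmany streams the rows in chunks; a minimal sketch, same connection parameters as above:

con = pymysql.connect(host='localhost', port=3306, user='root', password='12345', db='ai', charset="utf8")
cur = con.cursor(cursor=pymysql.cursors.SSDictCursor)  # unbuffered: rows stay on the server
cur.execute("select * from user")
while True:
    rows = cur.fetchmany(1000)  # pull at most 1000 rows per chunk
    if not rows:
        break
    chunk_df = pd.DataFrame(rows)
    print(chunk_df.shape)  # process each chunk here
cur.close()
con.close()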
 
Update
con = pymysql.connect(host='localhost',port=3306,user='root',password='12345',db='ai',charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
update user set name = '小红' where id <= 5
"""
try:
    # Execute the SQL statement
    cur.execute(sql)
    # Commit the update
    con.commit()
except Exception:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
 
con = pymysql.connect(host='localhost',port=3306,user='root',password='12345',db='ai',charset="utf8")
# Create a cursor (rows come back as tuples by default; switch to dicts)
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
sql = """
select * from user limit 100
"""
try:
    # Execute the SQL statement; a plain SELECT needs no commit
    cur.execute(sql)
    get_df = pd.DataFrame(cur.fetchall())
    print(get_df)
except Exception:
    # Roll back on error
    print("Error occurred, rolling back")
    con.rollback()
# Close the connection
con.close()
 
   id name  age
0   1   小红   14
1   2   小红   14
2   3   小红   14
3   4   小红   14
4   5   小红   14
5   6   小明   14
6   7   小明   14
7   8   小明   14
8   9   小明   14
9  10   小明   14
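executemany is not limited to INSERT: it also accepts an UPDATE with one parameter tuple per row, though in that case pymysql just loops execute internally (the multi-row rewrite only applies to INSERT/REPLACE ... VALUES). A sketch under that assumption; '小刚' is just an illustrative value:

con = pymysql.connect(host='localhost', port=3306, user='root', password='12345', db='ai', charset="utf8")
cur = con.cursor(cursor=pymysql.cursors.DictCursor)
try:
    # One parameter tuple per row to update: (new name, id)
    params = [('小红', 1), ('小刚', 2)]
    cur.executemany("update user set name = %s where id = %s", params)
    con.commit()
    print(f"Updated {cur.rowcount} rows")
except Exception:
    con.rollback()
finally:
    cur.close()
    con.close()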
 
Ref
[1] https://www.runoob.com/python3/python3-mysql.html
[2] https://www.modb.pro/db/184700
2023-07-28, typhoon and heavy rain, Jiangning District, Nanjing