目录
写在前面
setting.py
sqlite_tool.py
future_widget.py
写在前面
本文默认已经创建了项目,如果不知道如何创建一个空项目的,请参看以下两篇博文
PyQt5将项目搬到一个新的虚拟环境中
https://blog.csdn.net/m0_37967652/article/details/122625280
python_PyQt5开发工具结构基础
https://blog.csdn.net/m0_37967652/article/details/131969032
前序:
【期货日数据维护与使用_日数据维护_sqlite3数据库创建】 博文
【期货日数据维护与使用_日数据维护_模块运行演示】 博文
【期货日数据维护与使用_日数据维护_界面代码】 博文
【期货日数据维护与使用_日数据维护_合约更新】 博文
【期货日数据维护与使用_日数据维护_日数据更新】 博文
setting.py
import os
PROJECT_ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) + os.path.sepYOUKUANG_D_DIR = PROJECT_ROOT_DIR + 'youkuang_data' + os.path.sep + 'd' + os.path.sep
YOUKUANG_MAIN_DIR = PROJECT_ROOT_DIR + 'youkuang_data' + os.path.sep + 'main' + os.path.sep
SQLITE_FROM_DIR = PROJECT_ROOT_DIR + 'tools' + os.path.sep + 'sqlite_dir_from' + os.path.sep
SQLITE_TO_DIR = PROJECT_ROOT_DIR + 'tools' + os.path.sep + 'sqlite_dir_to' + os.path.sep
sqlite_tool.py
import sqlite3,os,datetime,json
import pandas as pd
from typing import List
from setting import PROJECT_ROOT_DIRYOUKUANG_DB_NAME = PROJECT_ROOT_DIR + 'youkuang_data' + os.path.sep + 'youkuang_db.db'# 创建产品表 t_product
def create_product_table():conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()c.execute('''create table if not exists t_product (code text, name text,exchange_name text)''')conn.commit()conn.close()passdef batch_insert_product(pre_list: List):''':param pre_list: [(code,name),(code,name)]:return:'''conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''insert into t_product values (?,?)'''c.executemany(sql_str,pre_list)# econn.commit()conn.close()passdef one_insert_product(pre_one:List):''':param pre_one: [code,name]:return:'''conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''insert into t_product values (?,?)'''c.execute(sql_str,pre_one)# econn.commit()conn.close()passdef query_products_of_exchange_name(exchange_name:str):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# sexchange_name = '\'' + exchange_name + '\''sql_str = '''select code,name from t_product where exchange_name={exchange_name}'''.format(exchange_name=exchange_name)c.execute(sql_str)res_list = c.fetchall()# econn.commit()conn.close()return res_list# 返回产品代码列表 [AG,AU,A...]
def query_all_product_codes_of_product():conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''select code from t_product'''c.execute(sql_str)res_list = c.fetchall()# econn.commit()conn.close()res_list00 = []for item in res_list:res_list00.append(item[0])return res_list00def create_symbol_basemsg_table():conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()c.execute('''create table if not exists t_symbol_basemsg(ticker text, listDate text,product_code text,minChgPriceNum float,minChgPriceUnit text,limitUpNum float,limitUpUnit text,limitDownNum float,limitDownUnit text,contMultNum float,contMultUnit text,tradeMarginRatio float,deliYear integer,deliMonth integer,lastTradeDate text,firstDeliDate text,lastDeliDate text,tradeCommiNum float,tradeCommiUnit text)''')conn.commit()conn.close()passdef batch_insert_symbol_basemsg(pre_list:List):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''insert into t_symbol_basemsg values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'''c.executemany(sql_str, pre_list)# econn.commit()conn.close()passdef query_all_from_symbol_basemsg()->List:conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''select ticker,listDate,product_code,minChgPriceNum,minChgPriceUnit,limitUpNum,limitUpUnit,limitDownNum,limitDownUnit,contMultNum,contMultUnit,tradeMarginRatio,deliYear,deliMonth,lastTradeDate,firstDeliDate,lastDeliDate,tradeCommiNum,tradeCommiUnit from t_symbol_basemsg'''c.execute(sql_str)res_list = c.fetchall()# econn.commit()conn.close()return res_list# 查询所有的ticker和deliYear字段
def query_all_ticker_deliYear_from_symbol_basemsg():conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''select ticker,deliYear from t_symbol_basemsg'''c.execute(sql_str)res_list = c.fetchall()# econn.commit()conn.close()return res_listpass# 条件查询返回ticker和deliYear字段
def query_pro_ticker_deliYear_from_symbol_basemsg_by_query(tickers:List,deliYear:int):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# sticker_str = '\',\''.join(tickers)ticker_str = '\'' + ticker_str + '\''sql_str = '''select product_code,ticker,deliYear from t_symbol_basemsg where ticker in ({tickers}) and deliYear>={deliYear}'''.format(tickers=ticker_str,deliYear=deliYear)c.execute(sql_str)res_list = c.fetchall()# econn.commit()conn.close()return res_listdef query_tickers_of_one_pro_in_symbol_basemsg(product_code:str):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# sproduct_code_str = '\''+product_code+'\''sql_str = '''select ticker,product_code,listDate,deliYear from t_symbol_basemsg where product_code={product_code}'''.format(product_code=product_code_str)c.execute(sql_str)res_list = c.fetchall()# econn.commit()conn.close()return res_listdef create_main_symbol_table():conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()c.execute('''create table if not exists t_main_symbol (product_code text, ticker text,deliYear integer,start_date text)''')conn.commit()conn.close()passdef one_insert_main_symbol(pre_one:List):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''insert into t_main_symbol values (?,?,?,?)'''c.execute(sql_str, pre_one)# econn.commit()conn.close()passdef update_main_symbol(product_code:str,ticker:str,deliYear:int,start_date_str:str):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# sticker = '\''+ticker+'\''start_date_str = '\'' + start_date_str + '\''product_code = '\'' + product_code + '\''sql_str = '''update t_main_symbol set ticker={ticker},deliYear={deliYear},start_date={start_date} where product_code={product_code}'''.format(ticker=ticker,deliYear=deliYear,start_date=start_date_str,product_code=product_code)c.execute(sql_str)# econn.commit()conn.close()pass# 先判断是否有,有的话update,没有的话insert
def input_main_symbol(product_code:str,ticker:str,deliYear:int,start_date:str=None):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# sproduct_code_str = '\''+product_code+'\''sql_str_query = '''select * from t_main_symbol where product_code={product_code}'''.format(product_code=product_code_str)c.execute(sql_str_query)res_one = c.fetchone()if len(res_one)<=0:# 新增品种插入pre_one = [product_code,ticker,deliYear,start_date]one_insert_main_symbol(pre_one)passelse:# 已有品种修改update_main_symbol(product_code,ticker,deliYear,start_date)pass# econn.commit()conn.close()passdef create_online_symbol_table():conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()c.execute('''create table if not exists t_online_symbol (product_code text, ticker text, deliYear integer, newest_date text)''')conn.commit()conn.close()passdef one_insert_online_symbol(pre_one:List):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''insert into t_online_symbol values (?,?,?,?)'''c.execute(sql_str, pre_one)# econn.commit()conn.close()passdef batch_insert_online_symbol(pre_list:List):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''insert into t_online_symbol values (?,?,?,?)'''c.executemany(sql_str, pre_list)# econn.commit()conn.close()passdef query_online_tickers_of_one_pro_in_online_symbol(product_code:str):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# sproduct_code_str = '\'' + product_code + '\''sql_str = '''select ticker,deliYear,newest_date from t_online_symbol where product_code={product_code}'''.format(product_code=product_code_str)c.execute(sql_str)res_list = c.fetchall()# econn.commit()conn.close()return res_listdef query_all_from_online_symbol():conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''select product_code,ticker,deliYear,newest_date from t_online_symbol'''c.execute(sql_str)res_list = c.fetchall()# econn.commit()conn.close()return res_listdef delete_all_data_of_online_symbol():conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''delete from t_online_symbol'''c.execute(sql_str)# econn.commit()conn.close()passdef create_last30_daily_table():conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()c.execute('''create table if not exists t_last30_daily (product_code text,ticker text,deliYear integer,tradeDate text,openPrice float,highestPrice float,lowestPrice float,closePrice float,settlePrice float,turnoverVol integer,turnoverValue integer,openInt integer)''')conn.commit()conn.close()passdef batch_insert_last30_daily(pre_list: List):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# ssql_str = '''insert into t_last30_daily values (?,?,?,?,?,?,?,?,?,?,?,?)'''c.executemany(sql_str,pre_list)# econn.commit()conn.close()passdef query_daily_by_pro_in_last30_daily(product_code:str):conn = sqlite3.connect(YOUKUANG_DB_NAME)c = conn.cursor()# sproduct_code_str = '\'' + product_code + '\''sql_str = '''select product_code,ticker,deliYear,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,settlePrice,turnoverVol,turnoverValue,openInt from t_last30_daily where product_code={product_code}'''.format(product_code=product_code_str)c.execute(sql_str)res_list = c.fetchall()# econn.commit()conn.close()return res_list
future_widget.py
import sys,os,typing,datetime,shutil,json,traceback
from typing import Dict,Any,List
import pandas as pd
from threading import Thread
from PyQt5 import QtCore,QtWidgets,QtGui
from tools import sqlite_tool
from setting import YOUKUANG_D_DIR,YOUKUANG_MAIN_DIR,SQLITE_FROM_DIR,SQLITE_TO_DIR# 数据更新界面
class DailyUpdateWidget(QtWidgets.QWidget):signal_excute = QtCore.pyqtSignal(object)def __init__(self):super().__init__()self.thread: Thread = Noneself.init_data()self.init_ui()self.register_event()self.init_start()passdef init_data(self):self.left_table_headers = ['品种代码', '合约', '交割年', '最新日期']self.mark_str_init: str = 'init'self.mark_str_error: str = 'error'self.mark_str_log_str: str = 'log_str'self.mark_str_step_one: str = 'step_one'self.mark_str_step_two: str = 'step_two'self.symbol_basemsg_column_list: List = ['ticker', 'listDate', 'contractObject', 'minChgPriceNum','minChgPriceUnit', 'limitUpNum', 'limitUpUnit', 'limitDownNum','limitDownUnit', 'contMultNum', 'contMultUnit', 'tradeMarginRatio','deliYear', 'deliMonth', 'lastTradeDate', 'firstDeliDate','lastDeliDate', 'tradeCommiNum', 'tradeCommiUnit']self.online_symbol_column_list: List = ['product_code', 'ticker', 'deliYear', 'newest_date']self.d_csv_column_list: List = ['tradeDate', 'openPrice', 'highestPrice', 'lowestPrice', 'closePrice','settlePrice', 'turnoverVol', 'turnoverValue', 'openInt']self.last30_daily_column_list: List = ['product_code', 'ticker', 'deliYear', 'tradeDate', 'openPrice','highestPrice', 'lowestPrice', 'closePrice', 'settlePrice','turnoverVol', 'turnoverValue', 'openInt']self.pre_d_csv_column_list: List = ['o_date', 'tradeDate', 'openPrice', 'highestPrice', 'lowestPrice','closePrice', 'settlePrice', 'turnoverVol', 'turnoverValue', 'openInt']self.main_csv_column_list: List = ['ticker', 'deliYear', 'tradeDate', 'openPrice', 'highestPrice','lowestPrice', 'closePrice', 'settlePrice', 'turnoverVol', 'turnoverValue','openInt']self.pre_main_csv_column_list: List = ['product_code', 'ticker', 'deliYear', 'tradeDate', 'openPrice','highestPrice', 'lowestPrice', 'closePrice', 'settlePrice','turnoverVol', 'turnoverValue', 'openInt']passdef init_ui(self):self.setWindowTitle('日数据更新(优矿)')self.setMinimumHeight(600)self.setMinimumWidth(800)self.caculate_progress = QtWidgets.QProgressBar()self.caculate_status_label = QtWidgets.QLabel()layout_progress = QtWidgets.QHBoxLayout()layout_progress.addWidget(self.caculate_progress)layout_progress.addWidget(self.caculate_status_label)deliYear_tip = QtWidgets.QLabel('设置计算要使用的交割年份:')self.deliYear_spinbox = QtWidgets.QSpinBox()self.deliYear_spinbox.setMinimum(1900)self.deliYear_spinbox.setMaximum(9999)self.deliYear_spinbox.setValue(datetime.datetime.now().year)layout_one = QtWidgets.QHBoxLayout()layout_one.addWidget(deliYear_tip)layout_one.addWidget(self.deliYear_spinbox)layout_one.addStretch(1)choice_symbol_dir_btn = QtWidgets.QPushButton('选择合约所在目录')choice_symbol_dir_btn.clicked.connect(self.choice_symbol_dir_btn_clicked)self.choice_symbol_dir_lineedit = QtWidgets.QLineEdit()self.choice_symbol_dir_lineedit.setReadOnly(True)excute_step_one_btn = QtWidgets.QPushButton('执行')excute_step_one_btn.clicked.connect(self.excute_step_one_btn_clicked)layout_two = QtWidgets.QHBoxLayout()layout_two.addWidget(choice_symbol_dir_btn)layout_two.addWidget(self.choice_symbol_dir_lineedit)layout_two.addWidget(excute_step_one_btn)self.new_symbol_json_down_btn = QtWidgets.QPushButton('新增合约json下载')self.new_symbol_json_down_btn.clicked.connect(self.new_symbol_json_down_btn_clicked)self.append_symbol_json_down_btn = QtWidgets.QPushButton('追加合约json下载')self.append_symbol_json_down_btn.clicked.connect(self.append_symbol_json_down_btn_clicked)self.new_symbol_json_down_btn.setDisabled(True)self.append_symbol_json_down_btn.setDisabled(True)layout_three = QtWidgets.QHBoxLayout()layout_three.addWidget(self.new_symbol_json_down_btn)layout_three.addWidget(self.append_symbol_json_down_btn)choice_symbol_dir_label = QtWidgets.QLabel('提示:从优矿中下载最新的合约列表,放在一个目录中,选择该目录。执行完后,下载要更新的合约json文件')# 分界线 sh_line_one = QtWidgets.QFrame()h_line_one.setFrameShape(QtWidgets.QFrame.HLine)h_line_one.setFrameShadow(QtWidgets.QFrame.Sunken)# 分界线 eself.choice_daily_dir_btn = QtWidgets.QPushButton('选择日数据所在目录')self.choice_daily_dir_btn.clicked.connect(self.choice_daily_dir_btn_clicked)self.choice_daily_dir_btn.setDisabled(True)self.choice_daily_dir_lineedit = QtWidgets.QLineEdit()self.choice_daily_dir_lineedit.setReadOnly(True)excute_step_two_btn = QtWidgets.QPushButton('执行')excute_step_two_btn.clicked.connect(self.excute_step_two_btn_clicked)layout_four = QtWidgets.QHBoxLayout()layout_four.addWidget(self.choice_daily_dir_btn)layout_four.addWidget(self.choice_daily_dir_lineedit)layout_four.addWidget(excute_step_two_btn)choice_daily_dir_label = QtWidgets.QLabel('提示:从优矿在下载待更新的日数据,放在一个目录中,选择该目录')# 分界线 sh_line_two = QtWidgets.QFrame()h_line_two.setFrameShape(QtWidgets.QFrame.HLine)h_line_two.setFrameShadow(QtWidgets.QFrame.Sunken)# 分界线 etip_label = QtWidgets.QLabel('当前在线合约:')refresh_btn = QtWidgets.QPushButton('刷新')refresh_btn.clicked.connect(self.refresh_btn_clicked)download_btn = QtWidgets.QPushButton('下载')download_btn.clicked.connect(self.download_btn_clicked)layout_five = QtWidgets.QHBoxLayout()layout_five.addWidget(tip_label)layout_five.addStretch(1)layout_five.addWidget(refresh_btn)layout_five.addWidget(download_btn)self.left_table = QtWidgets.QTableWidget()self.left_table.setColumnCount(len(self.left_table_headers))self.left_table.setHorizontalHeaderLabels(self.left_table_headers)layout_six = QtWidgets.QVBoxLayout()layout_six.addLayout(layout_five)layout_six.addWidget(self.left_table)log_label = QtWidgets.QLabel('日志:')self.log_textedit = QtWidgets.QTextEdit()self.log_textedit.setReadOnly(True)layout_seven = QtWidgets.QVBoxLayout()layout_seven.addWidget(log_label)layout_seven.addWidget(self.log_textedit)layout_eight = QtWidgets.QHBoxLayout()layout_eight.addLayout(layout_six)layout_eight.addLayout(layout_seven)layout = QtWidgets.QVBoxLayout()layout.addLayout(layout_progress)layout.addLayout(layout_one)layout.addLayout(layout_two)layout.addLayout(layout_three)layout.addWidget(choice_symbol_dir_label)layout.addWidget(h_line_one)layout.addLayout(layout_four)layout.addWidget(choice_daily_dir_label)layout.addWidget(h_line_two)layout.addLayout(layout_eight)self.setLayout(layout)passdef register_event(self):self.signal_excute.connect(self.process_excute_event)passdef process_excute_event(self,data:Dict):mark_str = data['mark_str']if mark_str == self.mark_str_init:res_data = data['data']self.fill_left_table_content(res_data)self.write_log('初始化完毕。')self.thread = Noneself.progress_finished()passelif mark_str == self.mark_str_log_str:msg = data['data']self.write_log(msg)passelif mark_str == self.mark_str_error:msg = data['data']self.write_log(msg)self.thread = Noneself.progress_finished()QtWidgets.QMessageBox.information(self,'提示',msg,QtWidgets.QMessageBox.Yes)passelif mark_str == self.mark_str_step_one:self.new_symbol_json_down_btn.setDisabled(False)self.append_symbol_json_down_btn.setDisabled(False)self.choice_daily_dir_btn.setDisabled(False)self.thread = Noneself.progress_finished()self.write_log('合约更新完毕。')passelif mark_str == self.mark_str_step_two:self.write_log('日数据更新完毕')self.thread = Noneself.progress_finished()self.refresh_btn_clicked()# 删除 new.json文件 和 append文件try:os.remove(SQLITE_TO_DIR + 'new.json')self.write_log('new.json文件删除成功')except OSError as e:self.write_log(f"无法删除文件:{e}")try:shutil.rmtree(SQLITE_TO_DIR + 'append' + os.path.sep)self.write_log('append文件夹删除成功')except FileNotFoundError:self.write_log('append文件夹不存在')except PermissionError:self.write_log('删除append文件夹权限不足')passpassdef init_start(self):self.progress_init()self.start_caculate_thread(self.mark_str_init)passdef query_all_online_symbol(self):online_symbol_list = sqlite_tool.query_all_from_online_symbol()return online_symbol_listdef fill_left_table_content(self,data:List):self.left_table.clearContents()self.left_table.setRowCount(len(data))for r_i,row in enumerate(data):for c_i,col in enumerate(row):item = QtWidgets.QTableWidgetItem(str(col))self.left_table.setItem(r_i,c_i,item)passself.left_table.resizeColumnsToContents()passdef choice_symbol_dir_btn_clicked(self):path = QtWidgets.QFileDialog.getExistingDirectory(self,'选择合约所在目录',SQLITE_FROM_DIR)if not path:returnself.choice_symbol_dir_lineedit.setText(path)passdef excute_step_one_btn_clicked(self):dir_path = self.choice_symbol_dir_lineedit.text()if len(dir_path) <= 0:QtWidgets.QMessageBox.information(self,'提示','请选择合约所在目录',QtWidgets.QMessageBox.Yes)returnpre_map = {'dir_path': dir_path,'cur_deliYear': self.deliYear_spinbox.value()}self.start_caculate_thread(self.mark_str_step_one, pre_map)passdef new_symbol_json_down_btn_clicked(self):new_json_path = SQLITE_TO_DIR + 'new.json'if not os.path.exists(new_json_path):QtWidgets.QMessageBox.information(self,'提示','没有新增的合约',QtWidgets.QMessageBox.Yes)returnpath = QtWidgets.QFileDialog.getExistingDirectory(self,'选择要保存的路径','.')if not path:returnoutput_path = path + os.path.sep + 'new.json'shutil.copy(new_json_path, output_path)self.write_log('新增的合约列表导出完毕')QtWidgets.QMessageBox.information(self,'提示','新增的合约列表导出完毕',QtWidgets.QMessageBox.Yes)passdef append_symbol_json_down_btn_clicked(self):append_json_dir = SQLITE_TO_DIR + 'append' + os.path.sepfile_list = os.listdir(append_json_dir)if len(file_list) <= 0:QtWidgets.QMessageBox.information(self,'提示','没有要追加的合约',QtWidgets.QMessageBox.Yes)returnpath = QtWidgets.QFileDialog.getExistingDirectory(self,'选择要保存的路径','.')if not path:returnfor item in file_list:src_file_path = append_json_dir + itemtarget_file_path = path + os.path.sep + itemshutil.copy(src_file_path, target_file_path)self.write_log('待追加日数据的合约列表导出完毕')QtWidgets.QMessageBox.information(self,'提示','待追加日数据的合约列表导出完毕',QtWidgets.QMessageBox.Yes)passdef choice_daily_dir_btn_clicked(self):path = QtWidgets.QFileDialog.getExistingDirectory(self,'选择日数据所在目录',SQLITE_FROM_DIR)if not path:returnself.choice_daily_dir_lineedit.setText(path)passdef excute_step_two_btn_clicked(self):dir_path = self.choice_daily_dir_lineedit.text()if len(dir_path) <= 0:QtWidgets.QMessageBox.information(self,'提示','请选择日数据所在目录',QtWidgets.QMessageBox.Yes)returnpre_map = {'dir_path': dir_path,'cur_deliYear': self.deliYear_spinbox.value()}self.start_caculate_thread(self.mark_str_step_two, pre_map)passdef refresh_btn_clicked(self):res_list = self.query_all_online_symbol()self.fill_left_table_content(res_list)passdef download_btn_clicked(self):file_name = f"online_symbol_{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}.csv"path, _ = QtWidgets.QFileDialog.getSaveFileName(self,'保存online_symbol数据文件',file_name,'CSV(*.csv)')if not path:returnres_list = self.query_all_online_symbol()df = pd.DataFrame(columns=['product_code', 'ticker', 'deliYear', 'newest_date'], data=res_list)df.to_csv(path, encoding='utf-8')QtWidgets.QMessageBox.information(self,'提示','online_symbol下载完毕',QtWidgets.QMessageBox.Yes)passdef start_caculate_thread(self,mark_str:str,data:Dict[str,Any]=None):if self.thread:QtWidgets.QMessageBox.information(self,'提示','线程正在执行任务,请稍后...',QtWidgets.QMessageBox.Yes)returnself.thread = Thread(target=self.running_caculate_thread,args=(mark_str,data,))self.thread.start()self.progress_busy()passdef running_caculate_thread(self,mark_str:str,data:Dict[str,Any]):try:if mark_str == self.mark_str_init:res_list = self.query_all_online_symbol()pre_map = {'mark_str': mark_str,'data': res_list}self.signal_excute.emit(pre_map)elif mark_str == self.mark_str_step_one:dir_path = data['dir_path']cur_deliYear = data['cur_deliYear']exist_tickers_list = sqlite_tool.query_all_ticker_deliYear_from_symbol_basemsg()exist_temp_tickers_list = []for item in exist_tickers_list:exist_temp_tickers_list.append(f"{item[0]}_{item[1]}")passfile_list = os.listdir(dir_path)# 逐一读取合约文件,筛出新增的合约,合并到一个df中new_df = pd.DataFrame()for item in file_list:file_path = dir_path + os.path.sep + itemdf = pd.read_csv(file_path)df['temp_ticker'] = df['ticker'].astype('str').str.cat(df['deliYear'].astype('str'),'_')one_df = df.loc[(~df['temp_ticker'].isin(exist_temp_tickers_list)) & (df['deliYear']>=cur_deliYear)].copy()if len(one_df)>0:new_df = pd.concat([new_df,one_df])pass# 将新增的合约插入到 t_symbol_basemsg 中,并导出到 new.json文件if len(new_df)>0:pre_list = new_df.loc[:,self.symbol_basemsg_column_list].values.tolist()sqlite_tool.batch_insert_symbol_basemsg(pre_list)self.thread_out_log(f"新增合约{len(pre_list)}")new_json = new_df['ticker'].values.tolist()with open(SQLITE_TO_DIR + 'new.json','w',encoding='utf-8') as fw:json.dump(new_json,fw)pass# 追加的ticker列表self.thread_out_log(f"开始计算要追加的ticker列表:")online_tickers = sqlite_tool.query_all_from_online_symbol()df_online = pd.DataFrame(columns=self.online_symbol_column_list,data=online_tickers)df_group = df_online.groupby(by='newest_date')target_json_dir = SQLITE_TO_DIR + 'append' + os.path.sepif not os.path.exists(target_json_dir):os.mkdir(target_json_dir)for name,group in df_group:name_str = name.replace('-','')append_json = group['ticker'].values.tolist()target_file_path = target_json_dir + name_str + '.json'with open(target_file_path,'w',encoding='utf-8') as fw:json.dump(append_json,fw)pre_map = {"mark_str":self.mark_str_step_one,"data":None}self.signal_excute.emit(pre_map)passelif mark_str == self.mark_str_step_two:dir_path = data['dir_path']cur_deliYear = data['cur_deliYear']file_list = os.listdir(dir_path)if len(file_list)<=0:self.thread_out_log('Error Error 没有日数据文件')pre_map = {'mark_str':self.mark_str_error,'data':'Error Error 没有日数据文件'}self.signal_excute.emit(pre_map)return# 将下载的日数据合并到同一个df中df = pd.DataFrame()for item in file_list:file_path = dir_path + os.path.sep + itemdf_one = pd.read_csv(file_path,encoding='utf-8')if not df_one.columns.isin(self.d_csv_column_list).any():self.thread_out_log(f"{item},文件不是日数据文件")continuedf = pd.concat([df,df_one])passif len(df)<=0:self.thread_out_log('Error Error 没有要更新的日数据')pre_map = {'mark_str': self.mark_str_error,'data': 'Error Error 没有要更新的日数据'}self.signal_excute.emit(pre_map)returnpre_add_tickers = df['ticker'].unique()ticker_list = sqlite_tool.query_pro_ticker_deliYear_from_symbol_basemsg_by_query(pre_add_tickers,cur_deliYear)ticker_map = {}for item in ticker_list:ticker_map[item[1]] = itempassdf['o_date'] = pd.to_datetime(df['tradeDate'])online_symbol_list = []df_group = df.groupby(by='ticker',as_index=False)self.thread_out_log('开始逐一处理每个合约:')for name,group in df_group:self.thread_out_log(f'开始追加 {name}')# 将新数据添加到 t_last30_dailypre_new_df = group.loc[:,self.d_csv_column_list].copy()pre_new_df['product_code'] = ticker_map[name][0]pre_new_df['ticker'] = ticker_map[name][1]pre_new_df['deliYear'] = ticker_map[name][2]pre_new_list = pre_new_df.loc[:,self.last30_daily_column_list].values.tolist()sqlite_tool.batch_insert_last30_daily(pre_new_list)# 将新数据更新到csvcsv_file_name = f"{name}_{ticker_map[name][2]}.csv"csv_file_path = YOUKUANG_D_DIR + csv_file_nameif not os.path.exists(csv_file_path):exist_df = pd.DataFrame()else:df_one = pd.read_csv(csv_file_path,encoding='utf-8')df_one['o_date'] = pd.to_datetime(df_one['tradeDate'])exist_df = df_one.loc[:,self.pre_d_csv_column_list].copy()passnew_df = group.loc[:,self.pre_d_csv_column_list].copy()if len(exist_df)>0:exist_df.sort_values(by='o_date',ascending=True,inplace=True)new_df = new_df.loc[new_df['o_date']>exist_df.iloc[-1]['o_date']].copy()passif len(new_df)>0:pre_save_df = pd.concat([exist_df,new_df])pre_save_df.sort_values(by='o_date',ascending=True,inplace=True)pre_save_df = pre_save_df.loc[:,self.d_csv_column_list].copy()pre_save_df.to_csv(csv_file_path,encoding='utf-8')online_symbol_list.append([ticker_map[name][0],name,ticker_map[name][2],pre_save_df.iloc[-1]['tradeDate']])passpass# 删除 t_online_symbol 表格,加入新数据self.thread_out_log('t_online_symbol表格更新')sqlite_tool.delete_all_data_of_online_symbol()sqlite_tool.batch_insert_online_symbol(online_symbol_list)self.thread_out_log('------------- 开始计算主力合约:')product_code_list = sqlite_tool.query_all_product_codes_of_product()for pro_code in product_code_list:self.thread_out_log(f'产品:{pro_code}')csv_main_file_path = YOUKUANG_MAIN_DIR + pro_code + '.csv'if not os.path.exists(csv_main_file_path):# 新增品种cur_main_ticker = Nonecur_main_deliYear = Nonelast_one_tradeDate = Nonepasselse:df_one = pd.read_csv(csv_main_file_path,encoding='utf-8')df_one['o_date'] = pd.to_datetime(df_one['tradeDate'])df_one.sort_values(by='o_date',ascending=True,inplace=True)cur_main_ticker = df_one.iloc[-1]['ticker']cur_main_deliYear = df_one.iloc[-1]['deliYear']last_one_tradeDate = df_one.iloc[-1]['tradeDate']pass# 从 t_last30_daily 中计算追加这几天的主力合约daily_list = sqlite_tool.query_daily_by_pro_in_last30_daily(pro_code)df_two = pd.DataFrame(columns=self.pre_main_csv_column_list,data=daily_list)df_two['o_date'] = pd.to_datetime(df_two['tradeDate'])if last_one_tradeDate:df_two = df_two.loc[df_two['o_date']>last_one_tradeDate].copy()df_two.dropna(inplace=True)if len(df_two) <= 0:self.thread_out_log(f'{pro_code},没有可追加的主力数据')continuedf_two['row_i'] = [i for i in range(len(df_two))]df_two_group = df_two.groupby(by='o_date',as_index=False)df_main_new = pd.DataFrame()next_change_yeah = Falsepre_next_main_ticker = Nonepre_next_main_deliYear = Nonefor two_name,group in df_two_group:if len(group)<=1:# 当天该品种只有一个合约,那当天主力合约就为该合约df_main_new = pd.concat([df_main_new,group.iloc[[0]]])sqlite_tool.input_main_symbol(pro_code, group.iloc[0]['ticker'], group.iloc[0]['deliYear'],group.iloc[0]['tradeDate'])cur_main_ticker = group.iloc[0]['ticker']cur_main_deliYear = group.iloc[0]['deliYear']passelse:# 计算每日最大成交量和最大持仓量df_vol = group.sort_values(by='turnoverVol', ascending=False)df_inte = group.sort_values(by='openInt', ascending=False)if df_vol.iloc[0]['row_i'] == df_inte.iloc[0]['row_i']:# 当日成交量最大和持仓量最大 为同一个合约if next_change_yeah:# 前一个交易日满足 【主力合约要被切换,下一个交易日切换为新的合约】条件,# 检查今日新的合约是否依然是成交量和持仓量最大,如果不是,不切换新合约if pre_next_main_ticker == df_vol.iloc[0]['ticker'] and pre_next_main_deliYear == \df_vol.iloc[0]['deliYear']:# 切换新合约df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])sqlite_tool.input_main_symbol(pro_code, df_vol.iloc[0]['ticker'], df_vol.iloc[0]['deliYear'],df_vol.iloc[0]['tradeDate'])cur_main_ticker = pre_next_main_tickercur_main_deliYear = pre_next_main_deliYearnext_change_yeah = Falsepasselse:# 撤销昨日的 【主力合约要被切换,下一个交易日切换为新的合约】next_change_yeah = False# ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 startpre_next_main_ticker = df_vol.iloc[0]['ticker']pre_next_main_deliYear = df_vol.iloc[0]['deliYear']if pre_next_main_ticker == cur_main_ticker and pre_next_main_deliYear == cur_main_deliYear:# 主力合约没有切换,延续使用df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])passelse:# 主力合约要被切换,下一个交易日切换为新的合约next_change_yeah = Truecur_df = group.loc[(group['ticker'] == cur_main_ticker) & (group['deliYear'] == cur_main_deliYear)].copy()df_main_new = pd.concat([df_main_new, cur_df])pass# ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 endpasspasselse:# ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 startpre_next_main_ticker = df_vol.iloc[0]['ticker']pre_next_main_deliYear = df_vol.iloc[0]['deliYear']if pre_next_main_ticker == cur_main_ticker and pre_next_main_deliYear == cur_main_deliYear:# 主力合约没有切换,延续使用df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])passelse:# 主力合约要被切换,下一个交易日切换为新的合约next_change_yeah = Truecur_df = group.loc[(group['ticker'] == cur_main_ticker) & (group['deliYear'] == cur_main_deliYear)].copy()df_main_new = pd.concat([df_main_new, cur_df])pass# ----------- 【当日成交量最大和持仓量最大 为同一个合约】 延续当前合约 endpasselse:# 当日成交量最大 和 持仓量最大 为不同的合约# 成交量最大 或 持仓量最大,这两者中有一个的合约是当前主力合约,该主力合约就延续if df_vol.iloc[0]['ticker'] == cur_main_ticker and df_vol.iloc[0]['deliYear'] == cur_main_deliYear:df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])elif df_inte.iloc[0]['ticker'] == cur_main_ticker and df_inte.iloc[0]['deliYear'] == cur_main_deliYear:df_main_new = pd.concat([df_main_new, df_inte.iloc[[0]]])else:# 当日成交量最大 和 持仓量最大 都不是当前主力合约,将主力合约切换为成交量最大的合约df_main_new = pd.concat([df_main_new, df_vol.iloc[[0]]])sqlite_tool.input_main_symbol(pro_code, df_vol.iloc[0]['ticker'], df_vol.iloc[0]['deliYear'],df_vol.iloc[0]['tradeDate'])cur_main_ticker = df_vol.iloc[0]['ticker']cur_main_deliYear = df_vol.iloc[0]['deliYear']passpasspasspassif len(df_main_new)<=0:self.thread_out_log(f'{pro_code},没有新增主力合约数据')passelse:df_main_new = df_main_new.loc[:,self.main_csv_column_list].copy()if not os.path.exists(csv_main_file_path):df_main_new.to_csv(csv_main_file_path, encoding='utf-8')passelse:df_one = df_one.loc[:, self.main_csv_column_list].copy()df_main_new00 = pd.concat([df_one, df_main_new])df_main_new00 = df_main_new00.loc[:, self.main_csv_column_list].copy()df_main_new00.to_csv(csv_main_file_path, encoding='utf-8')passpasspasspre_map = {'mark_str': self.mark_str_step_two,'data':None}self.signal_excute.emit(pre_map)passpassexcept Exception as e:pre_map = {'mark_str':self.mark_str_error,'data':traceback.format_exc()}self.signal_excute.emit(pre_map)passdef thread_out_log(self,msg:str):pre_map = {"mark_str":self.mark_str_log_str,"data":msg}self.signal_excute.emit(pre_map)passdef write_log(self,msg:str):now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')exist_str = self.log_textedit.toPlainText()pre_str = f"{now_str}::{msg}\n{exist_str}"self.log_textedit.setText(pre_str)passdef progress_init(self) -> None:self.caculate_progress.setValue(0)self.caculate_status_label.setText('无任务')def progress_busy(self) -> None:self.caculate_progress.setRange(0, 0)self.caculate_status_label.setText('正在执行')def progress_finished(self) -> None:self.caculate_progress.setRange(0, 100)self.caculate_progress.setValue(100)self.caculate_status_label.setText('执行完毕')passdef closeEvent(self, a0: typing.Optional[QtGui.QCloseEvent]) -> None:if self.thread and self.thread.isAlive():self.thread.join()self.thread = Noneself.close()passif __name__ == '__main__':QtCore.QCoreApplication.setAttribute(QtCore.Qt.HighDpiScaleFactorRoundingPolicy.PassThrough)app = QtWidgets.QApplication(sys.argv)main_window = DailyUpdateWidget()main_window.show()app.exec()pass