with open(path_dst, 'ab') as f:
1、表示文件如果存在,在文件末尾添加二进制数据;不存在会创建一个新的二进制文件用于追加数据
Config = CommonST()Create_string_buffer(config.size)If mode_extract:For row in df_data.intertuples()If row[-2]!=0:Config.struct.pack_into(buffer, 0, *row)F.write(buffer)
2、将数据写入二进制文件
2.1 class CommonST(CommonBase)
# 分笔数据csv合成秒/分钟二进制Def __init__(self):# 源csv文件列信息Self.index_drop = [0,1] + list(range(7,28))Self.columns_tick = [“NOW”,“CJBS”,”AMOUNT”,”VOLUME”]Self.index_time_tick = 2Self.index_now=3Self.index_cjbs=4Self.index_amt=5Self.index_vol = 6Self.dc_resample={‘closed’:’left’,’label’:‘right’}# 目标文件列信息Self.columns = [‘TIME’,‘OPEN’,‘HIGH’,‘LOW’,‘VOLUME’,‘AMOUNT’,‘CJBS’]Self.index_time=0Self.index_float=list(range(4))Self.index_int=list(range(4,7))# 二进制数据结构Self.setStruct(‘I’+4*‘f’+2*‘Q’+’I’)
3、csv转Tick数据
Df_data = df.copy()# 删除不需要的数据Df_data.drop(config.index_drop, axis = 1, inplace=True)# 调整收盘数据时间
t = df_data.loc[df_data.index[-1], config.index_time]If t[-8:] >‘15:00:00’:Df_data.loc[df_data.index[-1],config.index_time] = t[:-8] + ‘15:00:00’# 转换数据类型Df_data[config.index_time]=df_data[config.index_time].apply(Util.UtilDatetime.str2Timestamp).astype(unit32)Df_data[config.index_float] = df_data[config.index_float].astype(float32)Df_data[config.index_int] = df_data[config.index_int].apply(Util.UtilUnit.str2Uint32)
4、删除成交额为0的数据
Df_data = df_data[df_data[config.index_amt].astype(int) != 0]Df_data.reset_index(drop=True, inplace=True)
5、zip用法
Ls_n = [1,2,3]Ls_path_dst=[‘a’,‘b’,‘c’]Result = zip(ls_n, ls_path_dst)List(result) # 输出:[(1,‘a’),(2,‘b’),(3,‘c’)]
6、数字长度不足2位,则左侧填充0
f‘0930{n:0>2}’
7、调整开盘和收盘价格
t = Timestamp(df_data_copy.loc[0, config.index_time_tick])Dc_date = {‘year’: t.year,‘month’: t.month,‘day’:t.day}Time = Timestamp(hour=9,minute=30,second=0,**dc_date)
8、时间序列
Df_data_copy.set_index(config.index_time_tick, inplace=True)Df_data_copy.index = DatetimeIndex(df_data_copy.index)Df_data_copy.columns = config.columns_tick
9、重采样
Df_result[0] = df_data_copy.NOW.resample(**dc_resample).first()
10、增减数据
Time1 = Timestamp(hour=11, minute=30, second=0, **dc_date)Time2 = Timestamp(hour=13,minute=0,second=0,**dc_date)Df_result = df_result.loc[(df_result.index <= time1) | (df_result.index > time2)
11、补充缺失的价格:
For index in range(1, len(df_result)):If isnan(df_result.iloc[index, 3]):Last_close = df_result.iloc[index - 1,3]Df_result.iloc[index, list(range(4))] = last_close
12、转换数据类型:
Df_result.index = df_result.index.to_series().apply(Util.UtilDatetime.str2Timestamp).astype(uint32)Df_result[config.index_float] = df_result[config.index_float].astype(float32)Df_result[config.index_int] = df_result[config.index_int].astype(unit64)
13、合成1分钟数据
① 删除不需要的数据
② 删除成交额为0的数据
③ 获取上一个收盘价
④ 调整开盘数据时间
⑤ 调整收盘数据时间
⑥ 时间序列
⑦ 降采样
⑧ 增减数据
⑨ 补充缺失的价格
⑩ 转换数据类型
返回1T数据
14、
resampleChart(config, path_dst, df_1T, dc_resample):
① 降采样
② 整理数据类型
③ 返回nT数据
15、从1分钟合成n分钟
① 时间序列
② 合成
16、复权
① 全部计算
② 读取除权数据
Read_csv(path_pwr, header=None)
③ 计算除权价格
前复权、后复权、不复权
17、整理分笔csv数据
读取数据
① tick.csv -> tick.bin
分笔csv文件转换为二进制
② tick.csv -> 15S/30S/1T
分笔数据计算秒钟数据和1分钟数据
③ 1T -> nT
1分钟数据合成n分钟数据
④ with0_none -> with0_back/forth
删除成交额为0的数据
⑤ 除权数据
18、主程序
① 更新除权数据
PwrMgr().getPwr()
② 整理分笔数据
Path.join(PATH_FOLDER_TICK_TODAY,market)Listdir(path_folder_tick_today_market)TickProcessProcess(market, path_folder_tick_today_market,ls_file_tick_csv[i:i+300])Process.start()Ls_process.append(process)
③ 新数据替换旧数据
PwrMgr().replacePwr()
19、停止定时任务
# ============停止定时任务============
def produce_stop_bat(pid, name):cmd_stop = f'taskkill /pid {pid} /f' # 关闭指定进程cmd_del_self = 'del %0' # 删除自身文件bat_name = f'stop_{name}.bat'with open(bat_name, 'w') as f:f.write(f'{cmd_stop}\n{cmd_del_self}')pid = os.getpid()
myfilename = os.path.split(__file__)[-1].split(".")[0]
produce_stop_bat(pid, myfilename)
20、记录日志
# ============记录日志===============
log_dir = 'log' # 日志存放文件夹名称
log_path = os.path.join(os.getcwd(), log_dir)
if not os.path.isdir(log_path):os.makedirs(log_path)logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
log_time = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime(time.time()))
log_path = os.path.join(log_dir, f'{log_time}.log')
main_log_handler = logging.FileHandler(log_path, 'w+', encoding='utf8')
main_log_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
main_log_handler.setFormatter(formatter)
logger.addHandler(main_log_handler)
# ============记录日志===============
21、定时任务
# ============定时任务===============
file_autotask = os.path.join(os.getcwd(), 'autotask(代码).csv')
# print(file_autotask)
sec = 60
while True:datetime_now = datetime.now()weekday_now = str(datetime_now.weekday() + 1)# print(weekday_now)time_now = datetime_now.hour * 60 + datetime_now.minutewith open(file_autotask, 'r', encoding='utf8') as f:ls_row = [row.strip('\r\n').split(',') for row in f.readlines()]for task_name, task_date, task_time, task_path in ls_row:if weekday_now in str(task_date):# if len(weekday_now) != task_date:# print('\n[Warn]落库异常:' + str(len(task_date)) + '条\n')task_hour, task_min = task_time.split(':')task_time = int(task_hour) * 60 + int(task_min)if time_now == task_time:Popen(rf'python {task_path}')command = rf'python {task_path}'logger.info(f'执行 {task_name}: {command}')time.sleep(sec)
# ============定时任务===============
22、不同模块导入方法
# CommonUtil模块位于D:\1lqm\lqm_test\
module_path = 'D:\\1lqm\\lqm_test\\'
if module_path not in sys.path:sys.path.append(module_path)
from CommonUtil.dateUtil import DateHandler