使用clickhouse-connect库并指定列类型进行插入到clickhouse中

news/2026/1/22 22:00:57/文章来源:https://www.cnblogs.com/love-DanDan/p/19519142

使用clickhouse-connect库并指定列类型进行插入到clickhouse中

以前一直是自己拼接 SQL 语句进行写入,现在改用参数化插入/ORM 方式写入。比较方便的是,无需再处理各种特殊符号(引号、换行等)导致的插入失败问题。

#!/usr/bin/python3
# -*- coding: utf-8 -*-'''
-- default.historydata_021002 definition
-- History table for dissolved-gas-in-oil (DGA) monitoring devices; partitioned by
-- month of acquisition_time, rows expire after one year via TTL.
CREATE TABLE default.historydata_021002
(
    -- common metadata columns shared by all historydata_* tables
    `id`                 UInt64,
    `create_time`        DateTime DEFAULT now() COMMENT '入库时间',
    `acquisition_time`   DateTime COMMENT '监测时间',
    `device_code`        String COMMENT '测点id',
    `phase`              String COMMENT '相别',
    `city_id`            String COMMENT '地市id',
    `station_ast_id`     String COMMENT '变电站资产id',
    `linkedDeviceAstId`  Nullable(String) COMMENT '被监测设备资产id',
    `linkedDevicePsrId`  Nullable(String) COMMENT '被监测设备资源id',
    -- device-specific measurement columns (gas concentrations)
    `ch4`                Nullable(Float32) COMMENT '甲烷',
    `c2h2`               Nullable(Float32) COMMENT '乙炔',
    `c2h4`               Nullable(Float32) COMMENT '乙烯',
    `c2h6`               Nullable(Float32) COMMENT '乙烷',
    `co`                 Nullable(Float32) COMMENT '一氧化碳',
    `co2`                Nullable(Float32) COMMENT '二氧化碳',
    `n2`                 Nullable(Float32) COMMENT '氮气',
    `h2`                 Nullable(Float32) COMMENT '氢气',
    `o2`                 Nullable(Float32) COMMENT '氧气',
    `total_hydrocarbons` Nullable(Float32) COMMENT '总烃',
    `sampling_time`      Nullable(DateTime64(0)) COMMENT '采集时间',
    `detection_time`     Nullable(DateTime64(0)) COMMENT '检测时间'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(acquisition_time)
PRIMARY KEY (acquisition_time, device_code)
ORDER BY (acquisition_time, device_code)
TTL acquisition_time + toIntervalYear(1)
SETTINGS index_granularity = 8192
COMMENT '油中溶解气体装置的历史数据表,一年有效期';
-- default.historydata_023001 definition
-- History table for metal-oxide arrester insulation-monitoring devices;
-- partitioned by month of acquisition_time, rows expire after one year via TTL.
CREATE TABLE default.historydata_023001
(
    -- common metadata columns shared by all historydata_* tables
    `id`                      UInt64,
    `create_time`             DateTime DEFAULT now() COMMENT '入库时间',
    `acquisition_time`        DateTime COMMENT '监测时间',
    `device_code`             String COMMENT '测点id',
    `phase`                   String COMMENT '相别',
    `city_id`                 String COMMENT '地市id',
    `station_ast_id`          String COMMENT '变电站资产id',
    `linkedDeviceAstId`       Nullable(String) COMMENT '被监测设备资产id',
    `linkedDevicePsrId`       Nullable(String) COMMENT '被监测设备资源id',
    -- device-specific measurement columns
    `temperature`             Nullable(Float32) COMMENT '温度',
    `third_harmonic_current`  Nullable(Float32) COMMENT '三次谐波电流',
    `total_current`           Nullable(Float32) COMMENT '全电流',
    `last_time_lightn_strike` Nullable(DateTime64(0)) COMMENT '最后一次雷击时间',
    `humidity`                Nullable(Float32) COMMENT '湿度',
    `resistive_current`       Nullable(Float32) COMMENT '阻性电流',
    `cumulative_number`       Nullable(Float32) COMMENT '累计动作次数',
    `leakage_current`         Nullable(Float32) COMMENT '泄漏电流',
    `system_voltage`          Nullable(Float32) COMMENT '系统电压'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(acquisition_time)
PRIMARY KEY (acquisition_time, device_code)
ORDER BY (acquisition_time, device_code)
TTL acquisition_time + toIntervalYear(1)
SETTINGS index_granularity = 8192
COMMENT '金属氧化物避雷器绝缘监测装置的历史数据表,一年有效期';
-- default.historydata_024001 definition
-- History table for GIS partial-discharge monitoring devices; partitioned by
-- month of acquisition_time, rows expire after one year via TTL.
CREATE TABLE default.historydata_024001
(
    -- common metadata columns shared by all historydata_* tables
    `id`                     UInt64,
    `create_time`            DateTime DEFAULT now() COMMENT '入库时间',
    `acquisition_time`       DateTime COMMENT '监测时间',
    `device_code`            String COMMENT '测点id',
    `phase`                  String COMMENT '相别',
    `city_id`                String COMMENT '地市id',
    `station_ast_id`         String COMMENT '变电站资产id',
    `linkedDeviceAstId`      Nullable(String) COMMENT '被监测设备资产id',
    `linkedDevicePsrId`      Nullable(String) COMMENT '被监测设备资源id',
    -- device-specific measurement columns
    `discharge_quantity`     Nullable(Float32) COMMENT '放电量',
    `alarm_type`             Nullable(Float32) COMMENT '报警类型',
    `alarm_level`            Nullable(Float32) COMMENT '报警级别',
    `max_discharge_quantity` Nullable(Float32) COMMENT '最大放电量',
    `number_of_pulses`       Nullable(Float32) COMMENT '脉冲个数',
    `aver_discharge_rate`    Nullable(Float32) COMMENT '平均放电率',
    `mean_discharge`         Nullable(Float32) COMMENT '放电均值',
    `peak_discharge`         Nullable(Float32) COMMENT '放电峰值'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(acquisition_time)
PRIMARY KEY (acquisition_time, device_code)
ORDER BY (acquisition_time, device_code)
TTL acquisition_time + toIntervalYear(1)
SETTINGS index_granularity = 8192
COMMENT 'GIS局部放电装置的历史数据表,一年有效期';
-- default.historydata_024004 definition
-- History table for SF6 gas-pressure monitoring devices; partitioned by
-- month of acquisition_time, rows expire after one year via TTL.
CREATE TABLE default.historydata_024004
(
    -- common metadata columns shared by all historydata_* tables
    `id`                UInt64,
    `create_time`       DateTime DEFAULT now() COMMENT '入库时间',
    `acquisition_time`  DateTime COMMENT '监测时间',
    `device_code`       String COMMENT '测点id',
    `phase`             String COMMENT '相别',
    `city_id`           String COMMENT '地市id',
    `station_ast_id`    String COMMENT '变电站资产id',
    `linkedDeviceAstId` Nullable(String) COMMENT '被监测设备资产id',
    `linkedDevicePsrId` Nullable(String) COMMENT '被监测设备资源id',
    -- device-specific measurement columns
    `temperature`       Nullable(Float32) COMMENT '温度',
    `pressure`          Nullable(Float32) COMMENT '压力',
    `density`           Nullable(Float32) COMMENT '密度',
    `abs_pressure`      Nullable(Float32) COMMENT '绝对压力'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(acquisition_time)
PRIMARY KEY (acquisition_time, device_code)
ORDER BY (acquisition_time, device_code)
TTL acquisition_time + toIntervalYear(1)
SETTINGS index_granularity = 8192
COMMENT 'SF6气体压力装置的历史数据表,一年有效期';
-- default.historydata_026007 definition
-- History table for switchgear comprehensive condition-monitoring devices.
-- Same common-column prefix as the other historydata_* tables (id, create_time,
-- acquisition_time, device_code, phase, city_id, station_ast_id, linked device ids),
-- followed by Nullable(Float32) measurements: per-phase (A/B/C) upper/lower contact
-- and cable-head temperatures, plus partial-discharge channel and breaker counters.
-- Partitioned by month of acquisition_time; rows expire after one year via TTL.
-- NOTE(review): the trailing "'''import logging" on the final line is the closing
-- delimiter of the enclosing Python docstring this DDL dump lives in — it is not SQL.
CREATE TABLE default.historydata_026007
(`id` UInt64,`create_time` DateTime DEFAULT now() COMMENT '入库时间',`acquisition_time` DateTime COMMENT '监测时间',`device_code` String COMMENT '测点id',`phase` String COMMENT '相别',`city_id` String COMMENT '地市id',`station_ast_id` String COMMENT '变电站资产id',`linkedDeviceAstId` Nullable(String) COMMENT '被监测设备资产id',`linkedDevicePsrId` Nullable(String) COMMENT '被监测设备资源id',`a_upper_temp` Nullable(Float32) COMMENT 'A相上触头温度',`a_lower_temp` Nullable(Float32) COMMENT 'A相下触头温度',`a_cablehead_temp` Nullable(Float32) COMMENT 'A相电缆头温度',`b_upper_temp` Nullable(Float32) COMMENT 'B相上触头温度',`b_lower_temp` Nullable(Float32) COMMENT 'B相下触头温度',`b_cablehead_temp` Nullable(Float32) COMMENT 'B相电缆头温度',`c_upper_temp` Nullable(Float32) COMMENT 'C相上触头温度',`c_lower_temp` Nullable(Float32) COMMENT 'C相下触头温度',`c_cablehead_temp` Nullable(Float32) COMMENT 'C相电缆头温度',`primary_current` Nullable(Float32) COMMENT '一次电流',`instrument_bin_temp` Nullable(Float32) COMMENT '仪表仓温度',`instrument_bin_humidity` Nullable(Float32) COMMENT '仪表仓湿度',`ground_wave_amplitude` Nullable(Float32) COMMENT '地电波幅值',`partial_discharge_amplitude` Nullable(Float32) COMMENT '局放通道幅值',`partial_discharge_energy` Nullable(Float32) COMMENT '局放通道总能量',`partial_discharge_frequency` Nullable(Float32) COMMENT '局放通道频次',`fault_interruption_count` Nullable(Float32) COMMENT '断路器开断故障电流次数',`interruption_count` Nullable(Float32) COMMENT '断路器开断次数',`cable_bin_temp` Nullable(Float32) COMMENT '电缆仓温度',`cable_bin_humidity` Nullable(Float32) COMMENT '电缆仓湿度',`ultrasonic_amplitude` Nullable(Float32) COMMENT '超声幅值'
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(acquisition_time)
PRIMARY KEY (acquisition_time,device_code)
ORDER BY (acquisition_time,device_code)
TTL acquisition_time + toIntervalYear(1)
SETTINGS index_granularity = 8192
COMMENT '开关柜综合状态监测装置的历史数据表,一年有效期';'''import logging
import logging.handlers
import os
import json
import time
import csv
import uuid
import threading
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from pathlib import Path

from kafka import KafkaConsumer
from kafka.structs import TopicPartition, OffsetAndMetadata

import clickhouse_connect


@dataclass
class DeviceConfig:"""设备配置类"""device_type: strch_table: strcsv_file_prefix: strfield_mapping: Dict[str, str]  # 原始字段名 -> CSV字段名(监测字段部分)csv_schema: Dict[str, str]      # CSV字段名 -> 数据类型(监测字段部分)data_fields: Dict[str, str]     # 所有入库字段映射(原始字段名 -> CSV字段名)class MultiDeviceConsumer:def __init__(self):"""初始化消费者"""# 直接使用配置字典(先定义配置,再初始化日志)self.config = {"logging": {"level": "INFO"  # 可选: DEBUG, INFO, WARNING, ERROR, CRITICAL
            },"kafka": {"bootstrap_servers": os.environ.get("KAFKA_SERVERS", "22.62.96.52:19092"), # 通过环境变量配置"group_id": os.environ.get("KAFKA_GROUP_ID", "jx_KafkaConsumer_data_gyj"),"auto_offset_reset": "earliest","enable_auto_commit": False,"topics": os.environ.get("KAFKA_TOPICS", "SBTD_TO_JKZ_16F37FCECE6E0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEC28C0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEBD350ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEBC7A0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCECD8D0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEBFCF0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEC5730ED1E0530100007F2F45,SBTD_TO_JKZ_ALARM_16F37FCECE6E0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEC7000ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEBEB30ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCECEFB0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCECA990ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEBB1F0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEC0FF0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEC5D40ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEBE1F0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEC1CD0ED1E0530100007F2F45,SBTD_TO_JKZ_ALARM_16F37FCEC28C0ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEC6910ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEBB8A0ED1E0530100007F2F45,SBTD_TO_JKZ_ALARM_16F37FCEC5730ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEC44F0ED1E0530100007F2F45,SBTD_TO_JKZ_ALARM_16F37FCEC7000ED1E0530100007F2F45,SBTD_TO_JKZ_ALARM_16F37FCEBD350ED1E0530100007F2F45,SBTD_TO_JKZ_16F37FCEC8220ED1E0530100007F2F45").split(",")},"clickhouse": {"host": os.environ.get("CK_HOST", "jx_clickhouse"),"port": int(os.environ.get("CK_PORT", 31200)),"username": os.environ.get("CK_USER", "default"),"password": os.environ.get("CK_PASSWORD", "cK_Jx!2026"),"database": os.environ.get("CK_DATABASE", "default"),"secure": os.environ.get("CK_SECURE", "false").lower() == "true"},"csv_output_dir": "./csv_output","device_data_conf": {"021017": {"clickhouse_table": "historydata_021017","csv_file_prefix": "historydata_021017","field_mapping": {  # 监测字段映射"表压": "oilpressure","AB相压差": 
"pab","BC相压差": "pbc","AC相压差": "pac","环境温度": "temperature","大气压": "airpressure","温压传感器数据异常": "ta_fault","压力传感器数据异常": "px_fault","设备压力异常": "dev_fault","汇集装置异常": "con_fault","设备压力异常类型": "fault_type"},"csv_schema": {  # csv文件字段类型"oilpressure": "Float32","pab": "Float32","pbc": "Float32","pac": "Float32","temperature": "Float32","airpressure": "Float32","ta_fault": "UInt8","px_fault": "UInt8","dev_fault": "UInt8","con_fault": "UInt8","fault_type": "UInt8"},"data_fields": {  # 所有入库字段映射"表压": "oilpressure","AB相压差": "pab","BC相压差": "pbc","AC相压差": "pac","环境温度": "temperature","大气压": "airpressure","温压传感器数据异常": "ta_fault","压力传感器数据异常": "px_fault","设备压力异常": "dev_fault","汇集装置异常": "con_fault","设备压力异常类型": "fault_type","创建时间": "create_time","监测时间": "acquisition_time",  # 公共字段acquisitionTime"测点ID": "device_code",  # 公共字段deviceCode"相别": "phase",  # 公共字段phase"城市ID": "city_id",  # 公共字段cityId"变电站资产id": "station_ast_id",  # 公共字段stationAstId"被监测设备资产id": "linkedDeviceAstId",  # 公共字段linkedDeviceAstId"被监测设备压力传感器id": "linkedDevicePsrId",  # 公共字段linkedDevicePsrId"id":"id",}},"021002": {"clickhouse_table": "historydata_021002","csv_file_prefix": "historydata_021002","field_mapping": {"氢气": "h2","甲烷": "ch4","乙烷": "c2h6","乙烯": "c2h4","乙炔": "c2h2","一氧化碳": "co","二氧化碳": "co2","氮气": "n2","氧气": "o2","总烃": "total_hydrocarbons","采集时间": "sampling_time","检测时间": "detection_time"},"csv_schema": {"h2": "Float32","ch4": "Float32","c2h6": "Float32","c2h4": "Float32","c2h2": "Float32","co": "Float32","co2": "Float32","n2": "Float32","o2": "Float32","total_hydrocarbons": "Float32",},"data_fields": {"氢气": "h2","甲烷": "ch4","乙烷": "c2h6","乙烯": "c2h4","乙炔": "c2h2","一氧化碳": "co","二氧化碳": "co2","氮气": "n2","氧气": "o2","总烃": "total_hydrocarbons","采集时间": "sampling_time","检测时间": "detection_time","创建时间": "create_time","监测时间": "acquisition_time","测点ID": "device_code","相别": "phase","城市ID": "city_id","变电站资产id": "station_ast_id","被监测设备资产id": "linkedDeviceAstId","被监测设备资源id": "linkedDevicePsrId","id":"id",}},"024004": 
{"clickhouse_table": "historydata_024004","csv_file_prefix": "historydata_024004","field_mapping": {"压力值": "pressure","温度值": "temperature","密度": "density","绝对压力": "abs_pressure"},"csv_schema": {"temperature": "Float32","pressure": "Float32","density": "Float32","abs_pressure": "Float32"},"data_fields": {"压力值": "pressure","温度值": "temperature","密度": "density","绝对压力": "abs_pressure","创建时间": "create_time","监测时间": "acquisition_time","测点ID": "device_code","相别": "phase","城市ID": "city_id","变电站资产id": "station_ast_id","被监测设备资产id": "linkedDeviceAstId","被监测设备资源id": "linkedDevicePsrId","id":"id",}},"026007": {"clickhouse_table": "historydata_026007","csv_file_prefix": "historydata_026007","field_mapping": {"A相上触头温度": "a_upper_temp","A相下触头温度": "a_lower_temp","A相电缆头温度": "a_cablehead_temp","B相上触头温度": "b_upper_temp","B相下触头温度": "b_lower_temp","B相电缆头温度": "b_cablehead_temp","C相上触头温度": "c_upper_temp","C相下触头温度": "c_lower_temp","C相电缆头温度": "c_cablehead_temp","一次电流": "primary_current","仪表仓温度": "instrument_bin_temp","仪表仓湿度": "instrument_bin_humidity","地电波幅值": "ground_wave_amplitude","局放通道幅值": "partial_discharge_amplitude","局放通道总能量": "partial_discharge_energy","局放通道频次": "partial_discharge_frequency","断路器开断故障电流次数": "fault_interruption_count","断路器开断次数": "interruption_count","电缆仓温度": "cable_bin_temp","电缆仓湿度": "cable_bin_humidity","超声幅值": "ultrasonic_amplitude"},"csv_schema": {"a_upper_temp": "Float32","a_lower_temp": "Float32","a_cablehead_temp": "Float32","b_upper_temp": "Float32","b_lower_temp": "Float32","b_cablehead_temp": "Float32","c_upper_temp": "Float32","c_lower_temp": "Float32","c_cablehead_temp": "Float32","primary_current": "Float32","instrument_bin_temp": "Float32","instrument_bin_humidity": "Float32","ground_wave_amplitude": "Float32","partial_discharge_amplitude": "Float32","partial_discharge_energy": "Float32","partial_discharge_frequency": "Float32","fault_interruption_count": "Float32","interruption_count": "Float32","cable_bin_temp": "Float32","cable_bin_humidity": 
"Float32","ultrasonic_amplitude": "Float32"},"data_fields": {"A相上触头温度": "a_upper_temp","A相下触头温度": "a_lower_temp","A相电缆头温度": "a_cablehead_temp","B相上触头温度": "b_upper_temp","B相下触头温度": "b_lower_temp","B相电缆头温度": "b_cablehead_temp","C相上触头温度": "c_upper_temp","C相下触头温度": "c_lower_temp","C相电缆头温度": "c_cablehead_temp","一次电流": "primary_current","仪表仓温度": "instrument_bin_temp","仪表仓湿度": "instrument_bin_humidity","地电波幅值": "ground_wave_amplitude","局放通道幅值": "partial_discharge_amplitude","局放通道总能量": "partial_discharge_energy","局放通道频次": "partial_discharge_frequency","断路器开断故障电流次数": "fault_interruption_count","断路器开断次数": "interruption_count","电缆仓温度": "cable_bin_temp","电缆仓湿度": "cable_bin_humidity","超声幅值": "ultrasonic_amplitude","创建时间": "create_time","监测时间": "acquisition_time","测点ID": "device_code","相别": "phase","城市ID": "city_id","变电站资产id": "station_ast_id","被监测设备资产id": "linkedDeviceAstId","被监测设备资源id": "linkedDevicePsrId","id":"id",}},"024001": {"clickhouse_table": "historydata_024001","csv_file_prefix": "historydata_024001","field_mapping": {"放电量": "discharge_quantity","报警类型": "alarm_type","报警级别": "alarm_level","最大放电量": "max_discharge_quantity","脉冲个数": "number_of_pulses","平均放电率": "aver_discharge_rate","放电均值": "mean_discharge","放电峰值": "peak_discharge"},"csv_schema": {"discharge_quantity": "Float32","alarm_type": "Float32","alarm_level": "Float32","max_discharge_quantity": "Float32","number_of_pulses": "Float32","aver_discharge_rate": "Float32","mean_discharge": "Float32","peak_discharge": "Float32"},"data_fields": {"放电量": "discharge_quantity","报警类型": "alarm_type","报警级别": "alarm_level","最大放电量": "max_discharge_quantity","脉冲个数": "number_of_pulses","平均放电率": "aver_discharge_rate","放电均值": "mean_discharge","放电峰值": "peak_discharge","创建时间": "create_time","监测时间": "acquisition_time","测点ID": "device_code","相别": "phase","城市ID": "city_id","变电站资产id": "station_ast_id","被监测设备资产id": "linkedDeviceAstId","被监测设备资源id": "linkedDevicePsrId","id":"id",}},"023001": {"clickhouse_table": 
"historydata_023001","csv_file_prefix": "historydata_023001","field_mapping": {"温度": "temperature","三次谐波电流": "third_harmonic_current","全电流": "total_current","最后一次雷击时间": "last_time_lightn_strike","湿度": "humidity","阻性电流": "resistive_current","累计动作次数": "cumulative_number","泄漏电流": "leakage_current","系统电压": "system_voltage"},"csv_schema": {"temperature": "Float32","third_harmonic_current": "Float32","total_current": "Float32","humidity": "Float32","resistive_current": "Float32","cumulative_number": "Float32","leakage_current": "Float32","system_voltage": "Float32"},"data_fields": {"温度": "temperature","三次谐波电流": "third_harmonic_current","全电流": "total_current","最后一次雷击时间": "last_time_lightn_strike","湿度": "humidity","阻性电流": "resistive_current","累计动作次数": "cumulative_number","泄漏电流": "leakage_current","系统电压": "system_voltage","创建时间": "create_time","监测时间": "acquisition_time","测点ID": "device_code","相别": "phase","城市ID": "city_id","变电站资产id": "station_ast_id","被监测设备资产id": "linkedDeviceAstId","被监测设备资源id": "linkedDevicePsrId","id":"id",}}},"ck_table_col_type":{# 
"021017":{},"021002":{'h2':'Nullable(Float32)','ch4':'Nullable(Float32)','c2h6':'Nullable(Float32)','c2h4':'Nullable(Float32)','c2h2':'Nullable(Float32)','co':'Nullable(Float32)','co2':'Nullable(Float32)','n2':'Nullable(Float32)','o2':'Nullable(Float32)','total_hydrocarbons':'Nullable(Float32)','sampling_time':'Nullable(DateTime64(0))','detection_time':'Nullable(DateTime64(0))','create_time':'DateTime','acquisition_time':'DateTime','device_code':'String','phase':'String','city_id':'String','station_ast_id':'String','linkedDeviceAstId':'Nullable(String)','linkedDevicePsrId':'Nullable(String)','id':'UInt64',},"024004":{'pressure':'Nullable(Float32)','temperature':'Nullable(Float32)','density':'Nullable(Float32)','abs_pressure':'Nullable(Float32)','create_time':'DateTime','acquisition_time':'DateTime','device_code':'String','phase':'String','city_id':'String','station_ast_id':'String','linkedDeviceAstId':'Nullable(String)','linkedDevicePsrId':'Nullable(String)','id':'UInt64',},"026007":{'a_upper_temp':'Nullable(Float32)','a_lower_temp':'Nullable(Float32)','a_cablehead_temp':'Nullable(Float32)','b_upper_temp':'Nullable(Float32)','b_lower_temp':'Nullable(Float32)','b_cablehead_temp':'Nullable(Float32)','c_upper_temp':'Nullable(Float32)','c_lower_temp':'Nullable(Float32)','c_cablehead_temp':'Nullable(Float32)','primary_current':'Nullable(Float32)','instrument_bin_temp':'Nullable(Float32)','instrument_bin_humidity':'Nullable(Float32)','ground_wave_amplitude':'Nullable(Float32)','partial_discharge_amplitude':'Nullable(Float32)','partial_discharge_energy':'Nullable(Float32)','partial_discharge_frequency':'Nullable(Float32)','fault_interruption_count':'Nullable(Float32)','interruption_count':'Nullable(Float32)','cable_bin_temp':'Nullable(Float32)','cable_bin_humidity':'Nullable(Float32)','ultrasonic_amplitude':'Nullable(Float32)','create_time':'DateTime','acquisition_time':'DateTime','device_code':'String','phase':'String','city_id':'String','station_ast_id':'String','linkedD
eviceAstId':'Nullable(String)','linkedDevicePsrId':'Nullable(String)','id':'UInt64',},"024001":{'discharge_quantity':'Nullable(Float32)','alarm_type':'Nullable(Float32)','alarm_level':'Nullable(Float32)','max_discharge_quantity':'Nullable(Float32)','number_of_pulses':'Nullable(Float32)','aver_discharge_rate':'Nullable(Float32)','mean_discharge':'Nullable(Float32)','peak_discharge':'Nullable(Float32)','create_time':'DateTime','acquisition_time':'DateTime','device_code':'String','phase':'String','city_id':'String','station_ast_id':'String','linkedDeviceAstId':'Nullable(String)','linkedDevicePsrId':'Nullable(String)','id':'UInt64',},"023001":{'temperature':'Nullable(Float32)','third_harmonic_current':'Nullable(Float32)','total_current':'Nullable(Float32)','last_time_lightn_strike':'Nullable(DateTime64(0))','humidity':'Nullable(Float32)','resistive_current':'Nullable(Float32)','cumulative_number':'Nullable(Float32)','leakage_current':'Nullable(Float32)','system_voltage':'Nullable(Float32)','create_time':'DateTime','acquisition_time':'DateTime','device_code':'String','phase':'String','city_id':'String','station_ast_id':'String','linkedDeviceAstId':'Nullable(String)','linkedDevicePsrId':'Nullable(String)','id':'UInt64',},},}# 初始化日志(在 config 定义之后)self.logger = self._setup_logger()# CSV文件句柄缓存self.csv_handles = {}self.csv_writers = {}# CSV输出目录self.csv_output_dir = self.config.get("csv_output_dir", "./csv_output")Path(self.csv_output_dir).mkdir(parents=True, exist_ok=True)# ClickHouse 相关self.ch_client = Noneself.ch_batch_data: Dict[str, List[Dict[str, Any]]] = {}self.ch_lock = threading.Lock()self.ch_timer: Optional[threading.Timer] = Noneself.ch_running = Trueself._init_clickhouse_connection()# 公共字段映射(从Kafka消息字段名到CSV字段名)self.common_fields = {"cityId": "city_id","cityName": "city_name","stationName": "station_name","stationPsrId": "station_psr_id","stationAstId": "station_ast_id","linkedName": "linked_name","linkedDevicePsrId": "linkedDevicePsrId","linkedDeviceAstId": 
"linkedDeviceAstId","deviceCode": "device_code","deviceType": "device_type","phase": "phase","acquisitionTime": "acquisition_time","pushTime": "push_time","type": "data_type","id": "id"}# 加载设备配置self.device_configs = self._load_device_configs()def _init_clickhouse_connection(self):"""初始化 ClickHouse 连接"""try:ch_cfg = self.config.get("clickhouse", {})self.ch_client = clickhouse_connect.get_client(host=ch_cfg.get("host", "127.0.0.1"),port=ch_cfg.get("port", 8123),username=ch_cfg.get("username", "default"),password=ch_cfg.get("password", ""),database=ch_cfg.get("database", "default"),secure=ch_cfg.get("secure", False))self.logger.info("ClickHouse 连接成功")# 启动 3 秒定时批量写入
            self._start_batch_insert_timer()except Exception as e:self.logger.error(f"ClickHouse 连接失败: {e}", exc_info=True)self.ch_client = Nonedef _start_batch_insert_timer(self):"""启动定时批量写入任务(每 3 秒)"""if not self.ch_running:returntry:self._batch_insert_to_clickhouse()except Exception as e:self.logger.error(f"定时批量写入失败: {e}", exc_info=True)# 下一轮if self.ch_running:self.ch_timer = threading.Timer(3.0, self._start_batch_insert_timer)self.ch_timer.daemon = Trueself.ch_timer.start()def _batch_insert_to_clickhouse(self):"""批量写入缓存数据到 ClickHouse"""if not self.ch_client:returnwith self.ch_lock:# 遍历各设备类型缓冲for device_type, rows in list(self.ch_batch_data.items()):if not rows:continuetry:config = self.device_configs.get(device_type)if not config:self.logger.warning(f"设备类型 {device_type} 配置不存在, 跳过写入")continuetable_name = config.ch_tablefieldnames = self._get_fieldnames(config)# 构建数据矩阵data_matrix = [[row.get(col) for col in fieldnames] for row in rows]# 构建数据列类型矩阵col_type_list = [self.config.get('ck_table_col_type').get(device_type).get(col) for col in fieldnames]# if(device_type=='021002'):#     for data in data_matrix:#         try:#             self.ch_client.insert(table_name, [data], column_names=fieldnames,column_type_names=col_type_list)#         except Exception as e:#             self.logger.error(f"写入表 {table_name} 失败: {e}\nfieldnames: {fieldnames}\ndata:{data}", exc_info=True)# else:#     # 插入#     self.ch_client.insert(table_name, data_matrix, column_names=fieldnames,column_type_names=col_type_list)self.ch_client.insert(table_name, data_matrix, column_names=fieldnames,column_type_names=col_type_list)self.logger.info(f"成功写入 {len(rows)} 条数据到表 {table_name}")# 清空该类型缓冲self.ch_batch_data[device_type] = []except Exception as e:self.logger.error(f"写入表 {table_name} 失败: {e}\nfieldnames: {fieldnames}\ndata_matrix:{data_matrix}", exc_info=True)def _add_to_batch(self, device_type: str, row: Dict[str, Any]):"""添加一行到批量缓冲"""with self.ch_lock:if device_type not in 
self.ch_batch_data:self.ch_batch_data[device_type] = []self.ch_batch_data[device_type].append(row)def _setup_logger(self) -> logging.Logger:"""设置日志 - 输出到 stdout/stderr(容器化部署)"""logger = logging.getLogger('multi_device_consumer')# 从 config 中读取日志级别,默认为 DEBUGlog_level_str = self.config.get("logging", {}).get("level", "DEBUG").upper()log_level = getattr(logging, log_level_str, logging.DEBUG)logger.setLevel(log_level)# 避免重复处理器if logger.handlers:return loggerformatter = logging.Formatter('%(asctime)s - %(levelname)s - %(lineno)s - %(message)s')# 仅使用控制台处理器,直接输出到 stdout/stderrstream_handler = logging.StreamHandler()stream_handler.setLevel(log_level)stream_handler.setFormatter(formatter)logger.addHandler(stream_handler)return loggerdef _load_device_configs(self) -> Dict[str, DeviceConfig]:"""从config字典加载设备配置"""device_configs = {}try:device_data_conf = self.config.get("device_data_conf", {})for device_type, device_info in device_data_conf.items():device_configs[device_type] = DeviceConfig(device_type=device_type,ch_table=device_info.get("clickhouse_table", f"historydata_{device_type}"),csv_file_prefix=device_info.get("csv_file_prefix", f"historydata_{device_type}"),field_mapping=device_info.get("field_mapping", {}),csv_schema=device_info.get("csv_schema", {}),data_fields=device_info.get("data_fields", {}))self.logger.info(f"成功加载 {len(device_configs)} 种设备配置")except Exception as e:self.logger.error(f"加载设备配置失败: {e}")# 如果加载失败,返回空字典device_configs = {}return device_configsdef _get_csv_file_path(self, device_type: str) -> str:"""获取CSV文件路径"""today = datetime.now().strftime("%Y%m%d")config = self.device_configs.get(device_type)if not config:return os.path.join(self.csv_output_dir, f"unknown_{today}.csv")filename = f"{config.csv_file_prefix}_{today}.csv"return os.path.join(self.csv_output_dir, filename)def _get_csv_writer(self, device_type: str):"""获取CSV写入器"""if device_type in self.csv_writers:return self.csv_writers[device_type]csv_file_path = 
self._get_csv_file_path(device_type)config = self.device_configs.get(device_type)if not config:self.logger.error(f"设备类型 {device_type} 的配置不存在")return None# 检查文件是否存在,如果不存在则创建并写入表头file_exists = os.path.exists(csv_file_path)try:csv_file = open(csv_file_path, 'a', newline='', encoding='utf-8')fieldnames=self._get_fieldnames(config)# self.logger.info(f"device_type: {device_type}\tfieldnames: {fieldnames}")writer = csv.DictWriter(csv_file, fieldnames=fieldnames)# 如果文件不存在,写入表头if not file_exists:writer.writeheader()self.logger.info(f"创建CSV文件: {csv_file_path}")# 缓存文件句柄和写入器self.csv_handles[device_type] = csv_fileself.csv_writers[device_type] = writerreturn writerexcept Exception as e:self.logger.error(f"打开CSV文件失败: {e}")return Nonedef _get_fieldnames(self, config: DeviceConfig) -> List[str]:"""获取CSV字段名列表 - 使用data_fields中定义的CSV字段名"""# 获取data_fields中的所有CSV字段名(英文)csv_fieldnames = []# 添加data_fields中定义的所有字段(英文)for csv_field in config.data_fields.values():if csv_field not in csv_fieldnames:csv_fieldnames.append(csv_field)return csv_fieldnamesdef _close_csv_files(self):"""关闭所有CSV文件"""for device_type, csv_file in self.csv_handles.items():try:csv_file.close()self.logger.info(f"关闭CSV文件: {device_type}")except Exception as e:self.logger.error(f"关闭CSV文件失败: {e}")self.csv_handles.clear()self.csv_writers.clear()def _rotate_csv_files(self):"""轮换CSV文件(每天或定期)"""self._close_csv_files()def _process_021017(self, data: Dict, sensor_data: Dict) -> Dict:"""处理021017类型数据(少油监测)"""processed_data = {}config = self.device_configs.get("021017")if not config:return processed_data# 处理监测数据字段for src_field, csv_field in config.field_mapping.items():if src_field in sensor_data:value = sensor_data[src_field]# 类型转换if csv_field in config.csv_schema:data_type = config.csv_schema[csv_field]try:if "Float" in data_type:processed_data[csv_field] = float(value) if not isinstance(value, str) else float(value)elif "UInt" in data_type:processed_data[csv_field] = int(value) if not isinstance(value, str) else 
int(value)else:processed_data[csv_field] = valueexcept (ValueError, TypeError):processed_data[csv_field] = Noneelse:processed_data[csv_field] = Nonereturn processed_datadef _process_021002(self, data: Dict, sensor_data: Dict) -> Dict:"""处理021002类型数据(油中溶解气体)"""processed_data = {}config = self.device_configs.get("021002")if not config:return processed_datafor src_field, csv_field in config.field_mapping.items():if src_field in sensor_data:value = sensor_data[src_field]if(src_field in ('采集时间','检测时间')):try:value = datetime.strptime(value,'%Y-%m-%d %H:%M:%S')except Exception as e:value = Nonefinally:processed_data[csv_field]=valueelse:try:processed_data[csv_field] = float(value) if not isinstance(value, str) else float(value)except (ValueError, TypeError):processed_data[csv_field] = Noneelse:processed_data[csv_field] = Nonereturn processed_datadef _process_024004(self, data: Dict, sensor_data: Dict) -> Dict:"""处理024004类型数据(SF6气体压力)"""processed_data = {}config = self.device_configs.get("024004")if not config:return processed_datafor src_field, csv_field in config.field_mapping.items():if src_field in sensor_data:value = sensor_data[src_field]try:processed_data[csv_field] = float(value) if not isinstance(value, str) else float(value)except (ValueError, TypeError):processed_data[csv_field] = Noneelse:processed_data[csv_field] = Nonereturn processed_datadef _process_026007(self, data: Dict, sensor_data: Dict) -> Dict:"""处理026007类型数据(开关柜综合状态监测)"""processed_data = {}config = self.device_configs.get("026007")if not config:return processed_datafor src_field, csv_field in config.field_mapping.items():if src_field in sensor_data:value = sensor_data[src_field]try:processed_data[csv_field] = float(value) if not isinstance(value, str) else float(value)except (ValueError, TypeError):processed_data[csv_field] = Noneelse:processed_data[csv_field] = Nonereturn processed_datadef _process_024001(self, data: Dict, sensor_data: Dict) -> Dict:"""处理024001类型数据(GIS局部放电)"""processed_data = 
{}config = self.device_configs.get("024001")if not config:return processed_datafor src_field, csv_field in config.field_mapping.items():if src_field in sensor_data:value = sensor_data[src_field]try:processed_data[csv_field] = float(value) if not isinstance(value, str) else float(value)except (ValueError, TypeError):processed_data[csv_field] = Noneelse:processed_data[csv_field] = Nonereturn processed_datadef _process_023001(self, data: Dict, sensor_data: Dict) -> Dict:"""处理023001类型数据(金属氧化物避雷器绝缘监测)"""processed_data = {}config = self.device_configs.get("023001")if not config:return processed_datafor src_field, csv_field in config.field_mapping.items():if src_field in sensor_data:value = sensor_data[src_field]if(src_field in ('最后一次雷击时间')):try:value = datetime.strptime(value,'%Y-%m-%d %H:%M:%S')except Exception as e:value = Nonefinally:processed_data[csv_field]=valuetry:processed_data[csv_field] = float(value) if not isinstance(value, str) else float(value)except (ValueError, TypeError):processed_data[csv_field] = Noneelse:processed_data[csv_field] = Nonereturn processed_datadef _extract_common_fields(self, data: Dict) -> Dict:"""提取公共字段"""processed_data = {}# 处理公共字段for src_field, csv_field in self.common_fields.items():if src_field in data:processed_data[csv_field] = data[src_field]else:processed_data[csv_field] = Nonereturn processed_datadef process_data(self, data: Dict) -> bool:"""处理单条数据并写入CSV:param data: Kafka消息数据:return: 是否处理成功"""try:# 获取设备类型device_type = data.get("deviceType")if not device_type:self.logger.warning("消息中缺少deviceType字段")return False# 检查是否支持该设备类型if device_type not in self.device_configs:self.logger.debug(f"不支持的设备类型: {device_type}")return Falseconfig = self.device_configs[device_type]# 解析传感器数据sensor_data = {}try:data_str = data.get("data", "{}")if isinstance(data_str, str):sensor_data = json.loads(data_str)elif isinstance(data_str, dict):sensor_data = data_strexcept Exception as e:self.logger.warning(f"解析传感器数据失败: {e}")sensor_data = {}# 
根据设备类型调用对应的处理函数processed_data = {}if device_type == "021017":# processed_data = self._process_021017(data, sensor_data)# 没有对少油建表。return Trueelif device_type == "021002":processed_data = self._process_021002(data, sensor_data)elif device_type == "024004":processed_data = self._process_024004(data, sensor_data)elif device_type == "026007":processed_data = self._process_026007(data, sensor_data)elif device_type == "024001":processed_data = self._process_024001(data, sensor_data)elif device_type == "023001":processed_data = self._process_023001(data, sensor_data)else:# self.logger.warning(f"未实现设备类型 {device_type} 的处理函数")return False# 提取公共字段common_data = self._extract_common_fields(data)processed_data.update(common_data)# 添加额外字段# processed_data["data_id"] = str(uuid.uuid4())processed_data["create_time"] = datetime.now()# 确保所有字段都存在fieldnames = self._get_fieldnames(config)final_row = {}# 使用data_fields映射构建最终的行数据for src_field, csv_field in config.data_fields.items():# 首先检查是否在processed_data中直接存在(通过field_mapping处理过的)if csv_field in processed_data:final_row[csv_field] = processed_data[csv_field]# 其次检查是否在sensor_data中(原始中文字段)elif src_field in sensor_data:# 获取原始值value = sensor_data[src_field]# 根据csv_schema进行类型转换if csv_field in config.csv_schema:data_type = config.csv_schema[csv_field]try:if "Float" in data_type:final_row[csv_field] = float(value) if not isinstance(value, str) else float(value)elif "UInt" in data_type:final_row[csv_field] = int(value) if not isinstance(value, str) else int(value)else:final_row[csv_field] = valueexcept (ValueError, TypeError):final_row[csv_field] = Noneelse:final_row[csv_field] = value# 最后检查是否在公共字段中elif src_field in self.common_fields:# 检查是否有反向映射(从中文到英文)for k, v in self.common_fields.items():if v == csv_field and k in data:final_row[csv_field] = data[k]breakelse:final_row[csv_field] = Noneelse:final_row[csv_field] = None# final_row["monitor_data"] = sensor_data    # 将原始报文写入# 写入CSV# csv_success = self._write_to_csv(device_type, final_row)# 添加到 
ClickHouse 批量缓存try:value = final_row.get('acquisition_time')value=datetime.strptime(value,'%Y-%m-%d %H:%M:%S')except Exception as e:value = datetime.now()finally:final_row['acquisition_time']=valuetry:self._add_to_batch(device_type, final_row)except Exception as e:self.logger.error(f"添加批量写入缓冲失败: {e}")# return csv_successreturn Trueexcept Exception as e:self.logger.error(f"处理数据失败: {e}", exc_info=True)return Falsedef _write_to_csv(self, device_type: str, data: Dict) -> bool:"""写入数据到CSV文件:param device_type: 设备类型:param data: 数据字典:return: 是否成功"""try:# 获取CSV写入器writer = self._get_csv_writer(device_type)if not writer:self.logger.error(f"无法获取设备类型 {device_type} 的CSV写入器")return False# 只写入需要的字段config = self.device_configs.get(device_type)if not config:return False# 获取字段名列表fieldnames = self._get_fieldnames(config)# fieldnames.append('monitor_data')row_data = {}# 只提取需要的字段for field in fieldnames:row_data[field] = data.get(field)# row_data['monitor_data'] = data.get('monitor_data')# 写入CSV
            writer.writerow(row_data)# 刷新缓冲区if device_type in self.csv_handles:self.csv_handles[device_type].flush()self.logger.debug(f"成功写入数据到 {device_type} 的CSV文件")return Trueexcept Exception as e:self.logger.error(f"写入CSV失败: {e}\ndevice_type: {device_type}\ndata: {data}")return Falsedef 过滤变电站(self,stationName:str)->bool:'''是否是所需要的站'''if(stationName in ['1000kVa变电站','1000kVb变电站','1000kVc变电站',]):return Trueelse:return Falsedef _get_topic_end_offsets(self, consumer: KafkaConsumer, topics: List[str]) -> Dict[TopicPartition, int]:"""获取指定topics所有分区的最新偏移量(end offset)"""end_offsets = {}for topic in topics:# 获取topic的所有分区partitions = consumer.partitions_for_topic(topic)if not partitions:self.logger.warning(f"Topic {topic} 无可用分区")continue# 构造TopicPartition对象topic_partitions = [TopicPartition(topic, p) for p in partitions]# 获取每个分区的最新偏移量partition_end_offsets = consumer.end_offsets(topic_partitions)end_offsets.update(partition_end_offsets)self.logger.info(f"Topic {topic} 分区最新偏移量: {[(tp.partition, off) for tp, off in partition_end_offsets.items()]}")return end_offsetsdef _is_all_partitions_consumed(self, consumer: KafkaConsumer, end_offsets: Dict[TopicPartition, int]) -> bool:"""检查是否所有分区都已消费到最新偏移量"""# 获取当前消费者的已消费偏移量# 遍历所有分区,检查是否消费到最新for tp, end_off in end_offsets.items():try:current_offsets = consumer.position(tp)if current_offsets < end_off:return Falseexcept Exception as e:self.logger.error(f'获取分区 {tp} 偏移量失败: {e}')return False            return Truedef start_consuming(self):"""开始消费Kafka数据"""last_rotation_time = datetime.now()consumer = Nonetry:# 从配置中获取Kafka topicskafka_cfg = self.config["kafka"]topics = kafka_cfg.get("topics", [])if not topics:self.logger.error("配置中没有指定Kafka topics,30秒后重试")time.sleep(30)return# 创建Kafka消费者consumer = KafkaConsumer(*topics,group_id=kafka_cfg["group_id"],bootstrap_servers=kafka_cfg["bootstrap_servers"],auto_offset_reset=kafka_cfg.get("auto_offset_reset", "earliest"),enable_auto_commit=kafka_cfg.get("enable_auto_commit", 
True),value_deserializer=lambda x: json.loads(x.decode('utf-8')) if x else None)self.logger.info(f"Kafka消费者启动,订阅topics: {topics},持续消费模式")# 持续消费:无新消息则等待(指数退避)last_rotation_time = datetime.now()empty_wait = 0.5max_empty_wait = 5.0backoff_factor = 1.5while True:# 批量消费(每次最多拉取100条,可调整)messages = consumer.poll(timeout_ms=1000, max_records=100)if not messages:# 无消息:指数退避等待,避免空转try:time.sleep(empty_wait)except KeyboardInterrupt:raiseempty_wait = min(max_empty_wait, empty_wait * backoff_factor)continue# 有消息:重置等待间隔empty_wait = 0.5# 处理批量消息for tp, msg_list in messages.items():for message in msg_list:try:data = message.valueif not data:continue# stationName = data.get('stationName')# if(self.过滤变电站(stationName)):#     self.process_data(data)
                            self.process_data(data)# 检查是否需要轮换文件# current_time = datetime.now()# if current_time.date() != last_rotation_time.date():#     self.logger.info("检测到日期变化,轮换CSV文件")#     self._rotate_csv_files()#     last_rotation_time = current_timeexcept Exception as e:self.logger.error(f"处理Kafka消息失败: {e}\nmessage: {message}")continue# 手动提交当前分区的偏移量(消费完一批提交一次)# consumer.commit({tp: OffsetAndMetadata(offset=message.offset + 1, metadata="")})
                    consumer.commit()self.logger.debug(f"提交分区 {tp} 偏移量: {message.offset + 1}")# 正常不会到达这里(持续消费)except Exception as e:self.logger.error(f"Kafka消费者异常: {e}", exc_info=True)finally:# 清理资源:关闭 Kafka consumertry:if consumer:consumer.close()self.logger.info("Kafka 消费者已关闭")except Exception:passdef close(self):"""关闭连接"""# 停止定时器self.ch_running = Falsetry:if self.ch_timer:self.ch_timer.cancel()except Exception:pass# 最后一次批量写入try:self._batch_insert_to_clickhouse()except Exception as e:self.logger.error(f"最后一次批量写入失败: {e}")# 关闭 ClickHouse 连接if self.ch_client:try:self.ch_client.close()self.logger.info("ClickHouse 连接已关闭")except Exception as e:self.logger.error(f"关闭 ClickHouse 连接失败: {e}")# 关闭CSV文件
        self._close_csv_files()def __del__(self):"""析构函数"""self.close()if __name__ == '__main__':# 启动消费者consumer = MultiDeviceConsumer()try:consumer.start_consuming()except KeyboardInterrupt:consumer.logger.info("接收到中断信号,正在关闭...")finally:consumer.close()

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/1201877.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【毕业设计】基于springboot的饰品商城系统(源码+文档+远程调试,全bao定制等)

博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业文章撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围:…

Java计算机毕设之基于springboot的西安秦岭野生动物园智能化管理系统基于Springboot+Vue的野生动物园智能化管理系统(完整前后端代码+说明文档+LW,调试定制等)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

Java毕设项目:基于springboot的闲一品闲置品交易平台(源码+文档,讲解、调试运行,定制等)

博主介绍&#xff1a;✌️码农一枚 &#xff0c;专注于大学生项目实战开发、讲解和毕业&#x1f6a2;文撰写修改等。全栈领域优质创作者&#xff0c;博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围&#xff1a;&am…

信号处理仿真:傅里叶变换与频谱分析_(9).噪声信号的频谱特征

噪声信号的频谱特征 噪声信号在通信与信息系统中是一个重要的研究对象。噪声信号可以来源于多种途径&#xff0c;包括设备内部的热噪声、外部环境的电磁干扰、传输信道中的随机干扰等。噪声信号的频谱特征对于理解其对系统性能的影响、设计有效的噪声抑制算法以及优化系统性能…

计算机Java毕设实战-基于springboot的西安秦岭野生动物园智能化管理系统动物园售票系统的设计与实现【完整源码+LW+部署说明+演示视频,全bao一条龙等】

博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业🚢文撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围:…

信号处理仿真:傅里叶变换与频谱分析_(11).频谱泄漏与窗函数

频谱泄漏与窗函数 频谱泄漏原理 频谱泄漏(Spectral Leakage)是傅里叶变换中常见的现象,指的是当信号的周期不是采样点数的整数倍时,FFT(快速傅里叶变换)结果会出现能量扩散,导致原本集中在某些…

信号处理仿真:傅里叶变换与频谱分析_(13).频谱分析在雷达信号处理中的应用

频谱分析在雷达信号处理中的应用 引言 雷达信号处理是现代雷达系统中不可或缺的重要环节,而频谱分析则是雷达信号处理中的关键技术之一。通过频谱分析,可以将时域信号转换为频域信号,从而更好地理解和处理雷达信号中的各种特性。傅里叶变换…

【毕业设计】基于springboot的西安秦岭野生动物园智能化管理系统(源码+文档+远程调试,全bao定制等)

博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业🚢文撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围:…

信号处理仿真:傅里叶变换与频谱分析_(15).频谱分析在电力系统中的应用

频谱分析在电力系统中的应用 引言 频谱分析是信号处理中的一个重要工具,特别是在电力系统中,它可以帮助我们理解和分析电力信号的频率成分。电力系统中的信号通常包含各种频率成分,包括基波、谐波和干扰信号。通过频谱分析,我们可…

【毕业设计】基于springboot的闲一品闲置品交易平台(源码+文档+远程调试,全bao定制等)

博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业🚢文撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围:…

【课程设计/毕业设计】基于Springboot+Vue的野生动物园智能化管理系统野生动物园科普教育基于springboot的西安秦岭野生动物园智能化管理系统【附源码、数据库、万字文档】

博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业🚢文撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围:…

day165—递归—最长回文子序列(LeetCode-516)

题目描述给你一个字符串 s ,找出其中最长的回文子序列,并返回该序列的长度。子序列定义为:不改变剩余字符顺序的情况下,删除某些字符或者不删除任何字符形成的一个序列。示例 1:输入:s = "bbbab" …

Java毕设项目:基于springboot的西安秦岭野生动物园智能化管理系统(源码+文档,讲解、调试运行,定制等)

博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业🚢文撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围:…

2025.12.20 作业 - # P13339 [EGOI 2025] Gift Boxes / 礼品盒

2025.12.20 作业 - # P13339 [EGOI 2025] Gift Boxes / 礼品盒题目描述 今年的 EGOI 在波恩举办。主办方希望为比赛中的每支队伍最多分发一个礼品盒,每支队伍的编号为 \(0\) 到 \(T-1\)。所有参赛选手排成一排,但他们…

2025.12.20 作业 - # P12134 [蓝桥杯 2025 省 B] 画展布置

2025.12.20 作业 - # P12134 [蓝桥杯 2025 省 B] 画展布置题目描述 画展策展人小蓝和助理小桥为即将举办的画展准备了 \(N\) 幅画作,其艺术价值分别为 \(A_1, A_2, \dots , A_N\)。他们需要从这 \(N\) 幅画中挑选 \(M…

信号处理仿真:傅里叶变换与频谱分析_(6).非周期信号的频谱分析

非周期信号的频谱分析 1. 引言 在信号处理领域,非周期信号的频谱分析是一个重要的课题。非周期信号在自然界和工程应用中广泛存在,例如语音信号、雷达回波信号、生物医学信号等。傅里叶变换(Fourier Transform)是频谱分析的核心工…

Java计算机毕设之基于springboot的饰品商城系统基于SpringBoot+Vue的饰品商城系统(完整前后端代码+说明文档+LW,调试定制等)

博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业🚢文撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围:…

ARM Cortex-M 存储器系统中的栈存储

ARM Cortex-M 存储器系统中的栈存储 本文来自于我关于ARM Cortex-M 的存储器系统的系列文章。欢迎阅读、点评与交流~ 1、ARM Cortex-M 的存储器系统特性 2、ARM Cortex-M 存储器映射 3、ARM Cortex-M 存储器系统中的栈存储 文章目录ARM Cortex-M 存储器系统中的栈存储一、栈的基…

Java毕设选题推荐:基于Vue的野生动物园智能化管理系统基于springboot的西安秦岭野生动物园智能化管理系统【附源码、mysql、文档、调试+代码讲解+全bao等】

博主介绍:✌️码农一枚 ,专注于大学生项目实战开发、讲解和毕业🚢文撰写修改等。全栈领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java、小程序技术领域和毕业项目实战 ✌️技术范围:…