Linux下GCC和C++实现带多组标签的Snowflake SQL查询批量数据导出程序

设计一个基于多个带标签Snowflake SQL语句作为json配置文件的Linux下GCC的C++代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。

C++程序实现,该程序使用多线程和分页查询从Snowflake导出数据到CSV文件,并记录详细日志:

此代码实现了需求中的核心功能,实际部署时可能需要根据具体环境调整:

  1. ODBC连接字符串参数
  2. 分页策略(可能需要优化分页方式)
  3. 数据类型转换(当前按字符串处理所有类型)
  4. 错误处理策略(增加重试机制等)
  5. 线程池实现(当前为每个任务创建独立线程)
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <iomanip>
#include <nlohmann/json.hpp>
#include <sql.h>
#include <sqlext.h>using namespace std;
using json = nlohmann::json;// 配置结构
struct ExportTask {string label;string sql;int batch_size;
};// 日志类(线程安全)
class Logger {
private:mutex log_mutex;ofstream log_file;string current_time() {auto now = chrono::system_clock::now();time_t now_time = chrono::system_clock::to_time_t(now);tm tm = *localtime(&now_time);stringstream ss;ss << put_time(&tm, "%Y-%m-%d %H:%M:%S");return ss.str();}public:Logger(const string& filename) {log_file.open(filename, ios::app);}~Logger() {log_file.close();}void log(const string& label, const string& status, int rows, const string& error_msg, long duration_ms) {lock_guard<mutex> lock(log_mutex);log_file << current_time() << " | "<< label << " | "<< status << " | "<< rows << " | "<< duration_ms << "ms | "<< error_msg << endl;}
};// Snowflake导出器
class SnowflakeExporter {
private:string conn_str;void handle_odbc_error(SQLHANDLE handle, SQLSMALLINT type) {SQLCHAR sqlstate[6], message[SQL_MAX_MESSAGE_LENGTH];SQLINTEGER native_error;SQLSMALLINT length;SQLGetDiagRec(type, handle, 1, sqlstate, &native_error, message, SQL_MAX_MESSAGE_LENGTH, &length);throw runtime_error(string((char*)message));}public:SnowflakeExporter(const string& conn_str) : conn_str(conn_str) {}vector<vector<string>> execute_paged_query(const string& base_sql, int limit, int offset) {SQLHENV env;SQLHDBC dbc;SQLHSTMT stmt;vector<vector<string>> results;try {// 初始化ODBC环境SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (void*)SQL_OV_ODBC3, 0);// 建立连接SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);SQLCHAR out_str[1024];SQLSMALLINT out_str_len;if (SQLDriverConnect(dbc, NULL, (SQLCHAR*)conn_str.c_str(), SQL_NTS,out_str, sizeof(out_str), &out_str_len,SQL_DRIVER_COMPLETE) != SQL_SUCCESS) {handle_odbc_error(dbc, SQL_HANDLE_DBC);}// 构造分页SQLstring sql = base_sql + " LIMIT " + to_string(limit) + " OFFSET " + to_string(offset);// 执行查询SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);if (SQLExecDirect(stmt, (SQLCHAR*)sql.c_str(), SQL_NTS) != SQL_SUCCESS) {handle_odbc_error(stmt, SQL_HANDLE_STMT);}// 获取结果列数SQLSMALLINT num_cols;SQLNumResultCols(stmt, &num_cols);// 绑定列并获取数据while (SQLFetch(stmt) == SQL_SUCCESS) {vector<string> row;for (int i = 1; i <= num_cols; ++i) {SQLCHAR val[4096];SQLLEN indicator;SQLGetData(stmt, i, SQL_C_CHAR, val, sizeof(val), &indicator);row.emplace_back(indicator == SQL_NULL_DATA ? "" : string((char*)val));}results.push_back(row);}// 清理资源SQLFreeHandle(SQL_HANDLE_STMT, stmt);SQLDisconnect(dbc);SQLFreeHandle(SQL_HANDLE_DBC, dbc);SQLFreeHandle(SQL_HANDLE_ENV, env);}catch (...) {SQLFreeHandle(SQL_HANDLE_STMT, stmt);SQLDisconnect(dbc);SQLFreeHandle(SQL_HANDLE_DBC, dbc);SQLFreeHandle(SQL_HANDLE_ENV, env);throw;}return results;}
};// CSV写入器
class CSVWriter {
private:ofstream file;mutex file_mutex;string escape_csv(const string& s) {if (s.find('"') != string::npos || s.find(',') != string::npos) {return "\"" + regex_replace(s, regex("\""), "\"\"") + "\"";}return s;}public:CSVWriter(const string& filename) {file.open(filename, ios::out | ios::trunc);}~CSVWriter() {file.close();}void write_rows(const vector<vector<string>>& rows) {lock_guard<mutex> lock(file_mutex);for (const auto& row : rows) {string line;for (size_t i = 0; i < row.size(); ++i) {line += escape_csv(row[i]);if (i != row.size() - 1) line += ",";}file << line << "\n";}}
};// 导出任务处理函数
void process_export_task(const ExportTask& task, const SnowflakeExporter& exporter,CSVWriter& writer,Logger& logger) {auto start_time = chrono::high_resolution_clock::now();int total_rows = 0;string error_msg;try {int offset = 0;while (true) {auto data = exporter.execute_paged_query(task.sql, task.batch_size, offset);if (data.empty()) break;writer.write_rows(data);total_rows += data.size();offset += task.batch_size;}}catch (const exception& e) {error_msg = e.what();}auto end_time = chrono::high_resolution_clock::now();auto duration = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();logger.log(task.label, error_msg.empty() ? "SUCCESS" : "FAILED",total_rows,error_msg,duration);
}int main(int argc, char* argv[]) {// 加载配置文件ifstream config_file("config.json");json config = json::parse(config_file);// 解析连接字符串string conn_str = "DRIVER=SnowflakeDSIIDriver;""SERVER=" + config["server"].get<string>() + ";""DATABASE=" + config["database"].get<string>() + ";""SCHEMA=" + config["schema"].get<string>() + ";""UID=" + config["user"].get<string>() + ";""PWD=" + config["password"].get<string>() + ";";// 初始化组件Logger logger("export.log");SnowflakeExporter exporter(conn_str);// 创建线程池vector<thread> threads;vector<unique_ptr<CSVWriter>> writers;for (auto& task_json : config["tasks"]) {ExportTask task{task_json["label"],task_json["sql"],task_json["batch_size"]};writers.emplace_back(make_unique<CSVWriter>(task.label + ".csv"));threads.emplace_back(process_export_task, task, ref(exporter),ref(*writers.back()), ref(logger));}// 等待所有线程完成for (auto& t : threads) {t.join();}return 0;
}

编译说明:

  1. 需要安装Snowflake ODBC驱动
  2. 需要链接ODBC库(-lodbc)
  3. 需要nlohmann/json库
  4. 推荐使用C++17或更高标准编译

配置文件示例(config.json):

{"server": "your_account.snowflakecomputing.com","database": "your_db","schema": "your_schema","user": "your_user","password": "your_password","tasks": [{"label": "orders","sql": "SELECT * FROM orders ORDER BY order_date","batch_size": 10000},{"label": "customers","sql": "SELECT * FROM customers WHERE status = 'ACTIVE'","batch_size": 5000}]
}

程序特点:

  1. 多线程处理多个导出任务
  2. 分页查询处理大数据集
  3. CSV文件自动覆盖写入
  4. 线程安全的日志记录
  5. 详细的错误处理
  6. CSV特殊字符转义
  7. 性能指标记录(耗时、行数)

日志格式示例:

2024-03-20 14:30:45 | orders | SUCCESS | 250000 | 4500ms | 
2024-03-20 14:31:02 | customers | FAILED | 12000 | 17000ms | Network connection error

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/73433.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Trae AI 辅助修复uniapp 微信小程序的Bug

一、transparent的兼容问题 设计稿&#xff1a; 实际在iphone 6 plu上&#xff1a; 直接让Trae AI修复&#xff1a; 修改后验证通过。 二、v-if分支中子元素根据输入框中内容长度动态添加class样式失效 遇到了个“怪问题”&#xff0c;在其他手机或者开发者工具都正常。也…

conda install 和 pip install 的区别

conda install 和 pip install 是两个常用的包安装命令&#xff0c;但它们在很多方面存在差异。 1. 所属管理系统不同 1.1 conda install conda install 是Anaconda和Miniconda发行版自带的包管理工具 conda 的安装命令。conda 是一个跨平台的开源包管理系统和环境管理系统&…

uni-app App 端分段导出 JSON 数据为文件

在开发过程中&#xff0c;我们经常需要将大量数据导出为 JSON 文件&#xff0c;尤其是在处理长列表或大数据集时。然而&#xff0c;直接将所有数据写入一个文件可能会导致性能问题&#xff0c;尤其是在移动设备上。为了优化性能并提高用户体验&#xff0c;我们可以将数据分段导…

视频推拉流EasyDSS案例分析:互联网直播/点播技术与平台创新应用

随着互联网技术的快速发展&#xff0c;直播/点播平台已成为信息传播和娱乐的重要载体。特别是在电视购物领域&#xff0c;互联网直播/点播平台与技术的应用&#xff0c;不仅为用户带来了全新的购物体验&#xff0c;也为商家提供了更广阔的营销渠道。传统媒体再一次切实感受到了…

MySQL再次基础 向初级工程师迈进

作者&#xff1a;在计算机行业找不到工作的大四失业者 Run run run ! ! ! 1、MySQL概述 1.1数据库相关概念 1.2MySQL数据库 2、SQL 2.1SQL通用语法 SQL语句可以单行或多行书写&#xff0c;以分号结尾。SQL语句可以使用空格/缩进来增强语句的可读性。MySQL数据库的SQL语句不区…

手写一个简易版的tomcat

Tomcat 是一个广泛使用的开源 Servlet 容器&#xff0c;用于运行 Java Web 应用程序。深入理解 Tomcat 的工作原理对于 Java 开发者来说是非常有价值的。本文将带领大家手动实现一个简易版的 Tomcat&#xff0c;通过这个过程&#xff0c;我们可以更清晰地了解 Tomcat 是如何处理…

VSTO(C#)Excel开发8:打包发布安装卸载

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 源码指引&#xff1a;github源…

如何逐步迭代衍生出一个网络安全产品

逐步迭代衍生出一个网络安全产品需要结合市场需求、技术趋势和用户反馈&#xff0c;通过系统化的开发和优化过程来实现。以下是逐步迭代的详细步骤&#xff1a; 1. 确定市场需求和产品定位 市场调研&#xff1a;分析当前网络安全市场的痛点和趋势&#xff0c;如云安全、零信任、…

uni-app打包h5并部署到nginx,路由模式history

uni-app打包有些坑&#xff0c;当时运行的基础路径填写了./&#xff0c;导致在二级页面刷新之后&#xff0c;页面直接空白。就只能换一个路径了&#xff0c;nginx也要跟着改&#xff0c;下面是具体步骤。 manifest.json配置web 运行路径写/h5/&#xff0c;或者写你们网站的目…

Ceph(1):分布式存储技术简介

1 分布式存储技术简介 1.1 分布式存储系统的特性 &#xff08;1&#xff09;可扩展 分布式存储系统可以扩展到几百台甚至几千台的集群规模&#xff0c;而且随着集群规模的增长&#xff0c;系统整体性能表现为线性增长。分布式存储的水平扩展有以下几个特性&#xff1a; 节点…

Linux驱动开发实战(五):Qt应用程序点RGB灯(保姆级快速入门!)

Linux驱动开发实战&#xff08;五&#xff09;&#xff1a;Qt应用程序点RGB灯&#xff08;保姆级快速入门&#xff01;&#xff09; 文章目录 Linux驱动开发实战&#xff08;五&#xff09;&#xff1a;Qt应用程序点RGB灯&#xff08;保姆级快速入门&#xff01;&#xff09;前…

Docker安装Kafka(内含zookeeper)

因为kafka是基于zookeeper做的&#xff0c;所以必须要有zookeeper 一、Zookeeper 1.拉取镜像 docker pull zookeeper:3.7.02.运行 docker run --restartalways \--log-driver json-file \--log-opt max-size100m \--log-opt max-file2 \--name zookeeper -p 2181:2181 \-v…

芯谷D8563TS实时时钟/日历芯片详解可替代PCF8563

概述 芯谷D8563TS是一款低功耗CMOS实时时钟/日历芯片&#xff0c;广泛应用于移动电话、便携式仪器、传真机和电池供电产品等领域。该芯片通过两线双向IC总线进行数据传输&#xff0c;最大总线速度为400 kbits/s。D8563TS内置了自动递增的字地址寄存器&#xff0c;支持多种功能…

【一次成功】Win10本地化单机部署k8s v1.31.2版本及可视化看板

【一次成功】Win10本地化单机部署k8s v1.31.2版本及可视化看板 零、安装清单一、安装Docker Desktop软件1.1 安装前<启用或关闭Windows功能> 中的描红的三项1.2 查看软件版本1.3 配置Docker镜像 二、更新装Docker Desktop三、安装 k8s3.1 点击启动安装3.2 查看状态3.3 查…

MoonSharp 文档五

目录 13.Coroutines&#xff08;协程&#xff09; Lua中的协程 从CLR代码中的协程 从CLR代码中的协程作为CLR迭代器 注意事项 抢占式协程 14.Hardwire descriptors&#xff08;硬编码描述符&#xff09; 为什么需要“硬编码” 什么是“硬编码” 如何进行硬编码 硬编…

【初级篇】如何使用DeepSeek和Dify构建高效的企业级智能客服系统

在当今数字化时代,企业面临着日益增长的客户服务需求。使用Dify创建智能客服不仅能够提升客户体验,还能显著提高企业的运营效率。关于DIfy的安装部署,大家可以参考之前的文章: 【入门级篇】Dify安装+DeepSeek模型配置保姆级教程_mindie dify deepseek-CSDN博客 AI智能客服…

【网络编程】HTTP网络编程

13.1 HTTP 简介 HTTP(Hyper Text Transfer Protocol,超文本传输协议)是用于从万维网(WWW:World Wide Web) 服务器(简称Web 服务器)传输超文本到本地浏览器的传送协议&#xff0c;基于TCP/IP 通信协 议来传递数据 (HTML 文件、图片文件、查询结果等)。 13.2 HTTP 的工作原理 …

用Scrum敏捷的视角看《哪吒2》的创作

去年我们公司邀请Scrum中文网的老师培训了敏捷开发课程&#xff0c;让我对敏捷有了更深入的理解。前阵子我参加了scrum中文网的一个直播&#xff0c;老师分享了敏捷在个人领域或生活其他领域的应用&#xff0c;很有意思。因为我学习敏捷&#xff0c;除了应用到本身软件研发的工…

Docker+Flask 实战:打造高并发微服务架构

DockerFlask 实战&#xff1a;打造高并发微服务架构 今天我们要深入探讨一个非常热门且实用的主题&#xff1a;基于 Docker 部署 Python Flask 应用。Docker 作为当下最流行的容器化技术&#xff0c;已经广泛应用于各种开发和部署场景&#xff0c;尤其是在微服务架构中。而 Fl…

Linux find 命令完全指南

find 是 Linux 系统最强大的文件搜索工具&#xff0c;支持 嵌套遍历、条件筛选、执行动作。以下通过场景分类解析核心用法&#xff0c;涵盖高效搜索、文件管理及高级技巧&#xff1a; 一、基础搜索模式 1. 按文件名搜索&#xff08;精确/模糊匹配&#xff09; <BASH> f…