设计一个基于多个带标签Snowflake SQL语句作为json配置文件的Linux下GCC的C++代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。
C++程序实现,该程序使用多线程和分页查询从Snowflake导出数据到CSV文件,并记录详细日志:
此代码实现了需求中的核心功能,实际部署时可能需要根据具体环境调整:
- ODBC连接字符串参数
- 分页策略(可能需要优化分页方式)
- 数据类型转换(当前按字符串处理所有类型)
- 错误处理策略(增加重试机制等)
- 线程池实现(当前为每个任务创建独立线程)
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <memory>
#include <functional>
#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#include <ctime>
#include <iomanip>
#include <nlohmann/json.hpp>
#include <sql.h>
#include <sqlext.h>

using namespace std;
using json = nlohmann::json;
// One export job: a label (used as "<label>.csv" and in the log), the SQL
// text to run, and how many rows to pull per paged query.
struct ExportTask {
    std::string label;   // output file stem and log identifier
    std::string sql;     // base query; LIMIT/OFFSET are appended per page
    int batch_size;      // rows fetched per page
};

// Thread-safe logger
// Thread-safe logger: appends one pipe-delimited record per export run
// (timestamp | label | status | rows | duration | error message).
class Logger {
private:
    std::mutex log_mutex;
    std::ofstream log_file;

    // Current local time formatted as "YYYY-MM-DD HH:MM:SS".
    std::string current_time() {
        auto now = std::chrono::system_clock::now();
        std::time_t now_time = std::chrono::system_clock::to_time_t(now);
        std::tm tm_buf{};
        // localtime_r instead of localtime: localtime returns a shared static
        // buffer and is not safe when several worker threads log at once.
        localtime_r(&now_time, &tm_buf);
        std::ostringstream ss;
        ss << std::put_time(&tm_buf, "%Y-%m-%d %H:%M:%S");
        return ss.str();
    }

public:
    // Opens (or creates) the log file in append mode. A failure to open is
    // reported once on stderr instead of silently discarding every record.
    explicit Logger(const std::string& filename) {
        log_file.open(filename, std::ios::app);
        if (!log_file.is_open()) {
            std::cerr << "Logger: cannot open log file: " << filename << std::endl;
        }
    }

    ~Logger() {
        log_file.close();
    }

    // Appends one record; safe to call concurrently from worker threads.
    // rows widened to long long so very large exports cannot overflow
    // (int callers still convert implicitly — backward compatible).
    void log(const std::string& label, const std::string& status, long long rows,
             const std::string& error_msg, long duration_ms) {
        std::lock_guard<std::mutex> lock(log_mutex);
        log_file << current_time() << " | "
                 << label << " | "
                 << status << " | "
                 << rows << " | "
                 << duration_ms << "ms | "
                 << error_msg << std::endl;  // endl: flush each record so a crash loses at most the current line
    }
};

// Snowflake exporter
class SnowflakeExporter {
private:string conn_str;void handle_odbc_error(SQLHANDLE handle, SQLSMALLINT type) {SQLCHAR sqlstate[6], message[SQL_MAX_MESSAGE_LENGTH];SQLINTEGER native_error;SQLSMALLINT length;SQLGetDiagRec(type, handle, 1, sqlstate, &native_error, message, SQL_MAX_MESSAGE_LENGTH, &length);throw runtime_error(string((char*)message));}public:SnowflakeExporter(const string& conn_str) : conn_str(conn_str) {}vector<vector<string>> execute_paged_query(const string& base_sql, int limit, int offset) {SQLHENV env;SQLHDBC dbc;SQLHSTMT stmt;vector<vector<string>> results;try {// 初始化ODBC环境SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (void*)SQL_OV_ODBC3, 0);// 建立连接SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);SQLCHAR out_str[1024];SQLSMALLINT out_str_len;if (SQLDriverConnect(dbc, NULL, (SQLCHAR*)conn_str.c_str(), SQL_NTS,out_str, sizeof(out_str), &out_str_len,SQL_DRIVER_COMPLETE) != SQL_SUCCESS) {handle_odbc_error(dbc, SQL_HANDLE_DBC);}// 构造分页SQLstring sql = base_sql + " LIMIT " + to_string(limit) + " OFFSET " + to_string(offset);// 执行查询SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);if (SQLExecDirect(stmt, (SQLCHAR*)sql.c_str(), SQL_NTS) != SQL_SUCCESS) {handle_odbc_error(stmt, SQL_HANDLE_STMT);}// 获取结果列数SQLSMALLINT num_cols;SQLNumResultCols(stmt, &num_cols);// 绑定列并获取数据while (SQLFetch(stmt) == SQL_SUCCESS) {vector<string> row;for (int i = 1; i <= num_cols; ++i) {SQLCHAR val[4096];SQLLEN indicator;SQLGetData(stmt, i, SQL_C_CHAR, val, sizeof(val), &indicator);row.emplace_back(indicator == SQL_NULL_DATA ? "" : string((char*)val));}results.push_back(row);}// 清理资源SQLFreeHandle(SQL_HANDLE_STMT, stmt);SQLDisconnect(dbc);SQLFreeHandle(SQL_HANDLE_DBC, dbc);SQLFreeHandle(SQL_HANDLE_ENV, env);}catch (...) {SQLFreeHandle(SQL_HANDLE_STMT, stmt);SQLDisconnect(dbc);SQLFreeHandle(SQL_HANDLE_DBC, dbc);SQLFreeHandle(SQL_HANDLE_ENV, env);throw;}return results;}
};// CSV写入器
// Writes rows to one CSV file with RFC 4180-style quoting.
// The file is truncated on open, so an existing export is overwritten.
class CSVWriter {
private:
    std::ofstream file;
    std::mutex file_mutex;

    // Quotes a field when it contains a comma, quote, or line break;
    // embedded quotes are doubled. The original missed '\n'/'\r' (which
    // breaks the CSV row structure for multi-line values) and pulled in
    // std::regex — never #included — just to double quotes.
    static std::string escape_csv(const std::string& s) {
        if (s.find_first_of("\",\n\r") == std::string::npos) {
            return s;
        }
        std::string escaped = "\"";
        for (char c : s) {
            if (c == '"') escaped += "\"\"";
            else escaped += c;
        }
        escaped += '"';
        return escaped;
    }

public:
    // Opens `filename` for writing, truncating any previous content.
    explicit CSVWriter(const std::string& filename) {
        file.open(filename, std::ios::out | std::ios::trunc);
    }

    ~CSVWriter() {
        file.close();
    }

    // Appends a batch of rows; the mutex keeps batches from interleaving.
    void write_rows(const std::vector<std::vector<std::string>>& rows) {
        std::lock_guard<std::mutex> lock(file_mutex);
        for (const auto& row : rows) {
            std::string line;
            for (size_t i = 0; i < row.size(); ++i) {
                line += escape_csv(row[i]);
                if (i != row.size() - 1) line += ",";
            }
            file << line << "\n";
        }
    }
};

// Export task worker
void process_export_task(const ExportTask& task, const SnowflakeExporter& exporter,CSVWriter& writer,Logger& logger) {auto start_time = chrono::high_resolution_clock::now();int total_rows = 0;string error_msg;try {int offset = 0;while (true) {auto data = exporter.execute_paged_query(task.sql, task.batch_size, offset);if (data.empty()) break;writer.write_rows(data);total_rows += data.size();offset += task.batch_size;}}catch (const exception& e) {error_msg = e.what();}auto end_time = chrono::high_resolution_clock::now();auto duration = chrono::duration_cast<chrono::milliseconds>(end_time - start_time).count();logger.log(task.label, error_msg.empty() ? "SUCCESS" : "FAILED",total_rows,error_msg,duration);
}int main(int argc, char* argv[]) {// 加载配置文件ifstream config_file("config.json");json config = json::parse(config_file);// 解析连接字符串string conn_str = "DRIVER=SnowflakeDSIIDriver;""SERVER=" + config["server"].get<string>() + ";""DATABASE=" + config["database"].get<string>() + ";""SCHEMA=" + config["schema"].get<string>() + ";""UID=" + config["user"].get<string>() + ";""PWD=" + config["password"].get<string>() + ";";// 初始化组件Logger logger("export.log");SnowflakeExporter exporter(conn_str);// 创建线程池vector<thread> threads;vector<unique_ptr<CSVWriter>> writers;for (auto& task_json : config["tasks"]) {ExportTask task{task_json["label"],task_json["sql"],task_json["batch_size"]};writers.emplace_back(make_unique<CSVWriter>(task.label + ".csv"));threads.emplace_back(process_export_task, task, ref(exporter),ref(*writers.back()), ref(logger));}// 等待所有线程完成for (auto& t : threads) {t.join();}return 0;
}
编译说明:
- 需要安装Snowflake ODBC驱动
- 需要链接ODBC库(-lodbc)和线程库(-pthread)
- 需要nlohmann/json库
- 推荐使用C++17或更高标准编译
配置文件示例(config.json):
{"server": "your_account.snowflakecomputing.com","database": "your_db","schema": "your_schema","user": "your_user","password": "your_password","tasks": [{"label": "orders","sql": "SELECT * FROM orders ORDER BY order_date","batch_size": 10000},{"label": "customers","sql": "SELECT * FROM customers WHERE status = 'ACTIVE'","batch_size": 5000}]
}
程序特点:
- 多线程处理多个导出任务
- 分页查询处理大数据集
- CSV文件自动覆盖写入
- 线程安全的日志记录
- 详细的错误处理
- CSV特殊字符转义
- 性能指标记录(耗时、行数)
日志格式示例:
2024-03-20 14:30:45 | orders | SUCCESS | 250000 | 4500ms |
2024-03-20 14:31:02 | customers | FAILED | 12000 | 17000ms | Network connection error