PostgreSQL的扩展 pg_cron
pg_cron 是 PostgreSQL 的一个开源扩展,它允许在数据库内部使用 cron 语法调度定期任务,是最接近 Oracle DBMS_SCHEDULER 的解决方案。
一 安装与配置
1 安装方法
下载路径:
https://github.com/citusdata/pg_cron/tags
编译安装:
cd pg_cron
make && make install
在 PostgreSQL 中配置:
-- 在 postgresql.conf 中添加:
shared_preload_libraries = 'pg_cron'-- 重启后创建扩展
CREATE EXTENSION pg_cron;
2. 权限配置
-- 创建专用角色
CREATE ROLE cron_job_runner WITH LOGIN;
GRANT USAGE ON SCHEMA cron TO cron_job_runner;-- 配置任务执行权限
ALTER SYSTEM SET cron.database_name = 'your_database';
ALTER SYSTEM SET cron.host = 'localhost';
ALTER SYSTEM SET cron.log_run = 'on';
二、核心功能使用
1. 基本任务调度
-- 每分钟执行
SELECT cron.schedule('test-job', '* * * * *', 'SELECT now()');-- 每天凌晨3点执行
SELECT cron.schedule('nightly-cleanup', '0 3 * * *', 'DELETE FROM logs WHERE created_at < now() - interval ''30 days''');-- 每周一早上执行
SELECT cron.schedule('weekly-report', '0 9 * * 1', 'CALL generate_weekly_report()');
2. 带参数的任务
-- 使用美元引用
SELECT cron.schedule('param-job', '0 * * * *', $$UPDATE stats SET value = value + 1 WHERE metric_id = 'page_views'$$);-- 调用存储过程
SELECT cron.schedule('call-proc', '0 0 * * *', 'CALL refresh_materialized_views(true)');
3. 任务管理
-- 查看所有任务
SELECT * FROM cron.job;-- 查看执行历史
SELECT * FROM cron.job_run_details ORDER BY start_time DESC LIMIT 10;-- 更新任务计划
SELECT cron.alter_job(job_id, schedule := '0 4 * * *');-- 删除任务
SELECT cron.unschedule(jobid);
三、高级功能
1. 任务重试机制
-- 带错误处理的任务
SELECT cron.schedule('retry-job', '*/5 * * * *', $$
BEGIN-- 业务逻辑INSERT INTO important_data SELECT * FROM external_source;
EXCEPTION WHEN OTHERS THENINSERT INTO job_errors VALUES ('retry-job', SQLERRM, now());
END;
$$);
2. 任务依赖链
-- 使用表状态控制流程
SELECT cron.schedule('step1', '0 1 * * *', $$TRUNCATE TABLE staging_data;COPY staging_data FROM '/data/import.csv';UPDATE job_status SET step1_complete = true WHERE job_name = 'daily_import';
$$);SELECT cron.schedule('step2', '0 2 * * *', $$BEGINPERFORM 1 FROM job_status WHERE job_name = 'daily_import' AND step1_complete;IF NOT FOUND THENRAISE EXCEPTION 'Step1 not completed';END IF;-- 处理数据...END;
$$);
3. 分布式任务
-- 在多个节点上调度(需Citus扩展)
SELECT cron.schedule('sharded-job', '0 * * * *', 'SELECT master_distribute_command($cmd$INSERT INTO sharded_events SELECT * FROM collect_events()$cmd$)');
四、监控与维护
1. 监控面板查询
-- 正在运行的任务
SELECT * FROM cron.job_run_details
WHERE status = 'running';-- 失败任务统计
SELECT jobid, jobname, COUNT(*) as failures
FROM cron.job_run_details
WHERE status = 'failed'
GROUP BY jobid, jobname
ORDER BY failures DESC;-- 任务执行时长分析
SELECT jobname,avg(end_time - start_time) as avg_duration,max(end_time - start_time) as max_duration
FROM cron.job_run_details
GROUP BY jobname;
2. 维护操作
-- 清理历史记录(保留30天)
DELETE FROM cron.job_run_details
WHERE start_time < now() - interval '30 days';-- 临时禁用所有任务
UPDATE cron.job SET active = false;-- 导出任务配置
COPY (SELECT jobname, schedule, command FROM cron.job)
TO '/backup/pg_cron_jobs.csv' WITH CSV HEADER;
五、典型应用场景
1. 数据维护任务
-- 自动VACUUM
SELECT cron.schedule('auto-vacuum', '0 4 * * *', $$VACUUM (VERBOSE, ANALYZE) tables_with_heavy_updates;
$$);-- 分区表维护
SELECT cron.schedule('partition-rotation', '0 3 * * *', $$CALL rotate_partitions('events', 'day', 7);
$$);
2. 数据ETL流程
-- 每小时数据抽取
SELECT cron.schedule('extract-hourly', '0 * * * *', $$INSERT INTO data_warehouse.hourly_factsSELECT * FROM extract_hourly_metrics();
$$);-- 每天数据转换
SELECT cron.schedule('transform-daily', '30 3 * * *', $$CALL transform_raw_to_dimensions();
$$);
3. 业务定时任务
-- 每月账单生成
SELECT cron.schedule('monthly-billing', '0 2 1 * *', $$CALL generate_invoices(date_trunc('month', CURRENT_DATE));
$$);-- 定时提醒
SELECT cron.schedule('appointment-reminders', '0 9 * * *', $$INSERT INTO notificationsSELECT user_id, 'appointment_reminder', appointment_timeFROM appointmentsWHERE appointment_time BETWEEN now() AND now() + interval '24 hours';
$$);
六、注意事项
-
性能影响:
- 避免调度过于频繁的CPU密集型任务
- 长时间运行的任务应设置超时
-
安全考虑:
-- 限制任务权限 REVOKE ALL ON SCHEMA cron FROM PUBLIC; GRANT USAGE ON SCHEMA cron TO scheduler_role;
-
高可用性:
- 在主备架构中,pg_cron只需在主节点运行
- 使用 repmgr 或 Patroni 时确保故障转移后任务继续执行
-
版本兼容性:
- pg_cron 1.0+ 需要 PostgreSQL 12+
- 新版本支持任务标签和更细粒度的控制
pg_cron 通过将 cron 功能直接集成到 PostgreSQL 中,提供了轻量级但强大的任务调度解决方案,特别适合需要与数据库紧密交互的定时任务场景。