PostgreSQL 的 pg_advisory_lock 函数
pg_advisory_lock 是 PostgreSQL 提供的一种应用级锁机制,它不锁定具体的数据库对象(如表或行),而是通过数字键值来协调应用间的并发控制。
锁的基本概念
PostgreSQL 提供两种咨询锁(advisory lock):
- 会话级咨询锁:锁在会话结束时自动释放
- 事务级咨询锁:锁在事务结束时自动释放
主要函数列表
函数 | 描述 |
---|---|
pg_advisory_lock(key) | 获取会话级咨询锁(阻塞) |
pg_try_advisory_lock(key) | 尝试获取会话级咨询锁(非阻塞) |
pg_advisory_xact_lock(key) | 获取事务级咨询锁(阻塞) |
pg_try_advisory_xact_lock(key) | 尝试获取事务级咨询锁(非阻塞) |
pg_advisory_unlock(key) | 释放会话级咨询锁 |
pg_advisory_unlock_all() | 释放当前会话持有的所有咨询锁 |
函数详解
1 pg_advisory_lock(key bigint)
功能:获取一个会话级别的咨询锁(如果锁已被其他会话持有,则阻塞等待)
参数:
key
:64位整数锁标识
示例:
SELECT pg_advisory_lock(123456);
-- 执行需要同步的操作
SELECT pg_advisory_unlock(123456);
2 pg_try_advisory_lock(key bigint)
功能:尝试获取会话级咨询锁(非阻塞,立即返回成功与否)
返回值:boolean(true表示获取成功)
示例:
DO $$
BEGINIF pg_try_advisory_lock(123456) THENRAISE NOTICE 'Lock acquired, performing work...';-- 执行受保护的操作PERFORM pg_advisory_unlock(123456);ELSERAISE NOTICE 'Could not acquire lock, skipping...';END IF;
END $$;
3 pg_advisory_xact_lock(key bigint)
功能:获取事务级咨询锁(锁在事务结束时自动释放)
示例:
BEGIN;
SELECT pg_advisory_xact_lock(123456);
-- 执行需要同步的操作
COMMIT; -- 锁自动释放
4 pg_try_advisory_xact_lock(key bigint)
功能:尝试获取事务级咨询锁(非阻塞)
示例:
BEGIN;
SELECT pg_try_advisory_xact_lock(123456);
-- 无论是否获取成功都继续执行
COMMIT;
锁的键值设计
咨询锁使用64位整数作为键值,有两种使用方式:
-
单键模式:使用一个64位整数
SELECT pg_advisory_lock(123456789);
-
双键模式:使用两个32位整数组合
SELECT pg_advisory_lock(123, 456);
实际应用场景
场景1:防止定时任务重复执行
-- 在定时任务开始时检查锁
DO $$
BEGINIF NOT pg_try_advisory_xact_lock(987654) THENRAISE NOTICE 'Task is already running in another process';RETURN;END IF;RAISE NOTICE 'Starting scheduled task...';-- 执行定时任务逻辑-- ...COMMIT; -- 锁自动释放
END $$;
场景2:应用级分布式锁
-- 应用1获取锁
SELECT pg_advisory_lock(555555) FROM my_table WHERE id = 1;-- 应用2尝试获取同样的锁
SELECT pg_try_advisory_lock(555555); -- 返回false-- 应用1释放锁
SELECT pg_advisory_unlock(555555);
场景3:确保单实例初始化
-- 系统初始化时确保只执行一次
DO $$
BEGIN-- 尝试获取锁,等待最多5秒FOR i IN 1..5 LOOPIF pg_try_advisory_lock(1357924680) THEN-- 检查是否已经初始化IF NOT EXISTS (SELECT 1 FROM system_status WHERE initialized = true) THEN-- 执行初始化INSERT INTO system_status(initialized) VALUES (true);RAISE NOTICE 'System initialized successfully';ELSERAISE NOTICE 'System already initialized';END IF;-- 显式释放锁(虽然会话结束会自动释放)PERFORM pg_advisory_unlock(1357924680);RETURN;END IF;PERFORM pg_sleep(1); -- 等待1秒END LOOP;RAISE EXCEPTION 'Could not acquire initialization lock after 5 seconds';
END $$;
监控咨询锁
查看当前持有的咨询锁
SELECT locktype, classid, objid, objsubid, mode, granted
FROM pg_locks
WHERE locktype = 'advisory';
查看所有咨询锁(包括已授予和等待的)
SELECT pid, locktype, mode, granted, fastpath, virtualtransaction
FROM pg_locks
WHERE locktype = 'advisory';
注意事项
-
锁释放:
- 会话级锁必须显式释放或会话结束自动释放
- 事务级锁在事务结束时自动释放
-
死锁风险:
- 按固定顺序获取多个咨询锁以避免死锁
- 使用 pg_try_advisory_lock 可以降低死锁风险
-
性能影响:
- 咨询锁比表锁/行锁更轻量级
- 大量使用仍可能影响性能
-
集群环境:
- 咨询锁只在单个PostgreSQL实例内有效
- 不适用于跨多个数据库实例的协调
-
锁标识管理:
- 建议在应用中集中管理锁标识
- 使用有意义的常量而非魔法数字
高级用法
超时获取锁
DO $$
DECLARElock_acquired BOOLEAN := false;timeout INTERVAL := '5 seconds';start_time TIMESTAMP := clock_timestamp();
BEGINWHILE (clock_timestamp() - start_time) < timeout LOOPIF pg_try_advisory_lock(424242) THENlock_acquired := true;EXIT;END IF;PERFORM pg_sleep(0.1); -- 等待100msEND LOOP;IF lock_acquired THENRAISE NOTICE 'Lock acquired after %', clock_timestamp() - start_time;-- 执行受保护的操作PERFORM pg_advisory_unlock(424242);ELSERAISE EXCEPTION 'Could not acquire lock within timeout';END IF;
END $$;
使用咨询锁实现队列
-- 生产者
SELECT pg_advisory_lock(987); -- 全局写锁-- 插入队列项
INSERT INTO job_queue(job_data) VALUES ('some data');SELECT pg_advisory_unlock(987);-- 消费者
SELECT pg_advisory_lock(988); -- 全局读锁-- 获取并锁定一个作业
UPDATE job_queue
SET status = 'processing', worker_id = pg_backend_pid(),claimed_at = NOW()
WHERE id = (SELECT id FROM job_queue WHERE status = 'pending' ORDER BY created_at LIMIT 1
)
RETURNING *;SELECT pg_advisory_unlock(988);
pg_advisory_lock 是 PostgreSQL 强大的应用级同步机制,合理使用可以解决复杂的并发控制问题,但需要谨慎设计以避免死锁和性能问题。