同步机制¶
在多核或中断驱动的系统中,多个执行流可能同时访问共享数据。同步机制确保这些访问是有序的、不会破坏数据一致性。
为什么需要同步?¶
并发问题的本质¶
考虑一个简单的计数器:
初始值:counter = 0
CPU 0: counter++ CPU 1: counter++
读取 0 读取 0
计算 1 计算 1
写入 1 写入 1
结果:counter = 1(应该是 2!)
问题在于:读取-修改-写入不是原子操作,中间可能被其他执行流打断。
三类并发来源¶
- 多核并行:多个 CPU 核同时执行
- 中断打断:中断处理函数打断当前代码
- 进程切换:调度器切换进程
任何一类都可能导致并发问题。
临界区¶
访问共享资源的代码段叫临界区。同步的目标是:同一时刻只有一个执行流在临界区内。
进入临界区
访问共享数据
退出临界区
自旋锁:最基础的同步原语¶
什么是自旋锁?¶
自旋锁是最简单的锁:获取不到就一直循环检查(自旋),直到获取成功。
获取锁:
while (lock == 1) { // 锁被占用
自旋等待 // 不断检查
}
lock = 1; // 获取锁
释放锁:
lock = 0; // 释放锁
为什么自旋而不是睡眠?¶
自旋锁适用于短临界区: - 等待时间短,自旋的开销小于睡眠/唤醒的开销 - 在中断上下文中不能睡眠
如果临界区很长,应该用睡眠锁(信号量等),避免浪费 CPU。
自旋锁的硬件支持¶
上面的伪代码有个问题:检查和设置不是原子的。
RISC-V 提供原子指令解决这个问题:
amoswap.w rd, rs2, (rs1) // 原子交换
操作:
tmp = *rs1
*rs1 = rs2
rd = tmp
这一步是原子的,不可打断
GCC 内置函数封装了这些指令:
__atomic_exchange_n(&lock->locked, 1, __ATOMIC_ACQUIRE)
// 原子地:读取 locked,写入 1,返回旧值
锁与中断的关系¶
考虑这个场景:
1. CPU 0 获取锁 L
2. 定时器中断发生
3. 中断处理函数也要获取锁 L
4. 死锁!中断处理函数永远等不到锁
解决方案:获取锁时自动关中断。
获取锁:
关中断 // 防止中断打断
while (锁被占用) {
自旋
}
获取锁
释放锁:
释放锁
开中断 // 恢复中断
这样,持有锁时不会被中断打断,避免了中断处理函数中的死锁。
中断嵌套问题¶
如果代码嵌套获取多个锁:
函数 A:
获取锁 L1(关中断)
调用函数 B
函数 B:
获取锁 L2(关中断)
释放锁 L2(开中断)← 问题!此时还在函数 A 中
函数 A 继续:
释放锁 L1(此时中断已开,错误!)
解决方案:记录中断嵌套深度。
struct cpu {
u64 intr_state; // 保存原始中断状态
int intr_depth; // 嵌套深度
};
获取锁:
if (intr_depth == 0) {
intr_state = 当前中断状态 // 第一次才保存
}
intr_depth++
关中断
释放锁:
intr_depth--
if (intr_depth == 0) {
恢复 intr_state // 最外层才恢复
}
锁的调试支持¶
开启 SPINLOCK_DEBUG 后,可以检测:
- 递归锁:同一个 CPU 重复获取同一把锁
- 错误释放:释放未持有的锁
- 错误的 CPU:其他 CPU 释放锁
这些错误通常导致死锁,尽早检测很有帮助。
原子操作:无锁编程的基础¶
什么是原子操作?¶
原子操作是不可分割的操作:要么完全执行,要么完全不执行,没有中间状态。
原子类型¶
typedef struct {
volatile int32_t counter;
} atomic_t;
volatile 告诉编译器不要优化对 counter 的访问。
原子操作函数¶
读写:
- atomic_read:读取值
- atomic_set:设置值
算术:
- atomic_add_return:原子加,返回新值
- atomic_sub_return:原子减,返回新值
- atomic_fetch_add:原子加,返回旧值
交换:
- atomic_xchg:原子交换
- atomic_cmpxchg:比较并交换(CAS)
比较并交换(CAS)¶
CAS 是无锁编程的核心:
if (当前值 == 期望值) {
当前值 = 新值;
return 成功;
} else {
return 失败;
}
整个过程是原子的,可以用来实现无锁数据结构。
原子操作的应用场景¶
原子计数器:
static atomic_t active_count = ATOMIC_INIT(0);
void task_start(void) {
atomic_inc(&active_count);
}
void task_end(void) {
atomic_dec(&active_count);
}
无锁栈(简化版):
void push(struct node *n) {
do {
n->next = atomic_ptr_read(&top);
} while (atomic_ptr_cmpxchg(&top, n->next, n) != n->next);
}
如果 CAS 失败,说明其他线程修改了栈顶,需要重试。
内存屏障:保证访问顺序¶
为什么需要内存屏障?¶
CPU 和编译器都会重排序指令以提高性能:
原始代码:
a = 1;
b = 2;
实际执行可能:
b = 2; // 先执行
a = 1;
在单线程中,这没问题。但在多线程中,如果另一个线程依赖 a 和 b 的写入顺序,就会出错。
编译器屏障¶
#define barrier() asm volatile("" ::: "memory")
告诉编译器:不要把 barrier 前后的代码重排序。
硬件屏障¶
GCC 的 __atomic 内置函数已经包含了必要的屏障:
__ATOMIC_SEQ_CST:全序一致性(最强)__ATOMIC_ACQUIRE:读屏障__ATOMIC_RELEASE:写屏障__ATOMIC_RELAXED:无屏障
Acquire-Release 语义¶
Acquire(获取):后续操作不能重排到前面 Release(释放):前面操作不能重排到后面
获取锁(Acquire):
屏障
临界区代码
释放锁(Release):
屏障
这保证:临界区内的操作不会跑到临界区外面。
READ_ONCE / WRITE_ONCE¶
防止编译器过度优化:
int flag = 0;
// 错误:编译器可能假设 flag 不变
while (!flag) {
...
}
// 正确:强制每次读取
while (!READ_ONCE(flag)) {
...
}
锁的正确使用¶
避免死锁¶
死锁的四个必要条件:
- 互斥:资源不能共享
- 持有并等待:持有资源同时等待其他资源
- 不可抢占:资源不能强制回收
- 循环等待:形成等待环
破坏任意一个条件即可避免死锁。最常用的是破坏循环等待:按固定顺序获取锁。
// 死锁风险
线程 1: 锁 A → 锁 B
线程 2: 锁 B → 锁 A
// 正确:统一顺序
线程 1: 锁 A → 锁 B
线程 2: 锁 A → 锁 B // 先获取 A,再获取 B
锁粒度权衡¶
粗粒度锁(一把大锁): - 简单,不易出错 - 并发度低,性能差
细粒度锁(多把小锁): - 并发度高,性能好 - 复杂,容易死锁
选择原则:先粗后细,过早优化是万恶之源。
持锁时间¶
临界区应该尽可能短:
// 错误:持锁时做 I/O
spinlock_acquire(&lock);
write_to_disk(data); // 很慢!
spinlock_release(&lock);
// 正确:只保护共享数据
spinlock_acquire(&lock);
local_copy = shared_data;
spinlock_release(&lock);
write_to_disk(local_copy);
设计权衡¶
为什么只有自旋锁,没有睡眠锁?¶
睡眠锁需要调度器支持,当前实现较简单。未来可以添加:
- 互斥锁:获取不到时睡眠
- 信号量:计数器,支持 P/V 操作
- 读写锁:读共享,写独占
为什么不用 RCU?¶
RCU(Read-Copy-Update)是 Linux 的高级同步机制,读操作无锁。但实现复杂,需要:
- 宽限期管理
- 回调机制
- 多核同步
对于教学目的,自旋锁足够理解同步的基本原理。
思考题¶
- 如果在持有自旋锁时调用
sched_yield,会发生什么? - 原子操作能完全替代锁吗?什么情况下可以?
- 如何检测自旋锁的争用情况(多个 CPU 同时竞争同一把锁)?
- 为什么
atomic_t的counter字段要加volatile?