Some Multi Thread

Last Updated on 2024年2月17日

C++标准库提供的 mutex 在大部分场合都足以保证线程安全, 但是当问题变得更加极端时,就可能需要使用lockfree风格的并行编程了. 而为了正确实现lockfree, 你将打开一扇新的大门, 接下来的名词都是在学习过程中必须正确理解的: memory model, reordering, weak(relax), strong(strict), fence, barrier, release, acquire, seq_cst, consume, mutex, futex, lockfree(lockless), synchronize-with, happens-before, ABA, DCLP,

背景

在单核时代,CPU设计中引入了很多特性,其中很重要的一点就是乱序执行,乱序执行的基本原则是:乱序执行的最终效果应当和顺序执行一致,这个原则在单线程下是比较容易保证的.例如,对于下面的代码,先写入a还是先写入b都不影响后序的指令执行的正确性,所以编译器及硬件都可以自行选择先执行哪一个.

a = 8;
b = 9;

当进入并行环境之后,乱序执行的基本原则可以描述为:从线程自身来看,线程内代码的执行效果和顺序执行一致,这个原则是被大多数编译器/OS/硬件所支持的, 它也从另一个方面暗示: 跨线程时,则可能不一致.

在并行环境中, 各个线程之间能共享的主要是主存,也就是说,在一个线程看来,另一个线程就是在执行一系列Load/Store指令, 以改变主存中的数据. 注意, 这里说的是任意Load/Store指令,并不限定操作的地址.

乱序机制虽然可以保证线程观察自身的IO时,认为指令执行顺序和issue顺序一致,但并不保证其他线程看到的内存IO和自身观察的结果一致. 例如,线程A在执行过程中issue了一系列IO指令, 并且认为这些IO指令就是顺序执行的, 但是其他线程则可能认为A是按另外的顺序在执行IO.

乱序执行IO的理由一般都是"更好的性能". 对于现代的硬件, 在issue IO指令时,load和store一般都会被暂存起来, 从而能把物理地址相近的IO指令聚合起来,然后用单次transaction完成内存IO, 这样就导致了后Issue的指令在客观上被先执行. 硬件有复杂的机制保证这种"后发射先执行"不会被当前的物理核心观测到,但是在跨核心时,这种乱序就可以被观察到了.

最后,编译器为了能尽量利用好硬件多发射乱序执行的特性,也可能在编译期调整指令的顺序, 这种调整一般都是很保守的,因为它将使得生成的二进制代码语义上就已经和原始代码不一致了.

Memory Model

我们已经知道硬件/软件层面均存在乱序执行的特征了, 而乱序执行将有可能影响我们代码的正确性, 这是我们在多线程开发中共享数据时必须关注的.

例如,下面的代码就是一个经典的例子, 在开发者的意图中, 应该认为(1)R0=0,R1=1,(2)R0=2,R1=0,(3)R0=2,R1=1是可接受的,但是绝对不是R0=0,R1=0, 而在特定的环境中,R0=0,R1=0是可能出现的,这时,代码的正确性就被破坏了. 也就是说,代码可能在某些平台可以稳定运行,但是换了一个平台后,出现了莫名其妙的错误.

int A=0;int B=0;
// thread 0
A = 1;
R0 = B;

// thread 1
B = 2;
R1 = A;

为了避免这种错误,我们就需要显式的控制Ordering, 此时就可以用到Memory Model了, 它一方面描述了语言(或硬件平台)乱序IO的特性, 另一方面还会给出手动控制Ordering的方法.

总的来说, 我们要做的是两件事

  1. 在线程内部保证指令的顺序, 保证线程内的happens-before
  2. 在线程间建立同步关系, 进一步跨线程的建立happens-before

Memory Model 是一个非常有歧义的词, 因为它过于简略了. 在现代的编程语言中, Memory Model 可以用 The Model of Memory IO Reordering in Parallel Env 来对应. 也就是说, Memory Model 实际是一个描述并行环境中乱序IO特性的模型.

线程内: Fence

对于任何两个IO指令, 一共能形成4种组合, LoadLoad, LoadStore, StoreLoad, StoreStore. 这四种组合也对应了四种乱序. 例如, LoadLoad意味着两条指令在指令中的相对顺序本来是 Load0,Load1,但是在调整后变成了Load1,Load0.

我们可以通过Fence来控制IO指令的有序性, 此时,需要选择一个Instruction作为fence, 这个fence可以是某个IO指令,也可以是专门用作fence的独立指令. 当fence确定时,就需要关注以下问题

  1. 禁止从哪个方向穿越,例如Load0, fence, Load1
    • 有时候可以允许向下穿越,但不允许向上穿越,也就是说,乱序为fence,Load1,Load0fence,Load0,Load1是可以接受的,但乱序为Load0,Load1,fenceLoad1,Load0,fence则是不可接受的,
    • 有时候两个方向都不允许.
  2. fence影响哪些指令.
    • 虽然就硬件而言,fence一般都很强,会约束fence之前的所有指令和fence之后的所有指令.但是在软件层面,我们常常希望能用更加精细的方式来描述fence,这样当未来硬件有改进时,可以直接利用上层提供的信息进行细粒度的控制.
    • fence可以只约束部分指令,甚至也可以只约束成对的两个指令,在不确定时,一定要查询手册,确定fence影响的指令范围.例如,C++11中, release-operation作为fence时, 只约束该operation之前的所有IO指令,但是独立的release-fence作为fence时, 则还会额外约束fence之后的指令.

大部分硬件都会提供以下四种fence,这些fence都禁止双向穿越,且对fence两侧的所有指令都有约束

  1. LoadLoad: 禁止 fence 前的任何Load指令向下穿越Fence,禁止fence之后的任何Load指令向上穿越Fence,从而使得 fence 之前的 Load 指令总是比 fence 之后的Load指令先完成.
  2. LoadStore: 同理
  3. StoreStore: 同理
  4. StoreLoad: 同理

除了StoreLoad fence, 其他的fence一般都被认为是轻量级的. 一般而言,硬件在实现fence时,主要会实现两个,一个是轻量的(lightweight fence),它同时有LoadLoad/StoreStore/LoadStore的功能; 另一个是全量的(full fence), 则是包含了四个fence的功能

C++11标准库提供的fence主要有:

  1. release_operation: 它相当于StoreStore和LoadStore的组合,不过仅约束release_operation之前的指令,允许之后的IO指令向上穿越
  2. acquire_operation: 它相当于LoadLoad和LoadStore的组合,不过仅约束acquire_operation之后的指令,允许之前的IO指令向下穿越.
  3. consume_operation: 它功能上和acquire_operation相同,不过约束的指令更加精细: 它只约束和consume_operation形成数据依赖的指令
  4. release_fence: 相当于StoreStore和LoadStore的组合.
  5. acquire_fence: 相当于LoadLoad和LoadStore的组合.
  6. seq_cst_operation及seq_cst_fence: 最强的fence,相当于四种硬件fence的组合

C++的atomic主要目标并不是保证原子性, 而是保证顺序性, 当我们只需要原子性的时候,需要强制使用memory_order_relaxed来声明, 否则默认是seq_cst,而seq_cst可能是相当heavy的。

大部分时候,函数调用都是作为一个 full mem fence 存在的, 因为调用侧不能假定函数内没有fence 指令, 必要时,可以通过compiler directive来对编译器进行指示.

seq_cst 的意思是 Sequential Consistent with Source Code, 类似于"所见即所得"的意思.

注意, C++20引入了barrier, 它的功能与fence完全不同, 如果你在看C++20之前的资料,经常会有barrier和fence这两个单词混用的情况, 务必注意.

Happens-before 和 Synchronize-with

Happens-before 是一个题外话, 它并不和内存乱序严格绑定, 但在内存乱序的这个语境下, Happens-before 是一种约束, 需要我们通过特定的手段才能建立, 它的意思是: 如果A物理上在B之前发生, A看起来就应该在B之前发生.

显然, fence就是建立这种线程内happens-before关系的工具

Synchronize-with 主要用于建立跨线程的happens-before, 若A sync with B, 那么当A客观上在B之前发生时, 一定有A happens-before B, 这进一步意味着,对任意X happens-before A, 那么X happens-before B.

注意, 建立 sync 依赖于物理地址, A and B 必须操作同一个地址,注意,这意味着当使用fence建立synchronize-with时,需要分析出具体同步的operation

具体举例, 我们假定payload是一个store指令, 那么可以在一个线程内先建立 payload_store happens-before A , 然后再跨线程建立 A synchronize-with B , 那么这样一来, 当 A真的在B之前发生时, 由于 payload 一定在A之前发生了, 所以payload自然也在B之前发生, 那么payload一定是对B可见的.

目前而言,常见的synchronize-with有:

  1. unlock synchronize-with lock, 它意味着unlock之前的操作都happens-before lock
  2. release synchronize-with acquire, 它意味着release之前的IO operation 都在acquire成功前执行了.
  3. release with rmw_chain synchronize-with acquire: 如果一个release_write经过一系列RMW指令后被一个acquire_load读取,那么总是能建立一系列synchronize-with关系,这种一般称为release-sequences
    1. 中间的RMW可以是任意内存序(如果RMW在末尾,则只能是consume或acquire)
    2. 说直白点,它的意思是,如果物理上发生的时间顺序是,store,rmw0,rmw1,rmw2,load,那么任意两个相邻的指令间都有happens-before关系。这意味着后序的RMW总能读到之前RMW指令写入的值,例如,有一个线程在produce数据,多个线程在用RWM消费数据,那么当其中任意一个线程RMW成功消费之后,其他的RMW读取到的一定是这个线程写入的新值。
    3. 尽管如此,一般开发中建议RMW总是使用rel_acq的内存序,这样可以简化心智负担。

C++还规定了很多编译器必须满足的synchronize-with关系,这些一般很难会影响到代码的逻辑

In Action

好, 我们已经知道的足够多了, 现在我们可以看看我们可以解决哪些问题了.

lock-free (lockless)

如果一个系统不会因为某个线程出现的问题而永远卡死,那么这个系统就称为lockfree的

lock-free 是系统级别的,只要整个系统有进展就行,更强的约束是wait-free,它要求每个线程都能必须会有进展,一般来说,必须设计非常复杂的代码逻辑才能保证实现的功能是wait-free的,所以按这个约束设计的代码比较少。

假设存在一个外部攻击者, 他通过某种手段让一个线程在获得mutex后, 就永久陷入阻塞. 那么其他线程就再也没有机会获得锁了,整个系统就永久卡死了. 这也就是说使用了mutex就一定不是lockfree.

另一方面,即便使用了原子操作,也可能由于代码逻辑不严谨的问题,导致系统永远卡死, 这时候系统也不能称为是lockfree的. 只有使用了atomic,且有正确的代码逻辑,才能保证系统是lockfree的.

最后,lockless很难被正确实现, 当需要使用lockless时, 应当优先使用成熟的算法. 这是因为1. lock-free中常常需要使用CAS的loop,在实现不够严谨时,而这些loop非常容易被等效成一个spin lock. 2. memory_order的逻辑有一定的心智负担,容易写错。

你真的需要使用lock-free吗? 可以先看看An Introduction to lock free programing,以及Locks aren’t slow, Lock contention is

优先级倒置问题在使用mutex时很难避免,但是用lockfree就比较容易保证. 例如, T1和T3都需要使用mutex, 且T1的优先级更高, 如果T3先获得了mutex,那么无论T1如何抢占CPU,都不能继续推进,只能等待T3释放mutex后才可以开始执行. 显然, 如果是使用lockfree,就不存在这种问题,只要T1分配到了时间片,它就可以继续推进,而不用等待T3释放锁.

Gossip of Mutex

之所以称mutex性能不好,是因为mutex.lock()失败时,至少要使用一次系统调用,以让出CPU时间片. 进一步, 当mutex.unlock()时,又需要通过系统调唤醒被阻塞的线程. 也就是说,当有大量线程lock失败时(contention), 就会有大量的系统调用, 这性能一定不好.

对于现代的大部分系统, mutex已经是基于atomic operation实现的了, mutex.lock()成功时是没有系统调用的,而当没有线程被阻塞时, mutex.unlock()也不需要唤醒其他线程, 没有系统调用. 此时的性能是很好的.

在经过benchmark后,一般可以说:在lock contention不强烈时, mutex 其实是非常好,也非常易用的工具.

总有一种迷思认为mutex的性能非常差,这主要是一个历史问题,以前的mutex一般被称为 kernel-space mutex, 每次加锁必然有至少一次系统调用, 所以性能很差. 目前的mutex都是在用户空间使用原子操作实现的, 只有在lock失败时才有系统调用. (注意,windows的mutex仍然是kernelspace的, windows下和linux mutex对应的组件是critical-section) 另外, 尽管futex是 fast userspace mutex 的缩写, 但是由于一些历史演进, futex已经不是一个用于实现线程安全的工具了, 它主要提供wait/wake两个syscall, mutex会基于futex提供的syscall来实现自己的功能.

Others

ABA

如果一个值被从A改为B,又从B改为A,那么它究竟变过没有呢? 当使用lockfree编程时,这种问题就很容易出现,只能通过正确的编码逻辑避免.

Always consider memory order when using atomic

使用atomic时, 一般来说,只使用原子性往往是不足的,必须要考虑ordering.

Use CAS to implement any Atomic Operation

You Can Do Any Kind of Atomic Read-Modify-Write Operation (preshing.com)

DCLP

Double-Checked Locking Pattern 是一个非常经典的问题,互联网上有很多讨论。但我们可以先谈另一个重要特性"C++11后,static对象的构造一定是线程安全的,且能保证顺序性",这可以用下面的代码描述。

MyType * Instance(){
    static MyType obj;
    return &obj;
}

上面的代码是C++11后实现单例模式的标准做法,巧合的是,大部分编译器就是通过DCLP的策略来保证上面的代码是线程安全且性能足够优秀的。

下面我们以单例模式来介绍DCLP的逻辑

std::atomic<Singleton*> Singleton::m_instance;
Singleton* Singleton::getInstance() {
    Singleton* tmp = m_instance;
    if (tmp == NULL) {
        Lock lock;
        tmp = m_instance;
        if (tmp == NULL) {
            tmp = new Singleton;
            m_instance = tmp;
        }
    }
    return tmp;
}

首先来描述一下上面代码潜在的问题,当对象的构造函数inline时,上面的构造部分代码可以被描述为tmp = malloc(); INITIALIZE(tmp); m_instance = tmp,这个INITIALIZE(tmp)里面就可能包含了若干的store指令,
且这些store指令可能会和m_instance = tmp这条指令发生乱序,那么当其他线程读取到更新后的m_instance时,可能对象的构造其实还没有完成。
因此,我们必须保证:

  1. INITIALIZE(tmp) happens-before m_instance = tmp
  2. m_instance = tmp synchronize-with Singleton* tmp = m_instance;

这也就有了下面的一个DCLP版本

std::atomic<Singleton*> Singleton::m_instance;
std::mutex Singleton::m_mutex;

Singleton* Singleton::getInstance() {
    Singleton* tmp = m_instance.load(std::memory_order_relaxed);
    std::atomic_thread_fence(std::memory_order_acquire);
    if (tmp == nullptr) {
        std::lock_guard<std::mutex> lock(m_mutex);
        tmp = m_instance.load(std::memory_order_relaxed);
        if (tmp == nullptr) {
            tmp = new Singleton;
            std::atomic_thread_fence(std::memory_order_release);
            m_instance.store(tmp, std::memory_order_relaxed);
        }
    }
    return tmp;
}

Barrier (Latch)

Barrier 用于保证K个线程能同时到达某个同步点,这对于并行计算往往是非常常用的(例如CUDA的__sync_threads),C++ 20提供了标准的std::barrier来实现这个功能,std::latch也可以实现相似的功能。

#include <barrier>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

int main() {
  const int WORKER_NUM = 3;

  auto on_completion = []() noexcept {
    static int count = 0;
    ++count;
    std::cout << "barrier completed " << count << std::endl;
  };
  std::barrier sync_point(WORKER_NUM, on_completion);

  auto work = [&](int id) {
    std::cout << "worker " << id << " started" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * id));
    sync_point.arrive_and_wait();
    std::cout << "worker " << id << " started again" << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(100 * (WORKER_NUM - id)));
    sync_point.arrive_and_wait();
  };

  std::vector<std::thread> threads;
  for (auto i = 0; i < WORKER_NUM; ++i) {
    threads.emplace_back(work, i);
  }
  for (auto &thread : threads) {
    thread.join();
  }
}

经验谈

  • 使用的工具越高级,将来引入的bug就可能越复杂.
    • 例如,应当优先使用标准库的atomic,而不应当自己写lock-free的代码.
  • std::mutexlock/unlock总应该通过std::unique_lock/std::lock_guard这样的工具实现.
  • 为了避免不必要的麻烦: 尽可能不要在多线程环境中使用fork和signal
  • 库最好不要自动开启新线程,把线程开启/关闭的控制权全部交给库用户.
  • 为了避免麻烦,只用return来结束线程.
  • atomic<T>要求T是可memcpy/memcmp的类型,特别的,double/float这样的浮点数是被当做binary的数据来处理的,而不是当做数值。

读写锁的替代: shared_ptr + copy on write

核心目标:

  1. write是低频的,所以开销可以大一点无所谓。 read是高频的,所以read持有锁的时间应该越短越好。
  2. 更新应当不影响已有线程的正常工作
shared_ptr<Data> d;
mutex write_mutex;
mutex read_mutex;
void read()
{
    shared_ptr<Data> local;
    {
        // 可以用 local = std::atomic_load(&d); 替代,从而避免read_mutex
        lock_guard read_guard(read_mutex);
        local=d;
    }
    process(local)
}

void write(){

    lock_guard write_guard(write_mutex);// 只有一个线程能执行写入.

    shared_ptr<Data> new_data= get_new_data(); // 准备好新的数据.

    // 可以用std::atomic_store(&d,new_data);替代,从而避免read_mutex
    lock_guard read_guard(read_mutex);// 更新数据指针
    d=new_data;
}