一文讲清多线程和多线程同步


本文转载自美团技术团队,原文链接

多线程编程是现代软件开发中的一项关键技术,在多线程编程中,开发者可以将复杂的任务分解为多个独立的线程,使其并行执行,从而充分利用多核处理器的优势。然而,多线程编程也带来了挑战,例如线程同步、死锁和竞态条件等问题。本篇文章将深入探讨多线程编程的基本概念(原子操作、CAS、Lock-free、内存屏障、伪共享、乱序执行等)、常见模式和最佳实践。通过具体的代码示例,希望能够帮助大家掌握多线程编程的核心技术,并在实际开发中应用这些知识,提升软件的性能和稳定性。

多线程

1 线程的概念

十多年前,主流观点主张在可能的情况下优先选择多进程而非多线程。如今,多线程编程已经成为编程领域的事实标准。多线程技术在很大程度上改善了程序的性能和响应能力,使其能够更加高效地利用系统资源,这不仅归功于多核处理器的普及和软硬件技术的进步,还归功于开发者对多线程编程的深入理解和技术创新。

那么什么是线程呢?线程是一个执行上下文,它包含诸多状态数据:每个线程有自己的执行流、调用栈、错误码、信号掩码、私有数据。Linux 内核用任务(Task)表示一个执行流。

1.1 执行流

一个任务里被依次执行的指令会形成一个指令序列(IP寄存器值的历史记录),这个指令序列就是一个指令流,每个线程会有自己的执行流。考虑下面的代码(本文代码块为C++):

  int calc(int a, int b, char op) {
    int c = 0;
    if (op == '+')
      c = a + b;
    else if (op == '-')
      c = a - b;
    else if (op == '*')
      c = a * b;
    else if (op == '/')
      c = a / b;
    else
      printf("invalid operation\n");
    return c;
  }

calc 函数被编译成汇编指令,一行C代码对应一个或多个汇编指令,在一个线程里执行 calc,那么这些机器指令会被依次执行。但是,被执行的指令序列跟代码顺序可能不完全一致,代码中的分支、跳转等语句,以及编译器对指令重排、处理器乱序执行会影响指令的真正执行顺序。

1.2 逻辑线程 vs 硬件线程

线程可以进一步区分为逻辑线程和硬件线程。

逻辑线程

程序上的线程是一个逻辑上的概念,也叫任务、软线程、逻辑线程。线程的执行逻辑由代码描述,比如编写一个函数实现对一个整型数组的元素求和:

  int sum(int a[], int n) {
      int x = 0;
      for (int i = 0; i < n; ++i) 
          x += a[i];
      return x;
  }

这个函数的逻辑很简单,它没有再调用其他函数(更复杂的功能逻辑可以在函数里调用其他函数)。我们可以在一个线程里调用这个函数对某数组求和;也可以把 sum 设置为某线程的入口函数,每个线程都会有一个入口函数,线程从入口函数开始执行。sum 函数描述了逻辑,即要做什么以及怎么做,偏设计;但它没有描述物质,即没有描述这个事情由谁做,事情最终需要派发到实体去完成。

硬件线程

与逻辑线程对应的是硬件线程,这是逻辑线程被执行的物质基础。

芯片设计领域,一个硬件线程通常指为执行指令序列而配套的硬件单元,一个 CPU 可能有多个核心,然后核心还可能支持超线程,1个核心的2个超线程复用一些硬件。从软件的视角来看,无须区分是真正的 Core 和超出来的 VCore,基本上可以认为是2个独立的执行单元,每个执行单元是一个逻辑 CPU,从软件的视角看 CPU 只需关注逻辑 CPU。一个软件线程由哪个 CPU/Core 去执行,以及何时执行,不归应用程序员管,它由操作系统决定,操作系统中的调度系统负责此项工作。

2 线程、核心、函数的关系

线程入口函数是线程执行的起点,线程从入口函数开始、一个指令接着一个指令执行,中间它可能会调用其他函数,那么它的控制流就转到了被调用的函数继续执行,被调用函数里还可以继续调用其他函数,这样便形成一个函数调用链。

前面的数组求和例子,如果数组特别大,则哪怕是一个简单的循环累加也可能耗费很长的时间,可以把这个整型数组分成多个小数组,或者表示成二维数组(数组的数组),每个线程负责一个小数组的求和,多个线程并发执行,最后再累加结果。

所以,为了提升处理速度,可以让多个线程在不同数据区段上执行相同(或相似)的计算逻辑,同样的处理逻辑可以有多个执行实例(线程),这对应对数据拆分线程。当然,也可以为两个线程指定不同的入口函数,让各线程执行不同的计算逻辑,这对应对逻辑拆分线程。

我们用一个例子来阐述线程、核心和函数之间的关系,假设有遛狗、扫地两类工作要做:

那什么对应软件线程? 任务拆分。

一个例子

假设现在有2条狗需要遛、3个房间需要打扫。可以把遛狗拆成2个任务,一个任务是遛小狗,另一个任务是遛大狗;打扫房间拆分为3个任务,3个房间对应3个任务,执行这样的拆分策略后,将会产生 2+3=5 个任务。但如果只有2个人,2个人无法同时做5件事,让某人在某时干某事由调度系统负责。

如果张三在遛小狗,那就对应一个线程被执行,李四在扫房间A,则表示另一个线程在执行中,可见线程是一个动态的概念。

软件线程不会一直处于执行中,原因是多方面的。上述例子是因为人手不够,所以遛大狗的任务还处于等待被执行的状态,其他的原因包括中断、抢占、条件依赖等。比如李四扫地过程中接到一个电话,他需要去处理更紧急的事情(接电话),则扫地这个事情被挂起,李四打完电话后继续扫地,则这个线程会被继续执行。

如果只有1个人,则上述5个任务依然可以被依次或交错完成,所以多线程是一个编程模型,多线程并不一定需要多 CPU 多 Core,单 CPU 单 Core 系统依然可以运行多线程程序(虽然最大化利用多 CPU 多 Core 的处理能力是多线程程序设计的一个重要目标)。1个人无法同时做多件事,单 CPU/单 Core 也不可以,操作系统通过时间分片技术应对远多于 CPU/Core 数的多任务执行的挑战。也可以把有些任务只分配给某些人去完成,这对应到 CPU 亲和性和绑核。

3 程序、进程、线程、协程

进程和线程是操作系统领域的两个重要概念,两者既有区别又有联系。

3.1 可执行程序

C/C++ 源文件经过编译器(编译+链接)处理后,会产生可执行程序文件,不同系统有不同格式,比如 Linux 系统的 ELF 格式、Windows 系统的 EXE 格式,可执行程序文件是一个静态的概念。

3.2 进程是什么

可执行程序在操作系统上的一次执行对应一个进程,进程是一个动态的概念:进程是执行中的程序。同一份可执行文件执行多次,会产生多个进程,这跟一个类可以创建多个实例一样。进程是资源分配的基本单位。

3.3 线程是什么

一个进程内的多个线程代表着多个执行流,这些线程以并发模式独立执行。操作系统中,被调度执行的最小单位是线程而非进程。进程是通过共享存储空间对用户呈现的逻辑概念,同一进程内的多个线程共享地址空间和文件描述符,共享地址空间意味着进程的代码(函数)区域、全局变量、堆、栈都被进程内的多线程共享。

3.4 进程和线程的关系

先看看 Linus 的论述,在1996年的一封邮件里,Linus 详细阐述了他对进程和线程关系的深刻洞见,他在邮件里写道:

简言之,内核不要基于进程/线程的概念做设计,而应该围绕 COE 的思考方式去做设计,然后,通过暴露有限的接口给用户去满足 pthreads 库的要求。

3.5 协程

用户态的多执行流,上下文切换成本比线程更低,微信用协程改造后台系统后,获得了更大吞吐能力和更高稳定性。如今,协程库也进了 C++ 20 新标准。

4 为什么需要多线程

4.1 什么是多线程

一个进程内多个线程并发执行的情况就叫多线程,每个线程是一个独立的执行流,多线程是一种编程模型,它与处理器无关、跟设计有关。

需要多线程的原因包括:

我们用2个例子作说明。

4.2 通过多线程并发提升处理能力

假设你要编写一个程序,用于统计一批文本文件的单词出现次数,程序的输入是文件名列表,输出一个单词到次数的映射。

  // 类型别名:单词到次数的映射
  using word2count = std::map<std::string, unsigned int>;

  // 合并“单词到次数映射列表”
  word2count merge(const std::vector<word2count>& w2c_list) {/*todo*/}

  // 统计一个文件里单词出现次数(单词到次数的映射)
  word2count word_count_a_file(const std::string& file) {/*todo*/}

  // 统计一批文本文件的单词出现次数
  word2count word_count_files(const std::vector<std::string>& files) {
      std::vector<word2count> w2c_list;
      for (auto &file : files) {
          w2c_list.push_back(word_count_a_file(file));
      }
      return merge(w2c_list);
  }

  int main(int argc, char* argv[]) {
      std::vector<std::string> files;
      for (int i = 1; i < argc; ++i) {
          files.push_back(argv[i]);
      }
      auto w2c = word_count_files(files);
      return 0;
  }

这是一个单线程程序,word_count_files 函数在主线程里被 main 函数调用。如果文件不多、又或者文件不大,那么运行这个程序,很快就会得到统计结果,否则,可能要等一段长的时间才能返回结果。

重新审视这个程序会发现:函数 word_count_a_file 接受一个文件名,吐出从该文件计算出的局部结果,它不依赖于其他外部数据和逻辑,可以并发执行,所以,可以为每个文件启动一个单独的线程去运行 word_count_a_file,等到所有线程都执行完,再合并得到最终结果。

实际上,为每个文件启动一个线程未必合适,因为如果有数万个小文件,那么启动数万个线程,每个线程运行很短暂的时间,大量时间将耗费在线程创建和销毁上,一个改进的设计:

这样的多线程程序能加快处理速度,前面数组求和可以采用相似的处理,如果程序运行在多 CPU 多 Core 的机器上,就能充分利用多 CPU 多 Core 硬件优势,多线程加速执行是多线程的一个显而易见的主要目的,此其一。

4.3 通过多线程改变程序编写方式

其二,有些场景会有阻塞的调用,如果不用多线程,那么代码不好编写。

比如某程序在执行密集计算的同时,需要监控标准输入(键盘),如果键盘有输入,那么读取输入并解析执行,但如果获取键盘输入的调用是阻塞的,而此时键盘没有输入到来,那么其他逻辑将得不到机会执行。

代码看起来会像下面这样子:

  // 从键盘接收输入,经解释后,会构建一个Command对象返回
  Command command = getCommandFromStdInput();
  // 执行命令
  command.run();

针对这种情况,我们通常会开启一个单独的线程去接收输入,而用另外的线程去处理其他计算逻辑,避免处理输入阻塞其他逻辑处理,这也是多线程的典型应用,它改变了程序的编写方式,此其二。

5 线程相关概念

5.1 时间分片

CPU 先执行线程A一段时间,然后再执行线程B一段时间,然后再执行线程A一段时间,CPU 时间被切分成短的时间片、分给不同线程执行的策略就是 CPU 时间分片。时间分片是对调度策略的一个极度简化,实际上操作系统的调度策略非常精细,要比简单的时间分片复杂的多。如果一秒钟被分成大量的非常短的时间片,比如100个10毫秒的时间片,10毫秒对人的感官而言太短了,以致于用户觉察不到延迟,仿佛计算机被该用户的任务所独占(实际上并不是),操作系统通过进程的抽象获得了这种任务独占 CPU 的效果(另一个抽象是进程通过虚拟内存独占存储)。

5.2 上下文切换

把当前正在 CPU 上运行的任务迁走,并挑选一个新任务到 CPU 上执行的过程叫调度,任务调度的过程会发生上下文切换(context swap),即保存当前 CPU 上正在运行的线程状态,并恢复将要被执行的线程的状态,这项工作由操作系统完成,需要占用 CPU 时间(sys time)。

5.3 线程安全函数与可重入

一个进程可以有多个线程在同时运行,这些线程可能同时执行一个函数,如果多线程并发执行的结果和单线程依次执行的结果是一样的,那么就是线程安全的,反之就不是线程安全的。

不访问共享数据,共享数据包括全局变量、static local 变量、类成员变量,只操作参数、无副作用的函数是线程安全函数,线程安全函数可多线程重入。每个线程有独立的栈,而函数参数保存在寄存器或栈上,局部变量在栈上,所以只操作参数和局部变量的函数被多线程并发调用不存在数据竞争。

C标准库有很多编程接口都是非线程安全的,比如时间操作/转换相关的接口:ctime() / gmtime() / localtime(),C标准通过提供带 _r 后缀的线程安全版本,比如:

  char* ctime_r(const time* clock, char* buf);

这些接口的线程安全版本,一般都需要传递一个额外的 char * buf 参数,这样的话,函数会操作这块 buf,而不是基于 static 共享数据,从而做到符合线程安全的要求。

5.4 线程私有数据

因为全局变量(包括模块内的 static 变量)是进程内的所有线程共享的,但有时应用程序设计中需要提供线程私有的全局变量,这个变量仅在函数被执行的线程中有效,但却可以跨多个函数被访问。

比如在程序里可能需要每个线程维护一个链表,而会使用相同的函数来操作这个链表,最简单的方法就是使用同名而不同变量地址的线程相关数据结构。这样的数据结构可以由 Posix 线程库维护,成为线程私有数据 (Thread-specific Data,或称为 TSD)。

Posix 有线程私有数据相关接口,而 C/C++ 等语言提供 thread_local 关键字,在语言层面直接提供支持。

5.5 阻塞和非阻塞

一个线程对应一个执行流,正常情况下,指令序列会被依次执行,计算逻辑会往前推进。但如果因为某种原因,一个线程的执行逻辑不能继续往前走,那么我们就说线程被阻塞住了。就像下班回家,但走到家门口发现没带钥匙,只能在门口徘徊,任由时间流逝,而不能进入房间。

线程阻塞的原因有很多种,比如:

如果某个系统调用或者编程接口有可能导致线程阻塞,那么便被称之为阻塞系统调用;与之对应的是非阻塞调用,调用非阻塞的函数不会陷入阻塞,如果请求的资源不能得到满足,它会立即返回并通过返回值或错误码报告原因,调用的地方可以选择重试或者返回。

多线程同步

前面讲了多线程相关的基础知识,现在进入第二个话题,多线程同步。

1 什么是多线程同步

同一进程内的多个线程会共享数据,对共享数据的并发访问会出现 Race Condition,这个词的官方翻译是竞争条件,但 condition 翻译成条件令人困惑,特别是对初学者而言,它不够清晰明了,翻译软件显示 condition 有状况、状态的含义,可能翻译成竞争状况更直白。

多线程同步是指:

要掌握多线程同步,需先理解为什么需要多线程同步、哪些情况需要同步。

2 为什么需要同步

理解为什么要同步是多线程编程的关键,它甚至比掌握多线程同步机制本身更加重要。识别什么地方需要同步是编写多线程程序的难点,只有准确识别需要保护的数据、需要同步的点,再配合系统或语言提供的合适的同步机制,才能编写安全高效的多线程程序。

下面通过几个例子解释为什么需要同步。

示例1

1个长度为256的字符数组 msg 用于保存消息,函数 read_msg()write_msg() 分别用于 msg 的读和写:

  // example 1

  char msg[256] = "this is old msg";

  char* read_msg() {
      return msg;
  }

  void write_msg(char new_msg[], size_t len) {
      memcpy(msg, new_msg, std::min(len, sizeof(msg)));
  }

  void thread1() {
      char new_msg[256] = "this is new msg, it's too looooooong";
      write_msg(new_msg, sizeof(new_msg));
  }

  void thread2() {
      printf("msg=%s\n", read_msg());
  }

如果线程1调用 write_msg(),线程2调用 read_msg(),并发操作,不加保护。因为msg的长度是256字节,完成长达256字节的写入需要多个内存周期,在线程1写入新消息期间,线程2可能读到不一致的数据。即可能读到 “this is new msg”,而后半段内容 “it’s very…” 线程1还没来得及写入,它不是完整的新消息。

在这个例子中,不一致表现为数据不完整。

示例2

比如对于二叉搜索树(BST)的节点,一个结构体有3个成分:

  // example 2

  struct Node {
    struct Node *parent;
    struct Node *left_child, *right_child;
  };

这3个成分是有关联的,将节点加入 BST,要设置这3个指针域,从BST删除该节点,要修改该节点的父、左孩子节点、右孩子节点的指针域。对多个指针域的修改,不能在一个指令周期完成,如果完成了一个成分的写入,还没来得修改其他成分,就有可能被其他线程读到了,但此时节点的有些指针域还没有设置好,通过指针域去取数可能会出错。

示例3

考虑两个线程对同一个整型变量做自增,变量的初始值是0,我们预期2个线程完成自增后变量的值为2。

  // example 3

  int x = 0; // 初始值为0
  void thread1() { ++x; }
  void thread2() { ++x; }

简单的自增操作,包括三步:

两个线程并发执行 ++x,让我们看看真实情况是什么样的:

  1. 如果2个线程,先后执行自增,在时间上完成错开。无论是1先2后,或是2先1后,那么 x 的最终值是2,符合预期。但多线程并发并不能确保对一个变量的访问在时间上完全错开。
  2. 如果时间上没有完全错开,假设线程1在 core1 上执行,线程2在 core2 上执行,那么,一个可能的执行过程如下:
    • 首先,线程1把 x 读到 core1 的寄存器,线程2也把 x 的值加载到 core2 的寄存器,此时,存放在两个 core 的寄存器中 x 的副本都是0。
    • 然后,线程1完成自增,更新寄存器里 x 的值的副本(0变1),线程2也完成自增,更新寄存器里 x 的值的副本(0变1)。
    • 再然后,线程1将更新后的新值1写入变量 x 的内存位置。
    • 最后,线程2将更新后的新值1写入同一内存位置,变量 x 的最终值是1,不符合预期。

线程1和线程2在同一个 core 上交错执行,也有可能出现同样的问题,这个问题跟硬件结构无关。之所以会出现不符合预期的情况,主要是因为“ 加载+更新+保存 ”这3个步骤不能在一个内存周期内完成。多个线程对同一变量并发读写,不加同步的话会出现数据不一致。

在这个例子中,不一致表现为 x 的终值既可能为1也可能为2。

示例4

用C++类模板实现一个队列:

  // example 4

  template <typename T>
  class Queue {
      static const unsigned int CAPACITY = 100;
      T elements[CAPACITY];
      int num = 0, head = 0, tail = -1;
  public:
      // 入队
      bool push(const T& element) {
          if (num == CAPACITY) return false;
          tail = (++tail) % CAPACITY;
          elements[tail] = element;
          ++num;
          return true;
      }
      // 出队
      void pop() {
          assert(!empty());
          head = (++head) % CAPACITY;
          --num;
      }
      // 判空
      bool empty() const { 
          return num == 0; 
      }
      // 访队首
      const T& front() const {
          assert(!empty());
          return elements[head];
      }
  };

代码解释:

假设现在有一个 Queue<int> 实例 q,因为直接调用 pop() 可能 assert 失败,我们封装一个 try_pop(),代码如下:

  Queue<int> q;
  void try_pop() {
      if (!q.empty()) {
          q.pop();
      }
  }

如果多个线程调用 try_pop(),会有问题,为什么?

原因:判空+出队这2个操作,不能在一个指令周期内完成。如果线程1在判断队列非空后,线程2穿插进来,判空也为伪,这样就有可能2个线程竞争弹出唯一的元素。

多线程环境下,读变量然后基于值做进一步操作,这样的逻辑如果不加保护就会出错,这是由数据使用方式引入的问题。

示例5

再看一个简单的,简单的对 int32_t 多线程读写。

  // example 5  

  int32_t data[8] = {1,2,3,4,5,6,7,8}; 

  struct Foo {
      int32_t get() const { return x; }
      void set(int32_t x) { this->x = x; }
      int32_t x;
  } foo;

  void thread_write1() {
      for (;;) { for (auto v : data) { foo.set(v); } }
  }

  void thread_write2() {
      for (;;) { for (auto v : data) { foo.set(v); } }
  }

  void thread_read() {
      for (;;) { printf("%d", foo.get()); }
  }

2个写线程1个读线程,写线程在无限循环里用 data 里的元素值设置 foo 对象的 x 成分,读线程简单的打印 foo 对象的 x 值。程序一直跑下去,最后打印出来的数据,会出现除 data 初始化值外的数据吗?

Foo::get 的实现有问题吗?如果有问题?是什么问题?

示例6

看一个用数组实现 FIFO 队列的程序,一个线程写 put(),一个线程读 get()

  // example 6

  #include <iostream>
  #include <algorithm>

  // 用数组实现的环型队列
  class FIFO {
      static const unsigned int CAPACITY = 1024;  // 容量:需要满足是2^N

      unsigned char buffer[CAPACITY];             // 保存数据的缓冲区
      unsigned int in = 0;                        // 写入位置
      unsigned int out = 0;                       // 读取位置

      unsigned int free_space() const { return CAPACITY - in + out; }
  public:
      // 返回实际写入的数据长度(<= len),返回小于len时对应空闲空间不足
      unsigned int put(unsigned char* src, unsigned int len) {
          // 计算实际可写入数据长度(<=len)
          len = std::min(len, free_space());

          // 计算从in位置到buffer结尾有多少空闲空间
          unsigned int l = std::min(len, CAPACITY - (in & (CAPACITY - 1)));
          // 1. 把数据放入buffer的in开始的缓冲区,最多到buffer结尾
          memcpy(buffer + (in & (CAPACITY - 1)), src, l);   
          // 2. 把数据放入buffer开头(如果上一步还没有放完),len - l为0代表上一步完成数据写入
          memcpy(buffer, src + l, len - l);
          
          in += len; // 修改in位置,累加,到达uint32_max后溢出回绕
          return len;
      }

      // 返回实际读取的数据长度(<= len),返回小于len时对应buffer数据不够
      unsigned int get(unsigned char *dst, unsigned int len) {
          // 计算实际可读取的数据长度
          len = std::min(len, in - out);

          unsigned int l = std::min(len, CAPACITY - (out & (CAPACITY - 1)));
          // 1. 从out位置开始拷贝数据到dst,最多拷贝到buffer结尾
          memcpy(dst, buffer + (out & (CAPACITY - 1)), l);
          // 2. 从buffer开头继续拷贝数据(如果上一步还没拷贝完),len - l为0代表上一步完成数据获取
          memcpy(dst + l, buffer, len - l);

          out += len; // 修改out,累加,到达uint32_max后溢出回绕
          return len;
      }
  };

环型队列只是逻辑上的概念,因为采用了数组作为数据结构,所以实际物理存储上并非环型。

直觉告诉我们2个线程不加同步的并发读写,会有问题,但真有问题吗?如果有,到底有什么问题?怎么解决?

3 保护什么

多线程程序里,我们要保护的是数据而非代码,虽然 Java 等语言里有临界代码、sync方法,但最终要保护的还是代码访问的数据。

4 串行化

如果有一个线程正在访问某共享(临界)资源,那么在它结束访问之前,其他线程不能执行访问同一资源的代码(访问临界资源的代码叫临界代码),其他线程想要访问同一资源,则它必须等待,直到那个线程访问完成,它才能获得访问的机会,现实中有很多这样的例子。比如高速公路上的汽车过检查站,假设检查站只有一个车道,则无论高速路上有多少车道,过检查站的时候只能一辆车接着一辆车,从单一车道鱼贯而入。

对多线程访问共享资源施加此种约束就叫串行化。

5 原子操作和原子变量

针对前面的两个线程对同一整型变量自增的问题,如果“ load、update、store ”这3个步骤是不可分割的整体,即自增操作 ++x 满足原子性,上面的程序便不会有问题。

因为这样的话,2个线程并发执行 ++x,只会有2个结果:

除此之外,不会出现第三种情况,线程a、b孰先孰后,取决于线程调度,但不影响最终结果。

Linux 操作系统和 C/C++ 编程语言都提供了整型原子变量,原子变量的自增、自减等操作都是原子的,操作是原子性的,意味着它是一个不可细分的操作整体,原子变量的用户观察它,只能看到未完成和已完成2种状态,看不到半完成状态。

如何保证原子性是实现层面的问题,应用程序员只需要从逻辑上理解原子性,并能恰当的使用它就行了。原子变量非常适用于计数、产生序列号这样的应用场景。

6 锁

前面举了很多例子,阐述多线程不加同步并发访问数据会引起什么问题,下面讲解用锁如何做同步。

6.1 互斥锁

针对线程1 write_msg() + 线程2 read_msg() 的问题,如果能让线程1 write_msg() 的过程中,线程2不能 read_msg(),那就不会有问题。这个要求,其实就是要让多个线程互斥访问共享资源。

互斥锁就是能满足上述要求的同步机制,互斥是排他的意思,它可以确保在同一时间,只能有一个线程对那个共享资源进行访问。

互斥锁有且只有2种状态:

互斥锁提供加锁和解锁两个接口:

  DataType shared_resource;
  Mutex shared_resource_mutex;

  void shared_resource_visitor1() {
      // step1: 加锁
      shared_resource_mutex.lock();
      // step2: operate shared_resouce
      // operation1
      // step3: 解锁
      shared_resource_mutex.unlock();
  }

  void shared_resource_visitor2() {
      // step1: 加锁
      shared_resource_mutex.lock();
      // step2: operate shared_resouce
      // operation2
      // step3: 解锁
      shared_resource_mutex.unlock();
  }

shared_resource_visitor1()shared_resource_visitor2() 代表对共享资源的不同操作,多个线程可能调用同一个操作函数,也可能调用不同的操作函数。

假设线程1执行 shared_resource_visitor1(),该函数在访问数据之前,申请加锁,如果互斥锁已经被其他线程加锁,则调用该函数的线程会阻塞在加锁操作上,直到其他线程访问完数据,释放(解)锁,阻塞在加锁操作的线程1才会被唤醒,并尝试加锁:

注意:在访问资源前申请锁访问后释放锁,是一个编程契约,通过遵守契约而获得数据一致性的保障,它并非一种硬性的限制,即如果别的线程遵从三步曲,而另一个线程不遵从这种约定,代码能通过编译且程序能运行,但结果可能是错的。

6.2 读写锁

读写锁跟互斥锁类似,也是申请锁的时候,如果不能得到满足则阻塞,但读写锁跟互斥锁也有不同,读写锁有3个状态:

对应3个状态,读写锁有3个接口:加读锁,加写锁,解锁:

读写锁提升了线程的并行度,可以提升吞吐。它可以让多个读线程同时读共享资源,而写线程访问共享资源的时候,其他线程不能执行,所以,读写锁适合对共享资源访问“读大于写”的场合。读写锁也叫“共享互斥锁”,多个读线程可以并发访问同一资源,这对应共享的概念,而写线程是互斥的,写线程访问资源的时候,其他线程无论读写,都不可以进入临界代码区。

考虑一个场景:如果有线程1、2、3共享资源 x,读写锁 rwlock 保护资源,线程1读访问某资源,然后线程2以写的形式访问同一资源 x,因为 rwlock 已经被加了读锁,所以线程2被阻塞,然后过了一段时间,线程3也读访问资源 x,这时候线程3可以继续执行,因为读是共享的,然后线程1读访问完成,线程3继续访问,过了一段时间,在线程3访问完成前,线程1又申请读资源,那么它还是会获得访问权,但是写资源的线程2会一直被阻塞。

为了避免共享的读线程饿死写线程,通常读写锁的实现,会给写线程优先权,当然这处决于读写锁的实现,作为读写锁的使用方,理解它的语义和使用场景就够了。

6.3 自旋锁

自旋锁(Spinlock)的接口跟互斥量差不多,但实现原理不同。线程在 acquire 自旋锁失败的时候,它不会主动让出CPU从而进入睡眠状态,而是会忙等,它会紧凑的执行测试和设置(Test-And-Set)指令,直到 TAS 成功,否则就一直占着 CPU 做 TAS。

自旋锁对使用场景有一些期待,它期待 acquire 自旋锁成功后很快会 release 锁,线程运行临界区代码的时间很短,访问共享资源的逻辑简单,这样的话,别的 acquire 自旋锁的线程只需要忙等很短的时间就能获得自旋锁,从而避免被调度走陷入睡眠,它假设自旋的成本比调度的低,它不愿耗费时间在线程调度上(线程调度需要保存和恢复上下文需要耗费CPU)。

内核态线程很容易满足这些条件,因为运行在内核态的中断处理函数里可以通过关闭调度,从而避免 CPU 被抢占,而且有些内核态线程调用的处理函数不能睡眠,只能使用自旋锁。

而运行在用户态的应用程序,则推荐使用互斥锁等睡眠锁。因为运行在用户态应用程序,虽然很容易满足临界区代码简短,但持有锁时间依然可能很长。在分时共享的多任务系统上、当用户态线程的时间配额耗尽,或者在支持抢占式的系统上、有更高优先级的任务就绪,那么持有自旋锁的线程就会被系统调度走,这样持有锁的过程就有可能很长,而忙等自旋锁的其他线程就会白白消耗 CPU 资源,这样的话,就跟自旋锁的理念相背。

Linux 系统优化过后的 mutex 实现,在加锁的时候会先做有限次数的自旋,只有有限次自旋失败后,才会进入睡眠让出 CPU,所以,实际使用中,它的性能也足够好。此外,自旋锁必须在多 CPU 或者多 Core 架构下,试想如果只有一个核,那么它执行自旋逻辑的时候,别的线程没有办法运行,也就没有机会释放锁。

6.4 锁的粒度

合理设置锁的粒度,粒度太大会降低性能,太小会增加代码编写复杂度。

6.5 锁的范围

锁的范围要尽量小,最小化持有锁的时间。

6.6 死锁

程序出现死锁有两种典型原因:

ABBA 锁

假设程序中有2个资源 X 和 Y,分别被锁 A 和 B 保护,线程1持有锁 A 后,想要访问资源 Y,而访问资源 Y 之前需要申请锁 B,而如果线程2正持有锁 B,并想要访问资源 X,为了访问资源 X,所以线程2需要申请锁 A。线程1和线程2分别持有锁 A 和 B,并都希望申请对方持有的锁,因为线程申请对方持有的锁,得不到满足,所以便会陷入等待,也就没有机会释放自己持有的锁,对方执行流也就没有办法继续前进,导致相持不下,无限互等,进而死锁。

上述的情况似乎很明显,但如果代码量很大,有时候,这种死锁的逻辑不会这么浅显,它被复杂的调用逻辑所掩盖,但抽茧剥丝,最根本的逻辑就是上面描述的那样。这种情况叫 ABBA 锁,既某个线程持有A锁申请B锁,而另一个线程持有 B 锁申请 A 锁。这种情况可以通过 try lock 实现,尝试获取锁,如果不成功,则释放自己持有的锁,而不一根筋下去。另一种解法就是锁排序,对A/B两把锁的加锁操作,都遵从同样的顺序(比如先 A 后 B),也能避免死锁。

自死锁

对于不支持重复加锁的锁,如果线程持有某个锁,而后又再次申请锁,因为该锁已经被自己持有,再次申请锁必然得不到满足,从而导致死锁。

7 条件变量

条件变量常用于生产者消费者模式,需配合互斥量使用。

假设你要编写一个网络处理程序,I/O 线程从套接字接收字节流,反序列化后产生一个个消息(自定义协议),然后投递到一个消息队列,一组工作线程负责从消息队列取出并处理消息。这是典型的生产者-消费者模式,I/O 线程生产消息(往队列put),Work线程消费消息(从队列get),I/O 线程和 Work 线程并发访问消息队列,显然,消息队列是竞争资源,需要同步。

可以给队列配置互斥锁,put 和 get 操作前都先加锁,操作完成再解锁。代码差不多是这样的:

  void io_thread() {
      while (1) {
          Msg* msg = read_msg_from_socket();
          msg_queue_mutex.lock();
          msg_queue.put(msg);
          msg_queue_mutex.unlock();
      }
  }

  void work_thread() {
      while (1) {
          msg_queue_mutex.lock();
          Msg* msg = msg_queue.get();
          msg_queue_mutex.unlock();
          if (msg != nullptr) {
              process(msg);
          }
      }
  }

work 线程组的每个线程都忙于检查消息队列是否有消息,如果有消息就取一个出来,然后处理消息,如果没有消息就在循环里不停检查,这样的话,即使负载很轻,但 work_thread 还是会消耗大量的CPU时间。

我们当然可以在两次查询之间加入短暂的 sleep,从而让出 CPU,但是这个睡眠的时间设置为多少合适呢?设置长了的话,会出现消息到来得不到及时处理(延迟上升);设置太短了,还是无辜消耗了 CPU 资源,这种不断问询的方式在编程上叫轮询。

轮询行为逻辑上,相当于你在等一个投递到楼下小邮局的包裹,你下楼查验没有之后就上楼回房间,然后又下楼查验,你不停的上下楼查验,其实大可不必如此,何不等包裹到达以后,让门卫打电话通知你去取呢?

条件变量提供了一种类似通知 notify 的机制,它让两类线程能够在一个点交汇。条件变量能够让线程等待某个条件发生,条件本身受互斥锁保护,因此条件变量必须搭配互斥锁使用,锁保护条件,线程在改变条件前先获得锁,然后改变条件状态,再解锁,最后发出通知,等待条件的睡眠中的线程在被唤醒前,必须先获得锁,再判断条件状态,如果条件不成立,则继续转入睡眠并释放锁。

对应到上面的例子,工作线程等待的条件是消息队列有消息(非空),用条件变量改写上面的代码:

  void io_thread() {
      while (1) {
          Msg* msg = read_msg_from_socket();
          {
              std::lock_guard<std::mutex> lock(msg_queue_mutex);
              msg_queue.push_back(msg);
          }
          msg_queue_not_empty.notify_all();
      }
  }

  void work_thread() {
      while (1) {
          Msg* msg = nullptr;
          {
              std::unique_lock<std::mutex> lock(msg_queue_mutex);
              msg_queue_not_empty.wait(lock, []{ return !msg_queue.empty(); });
              msg = msg_queue.get();
          }
          process(msg);
      }
  }

std::lock_guard 是互斥量的一个RAII包装类,std::unique_lock 除了会在析构函数自动解锁外,还支持主动 unlock()

生产者在往 msg_queue 投递消息的时候,需要对 msg_queue 加锁,通知 work 线程的代码可以放在解锁之后,等待 msg_queue_not_empty 条件必须受 msg_queue_mutex 保护,wait() 的第二个参数是一个 lambda 表达式,因为会有多个 work 线程被唤醒,线程被唤醒后,会重新获得锁,检查条件,如果不成立,则再次睡眠。条件变量的使用需要非常谨慎,否则容易出现不能唤醒的情况。

C语言的条件变量、Posix 条件变量的编程接口跟 C++ 的类似,概念上是一致的,故在此不展开介绍。

8 Lock-free 和无锁数据结构

8.1 锁同步的问题

线程同步分为阻塞型同步和非阻塞型同步。

互斥量、信号、条件变量这些系统提供的机制都属于阻塞型同步,在争用资源的时候,会导致调用线程阻塞。 非阻塞型同步是指在无锁的情况下,通过某种算法和技术手段实现不用阻塞而同步。 锁是阻塞同步机制,阻塞同步机制的缺陷是可能挂起你的程序,如果持有锁的线程崩溃或者 hang 住,则锁永远得不到释放,而其他线程则将陷入无限等待;另外,它也可能导致优先级倒转等问题。所以,我们需要 Lock-free 这类非阻塞的同步机制。

8.2 什么是 Lock-free

Lock-free 没有锁同步的问题,所有线程无阻碍的执行原子指令,而不是等待。比如一个线程读 atomic 类型变量,一个线程写 atomic 变量,它们没有任何等待,硬件原子指令确保不会出现数据不一致,写入数据不会出现半完成,读取数据也不会读一半。

那到底什么是 Lock-free?有人说 Lock-free 就是不使用 mutex / semaphores 之类的无锁(Lock-Less)编程,这句话严格来说并不对。

我们先看一下 wiki 对 Lock-free 的描述:

Lock-freedom allows individual threads to starve but guarantees system-wide throughput. An algorithm is Lock-free if, when the program threads are run for a sufficiently long time, at least one of the threads makes progress (for some sensible definition of progress). All wait-free algorithms are Lock-free. In particular, if one thread is suspended, then a Lock-free algorithm guarantees that the remaining threads can still make progress. Hence, if two threads can contend for the same mutex lock or spinlock, then the algorithm is not Lock-free. (If we suspend one thread that holds the lock, then the second thread will block.)

翻译一下:

第1段给 Lock-free 下定义,第2段则是对 Lock-free 作解释:如果2个线程竞争同一个互斥锁或者自旋锁,那它就不是 Lock-free 的;因为如果暂停(Hang)持有锁的线程,那么另一个线程会被阻塞。

wiki 的这段描述很抽象,它不够直观,稍微再解释一下:Lock-free 描述的是代码逻辑的属性,不使用锁的代码,大部分具有这种属性。大家经常会混淆这 Lock-free 和无锁这2个概念。其实,Lock-free 是对代码(算法)性质的描述,是属性;而无锁是说代码如何实现,是手段。

Lock-free 的关键描述是:如果一个线程被暂停,那么其他线程应能继续前进,它需要有系统级(system-wide)的吞吐。

如图,两个线程在时间线上,至少有一个线程处于 running 状态。

我们从反面举例来看,假设我们要借助锁实现一个无锁队列,我们可以直接使用线程不安全的 std::queue + std::mutex 来做:

  template <typename T>
  class Queue {
  public:
      void push(const T& t) {
          q_mutex.lock();
          q.push(t);
          q_mutex.unlock();
      }
  private:
      std::queue<T> q;
      std::mutex q_mutex;
  };

如果有线程 A/B/C 同时执行 push() 方法,最先进入的线程 A 获得互斥锁。线程 B 和 C 因为获取不到互斥锁而陷入等待。这个时候,线程 A 如果因为某个原因(如出现异常,或者等待某个资源)而被永久挂起,那么同样执行 push() 的线程 B/C 将被永久挂起,系统整体(system-wide)没法推进,而这显然不符合 Lock-free 的要求。因此:所有基于锁(包括 spinlock )的并发实现,都不是 Lock-free 的。

因为它们都会遇到同样的问题:即如果永久暂停当前占有锁的线程/进程的执行,将会阻塞其他线程/进程的执行。而对照 Lock-free 的描述,它允许部分 process(理解为执行流)饿死但必须保证整体逻辑的持续前进,基于锁的并发显然是违背 Lock-free 要求的。

8.3 CAS loop 实现 Lock-free

Lock-free 同步主要依靠 CPU 提供的 read-modify-write 原语,著名的“比较和交换“ CAS(Compare And Swap)在 X86 机器上是通过 cmpxchg 系列指令实现的原子操作,CAS 逻辑上用代码表达是这样的:

  bool CAS(T* ptr, T expect_value, T new_value) {
    if (*ptr != expect_value) {
        return false;
    }
    *ptr = new_value;
    return true;
  }

CAS 接受3个参数:

内存地址 期望值,通常传第一个参数所指内存地址中的旧值 新值 逻辑描述:CAS 比较内存地址中的值和期望值,如果不相同就返回失败,如果相同就将新值写入内存并返回成功。

当然这个C函数描述的只是 CAS 的逻辑,这个函数操作不是原子的,因为它可以划分成几个步骤:读取内存值、判断、写入新值,各步骤之间是可以插入其他操作的。不过前面讲了,原子指令相当于把这些步骤打包,它可能是通过 lock:cmpxchg 指令实现的,但那是实现细节,程序员更应该注重在逻辑上理解它的行为。

通过 CAS 实现 Lock-free 的代码通常借助循环,代码如下:

  do {
      T expect_value = *ptr;
  } while (!CAS(ptr, expect_value, new_value));

创建共享数据的本地副本:expect_value。 根据需要修改本地副本,从 ptr 指向的共享数据里 load 后赋值给 expect_value。 检查共享的数据跟本地副本是否相等,如果相等,则把新值复制到共享数据。 第三步是关键,虽然CAS是原子的,但加载 expect_value 跟 CAS 这2个步骤,并不是原子的。所以,我们需要借助循环,如果ptr内存位置的值没有变(*ptr \== expect_value),那就存入新值返回成功;否则说明加载 expect_value 后,ptr 指向的内存位置被其他线程修改了,这时候就返回失败,重新加载 expect_value ,重试,直到成功为止。

CAS loop 支持多线程并发写,这个特点太有用了,因为多线程同步,很多时候都面临多写的问题,我们可以基于 CAS 实现 Fetch-and-add(FAA)算法,它看起来像这样:

  T faa(T& t) {
      T temp = t;
      while (!compare_and_swap(x, temp, temp + 1));
  }

第一步加载共享数据的值到 temp,第二步比较 + 存入新值,直到成功。

8.4 无锁数据结构:Lock-free Stack

无锁数据结构是通过非阻塞算法而非锁保护共享数据,非阻塞算法保证竞争共享资源的线程,不会因为互斥而让它们的执行无限期暂停;无阻塞算法是 Lock-free 的,因为无论如何调度都能确保有系统级的进度。wiki 定义如下:

A non-blocking algorithm ensures that threads competing for a shared resource do not have their execution indefinitely postponed by mutual exclusion. A non-blocking algorithm is Lock-free if there is guaranteed system-wide progress regardless of scheduling.

下面是 C++ atomic compare_exchange_weak() 实现的一个 Lock-free 堆栈(from CppReference):

  template <typename T>
  struct node {
      T data;
      node* next;
      node(const T& data) : data(data), next(nullptr) {}
  };
  
  template <typename T>
  class stack {
      std::atomic<node<T>*> head;
  public:
      void push(const T& data) {
        node<T>* new_node = new node<T>(data);
        new_node->next = head.load(std::memory_order_relaxed);
        while (!head.compare_exchange_weak(new_node->next, new_node,
                                          std::memory_order_release,
                                          std::memory_order_relaxed));
      }
  };

代码解析:

这样的行为逻辑显然符合 Lock-free 的定义,注意用 CAS+Loop 实现自旋锁不符合 Lock-free 的定义,注意区分。

9 程序序:Program Order

对单线程程序而言,代码会一行行顺序执行,就像我们编写的程序的顺序那样。比如:

  a = 1;
  b = 2;

会先执行 a = 1 再执行 b = 2,从程序角度看到的代码行依次执行叫程序序,我们在此基础上构建软件,并以此作为讨论的基础。

10 内存序:Memory Order

与程序序相对应的内存序,是指从某个角度观察到的对于内存的读和写所真正发生的顺序。内存操作顺序并不唯一,在一个包含 core0 和 core1 的 CPU 中,core0 和 core1 有着各自的内存操作顺序,这两个内存操作顺序不一定相同。从包含多个 Core 的 CPU 的视角看到的全局内存操作顺序跟单 core 视角看到的内存操作顺序亦不同,而这种不同,对于有些程序逻辑而言,是不可接受的,例如:

程序序要求 a = 1b = 2 之前执行,但内存操作顺序可能并非如此,对 a 赋值1并不确保发生在对 b 赋值2之前,这是因为:

虽然对一个 Core 而言,如上所述,这个 Core 观察到的内存操作顺序不一定符合程序序,但内存操作序和程序序必定产生相同的结果,无论在单 Core 上对 a、b 的赋值哪个先发生,结果上都是 a 被赋值为1、b 被赋值为2,如果单核上乱序执行会影响结果,那编译器的指令重排和 CPU 乱序执行便不会发生,硬件会提供这项保证。

但多核系统,硬件不提供这样的保证,多线程程序中,每个线程所工作的 Core 观察到的不同内存操作序,以及这些顺序与全局内存序的差异,常常导致多线程同步失败,所以,需要有同步机制确保内存序与程序序的一致,内存屏障(Memory Barrier)的引入,就是为了解决这个问题,它让不同的 Core 之间,以及 Core 与全局内存序达成一致。

11 乱序执行:Out-of-Order Execution

乱序执行会引起内存顺序跟程序顺序不同,乱序执行的原因是多方面的,比如编译器指令重排、超标量指令流水线、预测执行、Cache-Miss 等。内存操作顺序无法精确匹配程序顺序,这有可能带来混乱,既然有副作用,那为什么还需要乱序执行呢?答案是为了性能。

我们先看看没有乱序执行之前,早期的有序处理器(In-order Processors)是怎么处理指令的?

乱序处理器(Out-of-Order Processors)又是怎么处理指令的呢?

指令乱序执行是结果,但原因并非只有 CPU 的乱序执行,而是由两种因素导致:

12 Store Buffer

为什么需要 Store Buffer?

考虑下面的代码:

void set_a() {
    a = 1;
}

所以,我们需要 Store Buffer,每个 Core 都有独立的 Store Buffer,每个 Core 都访问私有的 Store Buffer,Store Buffer 帮助 CPU 遮掩了 Store 操作带来的延迟。

Store Buffer 会带来什么问题?

  a = 1;
  b = 2;
  assert(a == 1);

上面的代码,断言 a == 1 的时候,需要读(load)变量a的值,而如果 a 在被赋值前就在 Cache 中,就会从 Cache 中读到 a 的旧值(可能是1之外的其他值),所以断言就可能失败。但这样的结果显然是不能接受的,它违背了最直观的程序顺序性。

问题出在变量 a 除保存在内存外,还有2份拷贝:一份在 Store Buffer 里,一份在 Cache 里;如果不考虑这2份拷贝的关系,就会出现数据不一致。那怎么修复这个问题呢?

可以通过在 Core Load 数据的时候,先检查 Store Buffer 中是否有悬而未决的a的新值,如果有,则取新值;否则从 cache 取 a 的副本。这种技术在多级流水线 CPU 设计的时候就经常使用,叫 Store Forwarding。有了 Store Buffer Forwarding,就能确保单核程序的执行遵从程序顺序性,但多核还是有问题,让我们考查下面的程序:

多核内存序问题

  int a = 0; // 被CPU1 Cache
  int b = 0; // 被CPU0 Cache

  // CPU0执行
  void x() {
      a = 1;
      b = 2;
  }

  // CPU1执行
  void y() {
      while (b == 0);
      assert(a == 1);
  }

假设 a 和 b 都被初始化为0;CPU0 执行 x() 函数,CPU1 执行 y() 函数;变量 a 在 CPU1 的 local Cache里,变量 b 在 CPU0 的 local Cache 里。

怎么办?答案留到内存屏障一节揭晓。

13 Invalidate Queue

为什么需要 Invalidate Queue?

当一个变量加载到多个 core 的 Cache,则这个 Cache Line 处于 Shared 状态,如果 Core1 要修改这个变量,则需要通过发送核间消息 Invalidate 来通知其他 Core 把对应的 Cache Line 置为 Invalid,当其他 Core 都 Invalid 这个 CacheLine 后,则本 Core 获得该变量的独占权,这个时候就可以修改它了。

收到 Invalidate 消息的 core 需要回 Invalidate ACK,一个个 core 都这样 ACK,等所有 core 都回复完,Core1 才能修改它,这样 CPU 就白白浪费。

事实上,其他核在收到 Invalidate 消息后,会把I nvalidate 消息缓存到 Invalidate Queue,并立即回复 ACK,真正 Invalidate 动作可以延后再做,这样一方面因为 Core 可以快速返回别的 Core 发出的 Invalidate 请求,不会导致发生 Invalidate 请求的 Core 不必要的 Stall,另一方面也提供了进一步优化可能,比如在一个 CacheLine 里的多个变量的 Invalidate 可以攒一次做了。

但写 Store Buffer 的方式其实是 Write Invalidate,它并非立即写入内存,如果其他核此时从内存读数,则有可能不一致。

14 内存屏障

那有没有方法确保对 a 的赋值一定先于对 b 的赋值呢?有,内存屏障被用来提供这个保障。

内存屏障(Memory Barrier),也称内存栅栏、屏障指令等,是一类同步屏障指令,是 CPU 或编译器在对内存随机访问的操作中的一个同步点,同步点之前的所有读写操作都执行后,才可以开始执行此点之后的操作。语义上,内存屏障之前的所有写操作都要写入内存;内存屏障之后的读操作都可以获得同步屏障之前的写操作的结果。

内存屏障,其实就是提供一种机制,确保代码里顺序写下的多行,会按照书写的顺序,被存入内存,主要是解决 Store Buffer 引入导致的写入内存间隙的问题。

  void x() {
      a = 1;
      wmb();
      b = 2;
  }

像上面那样在 a = 1b = 2 之间插入一条内存屏障语句,就能确保 a = 1 先于 b = 2 生效,从而解决了内存乱序访问问题,那插入的这句 smp_mb(),到底会干什么呢?

回忆前面的流程,CPU0 在执行完 a = 1 之后,执行 smp_mb() 操作,这时候,它会给 Store Buffer 里的所有数据项做一个标记(marked),然后继续执行 b = 2,但这时候虽然 b 在自己的 cache 里,但由于 store buffer 里有 marked 条目,所以,CPU0 不会修改 cache 中的 b,而是把它写入 Store Buffer;所以 CPU0 收到 Read 消息后,会把 b 的0值发给 CPU1,所以继续在 while (b) 自旋。

简而言之,Core 执行到 write memory barrier(wmb)的时候,如果 Store Buffer 还有悬而未决的 store 操作,则都会被 mark 上,直到被标注的 Store 操作进入内存后,后续的 Store 操作才能被执行,因此 wmb 保障了 barrier 前后操作的顺序,它不关心 barrier 前的多个操作的内存序,以及 barrier 后的多个操作的内存序,是否与 Global Memory Order 一致。

  a = 1;
  b = 2;
  wmb();
  c = 3;
  d = 4;

wmb() 保证“a = 1;b = 2”发生在“c = 3;d = 4”之前,不保证 a = 1b = 2 的内存序,也不保证 c = 3d = 4 的内部序。

Invalidate Queue 的引入的问题

就像引入 Store Buffer 会影响 Store 的内存一致性,Invalidate Queue 的引入会影响 Load 的内存一致性。因为 Invalidate queue 会缓存其他核发过来的消息,比如 Invalidate 某个数据的消息被 delay 处置,导致 core 在 Cache Line 中命中这个数据,而这个 Cache Line 本应该被 Invalidate 消息标记无效。如何解决这个问题呢?

一种思路是硬件确保每次load数据的时候,需要确保 Invalidate Queue 被清空,这样可以保证 load 操作的强顺序

软件的思路,就是仿照 wmb() 的定义,加入 rmb() 约束。rmb() 给我们的 invalidate queue 加上标记。当一个 load 操作发生的时候,之前的 rmb() 所有标记的 invalidate 命令必须全部执行完成,然后才可以让随后的 load 发生。这样,我们就在 rmb() 前后保证了 load 观察到的顺序等同于 global memory order

所以,我们可以像下面这样修改代码:

  a = 1;
  wmb();
  b = 2;
  while(b != 2) {};
  rmb();
  assert(a == 1);

系统对内存屏障的支持

gcc 编译器在遇到内嵌汇编语句 asm volatile(“” ::: “memory”),将以此作为一条内存屏障,重排序内存操作,即此语句之前的各种编译优化将不会持续到此语句之后。

Linux 内核提供函数 barrier() 用于让编译器保证其之前的内存访问先于其之后的完成。

  #define barrier() __asm__ __volatile__("" ::: "memory")

CPU内存屏障:

小结

伪共享

多个线程同时读写同一个 Cache Line 中的变量、导致 CPU Cache 频繁失效,从而使得程序性能下降的现象称为伪共享(False Sharing)。

  const size_t shm_size = 16*1024*1024; //16M
  static char shm[shm_size];
  std::atomic<size_t> shm_offset{0};

  void f() {
      for (;;) {
          auto off = shm_offset.fetch_add(sizeof(long));
          if (off >= shm_size) break;
          *(long*)(shm + off) = off; // 赋值
      }
  }

考察上面的程序:shm 是一块 16M 字节的内存,我测试的机器的 L3 Cache 是 32M,16M 字节能确保 shm 在 Cache 里放得下。f() 函数的循环里,视 shmlong 类型的数组,依次给每个元素赋值,shm_offset 用于记录偏移位置,shm_offset.fetch_add(sizeof(long)) 原子性的增加 shm_offset 的值(因为x86_64系统上 long 的长度为8,所以 shm_offset 每次增加8),并返回增加前的值,对 shmlong 数组的每个元素赋值后,结束循环从函数返回。

因为 shm_offsetatomic 类型变量,所以多线程调用 f() 依然能正常工作,虽然多个线程会竞争 shm_offset,但每个线程会排他性的对各 long 元素赋值,多线程并行会加快对 shm 的赋值操作。我们加上多线程调用代码:

  std::atomic<size_t> step{0};

  const int THREAD_NUM = 2;

  void work_thread() {
      const int LOOP_N = 10;
      for (int n = 1; n <= LOOP_N; ++n) {
          f();
          ++step;
          while (step.load() < n * THREAD_NUM) {}
          shm_offset = 0;
      }
  }

  int main() {
      std::thread threads[THREAD_NUM];
      for (int i = 0; i < THREAD_NUM; ++i) {
          threads[i] = std::move(std::thread(work_thread));
      }
      for (int i = 0; i < THREAD_NUM; ++i) {
          threads[i].join();
      }
      return 0;
  }

main 函数里启动2个工作线程 work_thread。 工作线程对 shm 共计赋值10轮,后面的每一轮会访问 Cache 里的 shm 数据,step 用于 work_thread 之间每一轮的同步。 工作线程调用完 f() 后会增加 step,等2个工作线程都调用完之后,step 的值增加到 n * THREAD_NUM 后,while() 会结束循环,重置 shm_offset,重新开始新一轮对 shm 的赋值。

如图所示:

编译后执行上面的程序,产生如下的结果:

  time ./a.out

  real 0m3.406s
  user 0m6.740s
  sys 0m0.040s

time 命令用于时间测量,a.out 程序运行完成后会打印耗时,real 列显式耗时3.4秒。

1. 改进版 f_fast

我们稍微修改一下 f 函数,改进版 f 函数取名 f_fast

  void f_fast() {
      for (;;) {
          const long inner_loop = 16;
          auto off = shm_offset.fetch_add(sizeof(long) * inner_loop);
          for (long j = 0; j < inner_loop; ++j) {
              if (off >= shm_size) return;
              *(long*)(shm + off) = j;
              off += sizeof(long);
          }
      }
  }

for 循环里,shm_offset 不再是每次增加8字节(sizeof(long)),而是 8*16=128 字节,然后在内层的循环里,依次对16个 long 连续元素赋值,然后下一轮循环又再次增加128字节,直到完成对 shm 的赋值。如图所示:

编译后重新执行程序,结果显示耗时降低到0.06秒,对比前一种耗时3.4秒,f_fast 性能提升明显。

  time ./a.out

  real 0m0.062s
  user 0m0.110s
  sys 0m0.012s

f f_fast 的行为差异

shm 数组总共有 2M 个 long 元素,因为 16M / sizeof(long) 得 2M:

1、f() 函数行为逻辑

2、f_fast() 函数行为逻辑

2 为什么 f_fast 更快

第一眼感觉是 f_fast()shm_offset.fetch_add() 调用频次降低到了原来的 1/16,有理由怀疑是原子变量的竞争减少导致程序执行速度加快。为了验证,让我们在内层的循环里加一个原子变量 testfetch_addtest 原子变量的竞争会像 f() 函数里 shm_offset.fetch_add() 一样激烈,修改后的 f_fast 代码变成下面这样:

  void f_fast() {
      for (;;) {
          const long inner_loop = 16;
          auto off = shm_offset.fetch_add(sizeof(long) * inner_loop);
          for (long j = 0; j < inner_loop; ++j) {
              test.fetch_add(1);
              if (off >= shm_size) return;
              *(long*)(shm + off) = j;
              off += sizeof(long);
          }
      }
  }

为了避免 test.fetch_add(1) 的调用被编译器优化掉,我们在 main 函数的最后把 test 的值打印出来。编译后测试一下,结果显示:执行时间只是稍微增加到 real 0m0.326s,很显然,并不是 atomic 的调用频次减少导致性能飙升。

重新审视 f() 循环里的逻辑:f() 循环里的操作很简单:原子增加、判断、赋值。我们把 f() 的里赋值注释掉,再测试一下,发现它的速度得到了很大提升,看来是 (long)(shm + off) = off 这一行代码执行慢,但这明明只是一行赋值。我们把它反汇编来看,它只是一个 mov 指令,源操作数是寄存器,目标操作数是内存地址,从寄存器拷贝数据到一个内存地址,为什么会这么慢呢?

3 原因

现在揭晓答案:导致 f() 性能底下的元凶是伪共享(false sharing)。那什么是伪共享?要说清这个问题,还得联系 CPU 的架构以及 CPU 怎么访问数据,回顾一下关于多核 Cache 结构。

背景知识

现代 CPU 可以有多个核,每个核有自己的 L1-L2 缓存,L1又区分数据缓存(L1-DCache)和指令缓存(L1-ICache),L2 不区分数据和指令 Cache,而 L3 是跨核共享的,L3 通过内存总线连接到内存,内存被所有 CPU 所有 Core 共享。

CPU 访问 L1 Cache 的速度大约是访问内存的100倍,Cache 作为 CPU 与内存之间的缓存,减少对内存的访问频率。

从内存加载数据到 Cache 的时候,是以 Cache Line 为长度单位的,Cache Line 的长度通常是64字节,所以,那怕只读一个字节,但是包含该字节的整个 Cache Line 都会被加载到缓存,同样,如果修改一个字节,那么最终也会导致整个 Cache Line 被冲刷到内存。

如果一块内存数据被多个线程访问,假设多个线程在多个 Core 上并行执行,那么它便会被加载到多个 Core 的的 Local Cache 中;这些线程在哪个 Core 上运行,就会被加载到哪个 Core 的 Local Cache 中,所以,内存中的一个数据,在不同 Core 的 Cache 里会同时存在多份拷贝。

那么,便会存在缓存一致性问题。当一个 Core 修改其缓存中的值时,其他 Core 不能再使用旧值。该内存位置将在所有缓存中失效。此外,由于缓存以缓存行而不是单个字节的粒度运行,因此整个缓存行将在所有缓存中失效。如果我们修改了 Core1 缓存里的某个数据,则该数据所在的 Cache Line 的状态需要同步给其他 Core 的缓存,Core 之间可以通过核间消息同步状态,比如通过发送 Invalidate 消息给其他核,接收到该消息的核会把对应 Cache Line 置为无效,然后重新从内存里加载最新数据。

当然,被加载到多个 Core 缓存中的同一 Cache Line,会被标记为共享(Shared)状态,对共享状态的缓存行进行修改,需要先获取缓存行的修改权(独占),MESI 协议用来保证多核缓存的一致性,更多的细节可以参考 MESI 的文章。

示例分析

假设线程1运行在 Core1,线程2运行在 Core2。

这种交替执行模式,相当于 Core1 和 Core2 之间需要频繁的发送核间消息,收到消息的 Core 的 Cache Line 被置为无效,并重新从内存里加载数据到 Cache,每次修改后都需要把 Cache 中的数据刷入内存,这相当于废弃掉了 Cache,因为每次读写都直接跟内存打交道,Cache 的作用不复存在,这就是性能低下的原因。

这种多核多线程程序,因为并发读写同一个 Cache Line 的数据(临近位置的内存数据),导致 Cache Line 的频繁失效,内存的频繁 Load/Store,从而导致性能急剧下降的现象叫伪共享,伪共享是性能杀手。

4 另一个伪共享的例子

假设线程 x 和 y,分别修改 Data 的 a 和 b 变量,如果被频繁调用,也会出现性能低下的情况,怎么规避呢?

  struct Data {
      int a;
      int b;
  } data; // global

  void thread1() {
      data.a = 1;
  }

  void thread2() {
      data.b = 2;
  }

空间换时间

避免 Cache 伪共享导致性能下降的思路是用空间换时间,通过增加填充,让 a 和 b 两个变量分布到不同的 Cache Line,这样对 a 和 b 的修改就会作用于不同 Cache Line,就能避免 Cache 失效的问题。

  struct Data {
      int a;
      int padding[60];
      int b;
  };

在 Linux kernel 中存在 __cacheline_aligned_in_smp 宏定义用于解决 false sharing 问题。

  #ifdef CONFIG_SMP
  #define __cacheline_aligned_in_smp __cacheline_aligned
  #else
  #define __cacheline_aligned_in_smp
  #endif

  struct Data {
      int a;
      int b __cacheline_aligned_in_smp;
  };

从上面的宏定义,可以看到:

总结

pthread 接口提供的几种同步原语如下:

由于 Linux 下线程和进程本质都是 LWP,那么进程间通信使用的 IPC(管道、FIFO、消息队列、信号量)线程间也可以使用,也可以达到相同的作用。 但是由于 IPC 资源在进程退出时不会清理(因为它是系统资源),因此不建议使用。

以下是一些非锁但是也能实现线程安全或者部分线程安全的常见做法:

可以看到,上面很多做法都是采用了副本,尽量避免在 thread 中间共享数据。最快的同步就是没同步(The fastest synchronization of all is the kind that never takes place),Share nothing is best。