Linux 系统编程

第七章 进程间通信(IPC)

7.1 进程间通信概述

一、进程间通信的目的

  • 数据传输:一个进程将它的数据发送给另一个进程。
  • 共享数据:多个进程操作共享的数据,这些操作对每个进程都是实时的。
  • 通知事件:一个进程需要向另一个或一组进程发送消息,通知某种事件的发生。
  • 资源共享:多个进程在内核的锁与同步机制的基础上共享同样的资源。
  • 进程控制:有些进程希望完全控制另一个进程的运行(如 debug 调试),并能够及时知道其状态的改变。

二、进程间通信的变革

  • Linux 进程间通信由几个部分发展而来
    • 早期 UNIX 进程间通信
    • 基于 System V 的进程间通信
    • 基于 Socket 的进程间通信和 POSIX 进程间通信
  • UNIX 进程间通信方式包括:管道、FIFO、信号
  • System V 进程间通信方式包括:System V 消息队列、System V 信号灯、System V 共享内存
  • POSIX 进程间通信方式包括:POSIX 消息队列、POSIX 信号灯、POSIX 共享内存
  • 现代的进程间通信方式包括:管道和命名管道、信号、消息队列、共享内存、进程信号量、套接字

7.2 管道

一、基本概念

  • 管道是针对本地计算机的两个进程间通信而设计的通信方法,其实质是内核中的一片缓存
  • 管道建立后,实际获得两个文件描述符,一个用于读取,另一个用于写入。
  • 管道是最常见的 IPC 手段,通过pipe系统调用实现。
  • 管道是单工的,数据只能向一个方向流动,需要双向通信时,需要建立两个管道
  • 一个进程向管道中写入的内容被另一端的进程读出,写入的内容每次都添加在管道末尾,并且每次都从管道的头部读出。

二、管道的分类

匿名管道
  • 关系进程间通信(父子进程、兄弟进程)
  • 通过pipe系统调用创建,一般由父进程建立
  • 管道位于内核空间
命名管道
  • 可以在两个没有任何关系的进程间通信,本质是内核中的缓存,另在文件系统中以一个特殊的设备文件(管道文件)存在。两者是同步的。
  • 通过mkfifo系统调用创建。

三、匿名管道

1
2
3
4
5
6
7
8
9
#include <unistd.h>
int pipe(int fd[2]);
/*
返回值:成功返回 0,出错返回 -1
功能:创建匿名管道

- fd[0]: pipe 的读端
- fd[1]: pipe 的写端
*/

示例:利用管道,父进程写两个数据,子进程读两个数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>

int main()
{
int fd[2];
pipe(fd); // 创建匿名管道,fd[0] 为读端,fd[1] 为写端

pid_t pid;
if ((pid = fork()) < 0) {
perror("fork error");
exit(1);
} else if (pid > 0) {
close(fd[0]); // 父进程写数据,关闭读端
int start = 1, end = 100;
if (write(fd[1], &start, sizeof(int)) < 0) {
perror("write error");
}
if (write(fd[1], &end, sizeof(int)) < 0) {
perror("write error");
}
close(fd[1]); // 写完后关闭写端
wait(NULL); // 等待子进程结束
} else {
close(fd[1]); // 子进程读数据,关闭写端
int start, end;
if (read(fd[0], &start, sizeof(int)) < 0) {
perror("read error");
}
if (read(fd[0], &end, sizeof(int)) < 0) {
perror("read error");
}
close(fd[0]); // 读完后关闭读端
printf("read success. start: %d, end: %d\n", start, end);
}

return 0;
}
案例

​ 实现管道命令cat /etc/passwd | grep root|左侧命令的输出作为右侧命令的输入。程序实现思路图示

截屏2023-01-04 15.47.19

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

char *const cmd1[3] = {"cat", "/etc/passwd",NULL};
char *const cmd2[3] = {"grep", "root", NULL};

int main()
{
int fd[2];
pipe(fd); // 父进程创建管道

pid_t pid;
for (int i = 0; i < 2; i++) {
if ((pid = fork()) < 0) {
perror("fork error");
exit(1);
} else if (pid == 0) {
if (i == 0) { // 第一个子进程,执行 cat 并写入管道
close(fd[0]); // 关闭读端
// 将标准输出重定向到管道写端
if (dup2(fd[1], STDOUT_FILENO) != STDOUT_FILENO) {
perror("dup2 error");
exit(1);
}
// 随后关闭管道写端(由于成功 exec 后不会返回,所以要在 exec 前关闭)
close(fd[1]);
// 执行 cat 命令
if (execvp(cmd1[0], cmd1) < 0) {
perror("execvp error");
exit(1);
}
}
if (i == 1) { // 第二个子进程,从管道读出内容并执行 grep 命令
close(fd[1]); // 关闭写端
// 将标准输入重定向到管道读端
if (dup2(fd[0], STDIN_FILENO) != STDIN_FILENO) {
perror("dup2 error");
exit(1);
}
// 随后关闭管道读端 (由于成功 exec 后不会返回,所以要在 exec 前关闭)
close(fd[0]);
// 执行 grep 命令
if (execvp(cmd2[0], cmd2) < 0) {
perror("execvp error");
exit(1);
}
}
} else {
if (i == 1) { // 如果两个子进程都创建好了
// 关闭资源
close(fd[0]);
close(fd[1]);
// 等待子进程结束
wait(NULL);
wait(NULL);
}
}
}

return 0;
}
管道的读写特性
  • 管道是阻塞性的,当进程从管道中读取数据但没有数据时,会阻塞。

  • 一个进程可以往管道中不断写入数据,只要该管道未满。如果满了再写,则会报错。

  • 不完整管道(broken pipe)

    • 读一个写端被关闭的管道,在所有数据被读取后,read会返回 0 表示读到尾部

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      #include <stdio.h>
      #include <unistd.h>
      #include <string.h>
      #include <stdlib.h>

      int main()
      {
      int fd[2];
      if (pipe(fd) < 0) {
      perror("pipe error");
      exit(1);
      }

      pid_t pid;
      if ((pid = fork()) < 0) {
      perror("fork error");
      exit(1);
      } else if (pid == 0) {
      close(fd[0]);
      char *s = "abc123";
      if (write(fd[1], s, strlen(s)) != strlen(s)) { // 子进程写入数据
      perror("write error");
      exit(1);
      }
      close(fd[1]);
      } else {
      wait(NULL); // 父进程等子进程结束后再运行 保证管道写端已经关闭
      close(fd[1]);
      while (1) {
      char c;
      if (read(fd[0], &c, sizeof(char)) == 0) {
      printf("\nend of pipe\n");
      break;
      } else {
      printf("%c", c);
      }
      }
      close(fd[0]);
      }

      return 0;
      }
    • 写一个读端被关闭的管道,会产生信号 SIGPIPE。如果忽略该信号或者捕捉该信号并从信号处理函数返回,则write会返回 -1,且errno被设置为EPIPE

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      #include <stdio.h>
      #include <unistd.h>
      #include <string.h>
      #include <stdlib.h>
      #include <errno.h>
      #include <signal.h>

      void sig_handler(int sig)
      {
      if (sig == SIGPIPE) {
      printf("SIGPIPE occurred!\n");
      }
      }

      int main()
      {
      int fd[2];
      if (pipe(fd) < 0) {
      perror("pipe error");
      exit(1);
      }
      // 登记信号 SIGPIPE
      if (signal(SIGPIPE, sig_handler) == SIG_ERR) {
      perror("signal SIGPIPE error");
      exit(1);
      }
      // 关闭管道读端
      close(fd[0]);
      // 然后写入数据
      char *s = "hello world";
      if (write(fd[1], s, strlen(s)) < 0) {
      fprintf(stderr, "write error. %s(%s)\n", strerror(errno),
      (errno == EPIPE ? "EPIPE" : "unknown"));
      // 打印对 errno 的解析内容,并判断 errno 是否为 EPIPE
      }

      return 0;
      }
标准库中的管道操作
1
2
3
4
5
#include <stdio.h>
FILE *popen(const char *cmdstring, const char *type);
// 创建管道,成功返回文件指针,出错返回 NULL
int pclose(FILE *fp);
// 使用 popen 创建的管道必须用此函数关闭,出错返回 -1

截屏2023-01-05 17.06.49

举例应用:两种模式调用popen执行命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int main()
{
FILE *fp = popen("cat /etc/passwd | grep root", "r");
// 执行该命令,命令的输出将存放到 fp 的缓存中
char buf[512] = {0};
while (fgets(buf, 512, fp) != NULL) {
printf("%s", buf);
}
pclose(fp);

printf("-------------------------\n");

fp = popen("wc -l", "w");
// 此命令统计行数,其输入来源于 fp 的缓存
// 写 fp 的缓存
fprintf(fp, "line 1\nline 2\nline 3\nline 4\n");
fprintf(fp, "abc\n123\n\nhello\n");
pclose(fp);

return 0;
}

四、命名管道

1
2
3
4
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);
//成功返回 0,出错返回 -1
  • 只要对 FIFO 有适当的访问权限,FIFO 可用在任何两个进程间通信。
  • FIFO 本质是内核中的一块缓存,另在文件系统中以一个特殊的设备文件(管道文件)存在。文件系统中,只有一个索引块存放文件的路径,没有数据块,所有数据存放在内核中。
  • 命名管道读写必须同时打开,否则单独读或写会阻塞。
  • 命令mkfifo pathname可以直接创建命名管道。
  • 对 FIFO 的操作和普通文件一样。
  • FIFO 的出错信息主要包括:EACCES(无存取权限)、 EEXIST(指定文件不存在)、 ENAMETOOLONG(路径名太长)、 ENOENT(包含的目录不存在)、 ENOSPC(文件系统剩余空间不足)、 ENOTDIR(文件路径无效)、 EROFS(指定的文件位于只读文件系统中)。
案例

​ 命名管道的读写。通过两个程序分别实现命名管道的读和写,命名管道通过mkfifo命令创建,路径通过命令行传递给程序,观察两个程序执行顺序是否会造成区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// fifo_w.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>

int main(int argc, char *argv[])
{
if (argc < 2) {
fprintf(stderr, "usage: %s fifopath\n", argv[0]);
exit(1);
}

printf("writing...\n");

int fd;
if ((fd = open(argv[1], O_WRONLY)) < 0) {
perror("open error");
exit(1);
}
char *s = "hello fifo";
if (write(fd, s, strlen(s)) != strlen(s)) {
perror("write error");
exit(1);
}

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// fifo_r.c
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>

int main(int argc, char *argv[])
{
if (argc < 2) {
fprintf(stderr, "usage: %s fifopath\n", argv[0]);
exit(1);
}

printf("reading...\n");

int fd;
if ((fd = open(argv[1], O_RDONLY)) < 0) {
perror("open error");
exit(1);
}
char buf[512] = {0};
if (read(fd, buf, 512) < 0) {
perror("read error");
exit(1);
} else {
printf("%s\n", buf);
}

return 0;
}
两类管道的特性比较
  • 相同点

    • 默认都是阻塞型读写
    • 都适用于 socket 通信
    • 对于完整管道(读端写端都开启)
  • 不同点

    • 创建和打开方式不同

    • 设置非阻塞型读写时,匿名管道通过fcntl系统调用设置O_NONBLOCK,命名管道通过openfcntl设置O_NONBLOCK

  • 对于完整管道

操作 阻塞型 非阻塞型
单纯读 要么阻塞,要么读到数据 直接报错
单纯写 写到管道满时会出错 写到管道满时会出错
  • 对于不完整管道(有一端关闭)
操作 阻塞型 非阻塞型
单纯读 读完后read返回 0 表示读到末尾 直接报错
单纯写 产生 SIGPIPE,若程序未终止,则write返回 -1 且errnoEPIPE 同阻塞型

7.3 消息队列

System V IPC 概述

  • 基本信息

    • System V 引入三种高级进程间通信机制,分别对应三类 IPC 对象:消息队列、共享内存、信号量
    • 这三类 IPC 对象存在于内核而非文件系统,由用户控制和管理,不像管道那样由内核控制。特别要注意的是,当若干个进程创建并使用了一个 IPC 对象,如果不主动销毁,它将持续存在于内核空间,除非重启。
    • IPC 对象通过其标识符来引用和访问,所有 IPC 对象在内核空间中有唯一的标识 ID,在用户空间中有唯一的 key(非负整数)
    • Linux IPC 继承自 System V IPC。
  • System V IPC 对象的访问

    • IPC 对象是全局对象,可用ipcsipcrm等命令查看或删除

      1
      2
      3
      ipcs -q    # 查看消息队列信息
      ipcs -m # 查看共享内存信息
      ipcs -s # 查看信号量信息
    • 每个 IPC 对象都通过对应的get系统调用创建

  • IPC 对象信息的数据结构

    1
    2
    3
    4
    5
    6
    7
    8
    struct ipc_perm {
    uid_t uid; // owner's effective user id
    git_t gid; // owner's effective group id
    uid_t cuid; // creator's effective user id
    gid_t cgid; // creator's effective gruop id
    mode_t mode; // access mode
    // ...
    };

一、消息队列概述

  • 消息队列是内核中的一个链表,一个消息就是链表的一个节点。

  • 用户进程将数据传输到内核后,内核另外添加一些包括用户 ID、组 ID、进程 ID、优先级在内的信息后,打成的一个数据包称为消息。

  • 允许一个或多个进程往消息队列中写入或读取消息,但是一个消息只能被一个进程读取,读取完毕后自动删除

  • 消息队列具有一定 FIFO 特性,也可以按照一些特定的方式读取。

  • 消息队列的实现包括打开与创建、发送消息、读取消息、控制消息队列(获取或修改属性、销毁)。

  • 消息队列信息的数据结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    struct msqid_ds {
    struct ipc_perm msg_perm;
    msgqnum_t msg_qnum; // the number of messages on queue
    msglen_t msg_qbytes; // the max number of bytes on queue
    pid_t msg_lspid; // the pid of the last msgsnd()
    pid_t msg_lrpid; // the pid of the last msgrcv()
    time_t msg_stime; // the last msgsnd() time
    time_t msg_ctime; // last change time
    // ...
    }

二、消息队列的相关操作

  • 打开或创建消息队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    #include <sys/msg.h>
    int msgget(key_t key, int flag);
    /*
    成功返回内核中消息队列的标识 ID,出错返回 -1

    参数 key 是用户指定的键值,为一非负整数。
    - 用户任意指定一个数,只要保证不会和先前存在的 key 冲突即可
    - 设置为 IPC_PRIVATE(0),但是 key 为 0 的时候这个队列是不能打开的
    - 通过 ftok(const char *path, int id) 获得,即传入一个文件路径和一个数,通过某些算法
    自动生成一个 key,有一定冲突的可能

    参数 flag 一般为 IPC_CREAT | IPC_EXCL | {访问权限,如 0777}
    - 这种情况下,如果不存在则创建,如果存在将返回 -1
    - 如果只是要打开,flag 只需要访问权限即可
    */
  • 控制消息队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #include <sys/msg.h>
    int msgctl(int msgid, int cmd, struct msqid_ds *buf);
    /*
    成功返回 0,出错返回 -1

    参数 msgid 为消息队列 ID

    参数 cmd 为操作的类型
    - IPC_STAT 获取消息队列属性,此时 buf 为传出型参数
    - IPC_SET 设置消息队列属性,此时 buf 为传入型参数
    - IPC_RMID 删除消息队列队列,此时 buf 设置为 NULL
    */
  • 发送消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    #include <sys/msg.h>
    int msgsnd(int msgid, const void *ptr, size_t nbytes, int flag);
    /*
    成功返回 0,出错返回 -1

    参数 msgid 为消息队列 ID

    参数 ptr 为指向消息的指针,消息是一个自定义的结构体:
    struct mymsg {
    long mtype; // 必须有此成员,指定消息的类型,为一非负整数

    // 以下为自定义消息的内容,例如
    char text[512];
    int i;
    double d;
    ...
    };

    参数 nbytes 为消息内容的大小,不包括 mtype,故应设置为
    sizeof(struct mymsg) - sizeof(long)

    参数 flag 用于设置阻塞类型
    - 0 表示阻塞。若消息队列已满(消息总数达到系统限制值/队列总字节数达到系统限制值)
    则进程阻塞直至 1)有空间可以容纳要发送的消息; 2)此队列被删除; 3)捕捉到一个信
    号并从处理函数返回
    - IPC_NOWAIT 表示非阻塞,此参数会使得 msgsnd 在无法发送消息时直接出错并返回,
    errno 设置为 EAGAIN
    */
  • 读取消息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    #include <sys/msg.h>
    ssize_t msgrcv(int msgqid, void *ptr, size_t nbytes, long type, int flag);
    /*
    成功返回消息的数据部分的字节数,出错返回 -1

    参数 msgqid ptr nbytes flag 含义同上

    参数 type 为指定消息的类型
    (由于消息队列具有一定的 FIFO 特性,为避免混淆,将“第一条消息”称为“最早发送的消息”)
    - type = 0 表示获取消息队列的最早发送的消息
    - type > 0 表示获取队列中类型为 type 的最早发送的消息
    - type < 0 表示获取队列中类型小于或等于 type 绝对值中最小的那个类型的最早发送
    的消息
    */

三、综合测试:使用消息队列进行进程间通信,并考察消息接收的不同特性

1
2
3
4
5
6
7
8
9
10
11
12
13
// mymsg.h

#ifndef _MYMSG_H_
#define _MYMSG_H_

typedef struct {
long mtype; // 必须要有的 long 类型成员
char str[512]; // 自定义内容 1
int i; // 自定义内容 2
double d; // 自定义内容 3
} message;

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// msgq_w.c
#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>
#include <string.h>
#include "mymsg.h"

int main(int argc, char *argv[])
{
if (argc < 2) {
fprintf(stderr, "usage: %s key\n", argv[0]); // 命令行传入用户指定的 key
exit(1);
}

key_t key = atoi(argv[1]);
int id;
if ((id = msgget(key, IPC_CREAT | IPC_EXCL | 0777)) < 0) {
// 创建消息队列
perror("msgget error");
exit(1);
}
// 设置一些消息
message m1 = {5l, "first", 1, 3.14};
message m2 = {2l, "second", 2, 6.28};
message m3 = {1l, "third", 3, 9.42};
message m4 = {7l, "forth", 4, 12.56};
message m5 = {7l, "fifth", 5, 15.70};
message m6 = {8l, "sixth", 6, 18.84};

// 发送消息
if (msgsnd(id, &m1, sizeof(message) - sizeof(long), IPC_NOWAIT) < 0) {
perror("msgsnd m1 error");
}
if (msgsnd(id, &m2, sizeof(message) - sizeof(long), IPC_NOWAIT) < 0) {
perror("msgsnd m2 error");
}
if (msgsnd(id, &m3, sizeof(message) - sizeof(long), IPC_NOWAIT) < 0) {
perror("msgsnd m3 error");
}
if (msgsnd(id, &m4, sizeof(message) - sizeof(long), IPC_NOWAIT) < 0) {
perror("msgsnd m4 error");
}
if (msgsnd(id, &m5, sizeof(message) - sizeof(long), IPC_NOWAIT) < 0) {
perror("msgsnd m5 error");
}
if (msgsnd(id, &m6, sizeof(message) - sizeof(long), IPC_NOWAIT) < 0) {
perror("msgsnd m6 error");
}

// 获取队列中的消息条数
struct msqid_ds buf;
if (msgctl(id, IPC_STAT, &buf) < 0) {
perror("msgctl error"); exit(1);
} else {
printf("Now the message queue %d contains %lu messages\n",
id, buf.msg_qnum);
}

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// msgq_r.c

#include <stdio.h>
#include <stdlib.h>
#include <sys/msg.h>
#include "mymsg.h"

int main(int argc, char *argv[])
{
if (argc < 3) {
// 命令行传入消息队列的 key 和指定的消息类型 type
fprintf(stderr, "usage: %s key type\n", argv[0]);
exit(1);
}
key_t key = atoi(argv[1]);
long type = (long)atoi(argv[2]);

int id;
if ((id = msgget(key, 0777)) < 0) {
// 打开消息队列,flag 参数只需要传入操作权限
perror("msgget error");
exit(1);
}

message m;
ssize_t size;
if ((size = msgrcv(id, &m, sizeof(message) - sizeof(long),
type, IPC_NOWAIT)) < 0) {
perror("msgrcv error");
exit(1);
} else {
printf("receive %ld bytes from msgq %d:\n", size, id);
printf("(type = %ld)str: %s, i: %d, d: %.2f\n",
type, m.str, m.i, m.d);
}

return 0;
}

测试内容

  • 随意指定一个正整数key1,运行./msgq_w key1,使用ipcs -q观察是否被创建。
  • 再次运行./msgq_w key1,观察报错信息。
  • 任取另一个正整数key2,运行./msgq_w key2,使用ipcs -q观察是否被创建。
  • 使用ipcrm删除key2对应的消息队列。
  • 运行./msgq_r key1 type,观察消息队列的读特性
    • type不存在(如 100),观察报错信息
    • type为 2,直接指定类型为 2 的消息 “second”
    • type为 0,获取队列中的第一条消息 “first”
    • type为 7,获取类型为 7 的第一条消息 “forth”
    • type为 -8,获取类型小于等于 8 的消息中类型最小且最早发送的消息 “third”

7.4 共享内存

一、共享内存概述

  • 共享内存是被多个进程共享的一部分物理内存

  • 多个进程都可以把共享内存映射到自己的虚拟内存空间,通过映射的虚拟内存地址进行操作,从而达到进程间通信的目的。

  • 共享内存是进程间共享数据的最快的方法,是效率最高的 IPC 机制

  • 共享内存本身不提供同步机制。

  • 共享内存信息的数据结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    struct shmid_ds {
    struct ipc_perm shm_perm;
    size_t shm_segsz; // size of segment in bytes
    pid_t shm_lpid; // the pid the last shmop()
    pid_t shm_cpid; // the pid of its creator
    shmatt_t shm_nattch; // the number of current attaches
    time_t shm_atime; // last-attach time
    time_t shm_dtime; // last-detach time
    time_t shm_ctime; // last-change time
    // ...
    }

二、共享内存的操作

  • 创建共享内存

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    #include <sys/shm.h>
    int shmget(key_t key, size_t size, int shmflg);
    /*
    成功返回内核中的标识 ID,出错返回 -1

    参数 size 指定共享内存的大小(字节)

    参数 shmflg 一般为 IPC_CREAT IPC_EXCL 0660 等权限组合

    可能产生的 errno
    - EINVAL 无效的内存段大小
    - EEXIST 内存段已经存在,无法创建
    - EIDRM 内存段已经被删除
    - ENOENT 内存段不存在
    - EACCES 无权限访问
    - ENOMEM 没有足够的内存来创建内存段
    */
  • 控制共享内存

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #include <sys/shm.h>
    int shmctl(int shmid, int cmd, struct shmid_ds *buf);
    /*
    成功返回 0,出错返回 -1

    参数 cmd 指定操作类型
    - IPC_STAT 获取共享内存属性
    - IPC_SET 设置共享内存属性
    - IPC_RMID 删除共享内存
    - SHM_LOCK 锁定共享内存,即该物理内存不和外存进行换入换出操作
    - SHM_UNLOCK 解除上述锁定
    */
  • 将共享内存映射到虚存和解除映射

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    #include <sys/shm.h>
    void* shmat(int shmid, char *shmaddr, int shmflg);
    /*
    成功返回虚拟内存地址,出错返回 -1

    参数 shmaddr 为所欲映射的虚存地址,一般设置为 0,此时由操作系统分配

    参数 shmflg 指定特殊要求,若 shmaddr 为 0,则此参数也应设置为 0
    - SHM_RND 随机分配
    - SHM_LBA 地址为 2 的乘方
    - SHM_RDONLY 以只读方式连接

    可能产生的 errno
    - EINVAL 无效的 ID 值或无效的地址
    - ENOMEM 没有足够的内存
    - EACCES 无存取权限

    注意:子进程不继承父进程创建的共享内存。但是如果在 fork 之前进行映射,子进程也将得到
    父进程映射的地址。
    */

    int shmdt(char *shmaddr); // 解除映射,成功返回 0,出错返回 -1

三、案例:父进程创建共享内存并写入数据,子进程计算,并利用管道同步

1
2
3
4
5
6
7
8
9
10
11
12
// pipe.h
#ifndef _PIPE_H_
#define _PIPE_H_

/* 利用管道实现同步 */

extern void init_pipe();
extern void pipe_wait();
extern void pipe_notify();
extern void destroy_pipe();

#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include "pipe.h"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>

static int fd[2];

void init_pipe() // 初始化管道
{
if (pipe(fd) < 0) {
perror("init_pipe error");
exit(1);
}
}

void pipe_wait() // 利用管道的阻塞特性,通过调用 read 实现阻塞
{
char c;
if (read(fd[0], &c, 1) < 0) {
perror("pipe_wait error");
exit(1);
}
}
void pipe_notify() // 利用管道的阻塞特性,随意写一个字符解除阻塞
{
char c = 'c';
if (write(fd[1], &c, 1) < 0) {
perror("pipe_notify error");
exit(1);
}
}

void destroy_pipe() // 销毁管道(关闭文件描述符)
{
close(fd[0]);
close(fd[1]);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include "pipe.h"

int main()
{
// 父进程创建共享内存,大小为 100 个 int
int id = shmget(IPC_PRIVATE, 100 * sizeof(int),
IPC_CREAT | IPC_EXCL | 0777);
if (id < 0) {
perror("shmget error");
exit(1);
}
// 父进程初始化管道
init_pipe();

pid_t pid;
if ((pid = fork()) < 0) {
perror("fork error");
exit(1);
} else if (pid > 0) {
// 父进程进行映射,这一步也可以放在 fork 之前
int *p = (int *)shmat(id, 0, 0);
// 很有可能映射失败,所以要判断。注意,p 应该和强转为 int* 的 -1 比较
if (p == (int *)-1) {
perror("shmat error");
exit(1);
}
// 父进程写入数据
for (int i = 1; i <= 100; i++) {
*(p + i - 1) = i;
}
// 父进程解除映射
shmdt(p);
// 父进程通知子进程读取
pipe_notify();
// 父进程等待子进程结束
wait(NULL);
// 父进程待子进程结束后销毁管道
destroy_pipe();
// 父进程待子进程结束后释放共享内存
if (shmctl(id, IPC_RMID, NULL) < 0) {
perror("shmct error");
exit(1);
} else {
printf("destroy shared memory success\n");
}
} else {
// 子进程先阻塞,等待父进程通知
pipe_wait();
// 子进程进行映射。如果父进程在 fork 之前已经映射,则子进程可以直接引用指针 p
int *p = (int *)shmat(id, 0, 0);
if (p == (int *)-1) {
perror("shmat error");
exit(1);
}
// 子进程读取并求和
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += *(p + i);
}
printf("sum = %d\n", sum);
// 子进程解除映射
shmdt(p);
}

return 0;
}

7.5 进程信号量

一、进程信号量概述

  • 信号量本质上就是共享资源的数目,信号量可以用于进程间的同步和互斥。

  • 每种共享资源对应一个信号量,为了便于实现大量共享资源的控制,引入信号量集。可以对信号量集中的所有信号量统一操作,也可以选取部分操作。

  • 信号量集信息的数据结构

    1
    2
    3
    4
    5
    6
    7
    struct semid_ds {
    struct ipc_perm sem_perm;
    unsigned short sem_nsems; // the number of semaphores in set
    time_t sem_otime; // the last semop() time
    time_t sem_ctime; // the last change time
    // ...
    }

二、信号量集的操作

  • 创建信号量集

    1
    2
    3
    4
    5
    6
    7
    #include <sys/sem.h>
    int semget(key_t key, int nsems, int flag);
    /*
    成功返回信号量集 ID,出错返回 -1

    参数 nsem 指定信号量集中的信号量个数,其余参数含义同前
    */
  • 控制信号量集

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    #include <sys/sem.h>
    int semctl(int semid, int semnum, int cmd, .../* union semun arg */);
    /*
    成功返回 0,出错返回 -1

    参数 semnum 指定要操作的信号量编号,编号从 0 到 nsems - 1。特别,如果是对所有信号量
    的操作,应设置 semnum 为 0,表示无效参数

    参数 cmd 指定操作类型
    - IPC_STAT 获取信号量集属性,semnum 应设为 0,属性由联合中的 buf 传出
    - IPC_SET 设置信号量集属性,semnum 应设为 0,属性由联合中的 buf 传入
    - IPC_RMID 销毁信号量集,semnum 应设为 0,第四个参数应设为 NULL
    - GETVAL 获取编号为 sennum 的信号量的值,值由联合中的 val 传出
    - SETVAL 设置编号为 semnum 的信号量的值,值由联合中的 val 传入
    - GETALL 获取所有信号量的值,semnum 应设为 0,值由联合中的 array 传出
    - SETALL 设置所有信号量的值,semnum 应设为 0,值由联合中的 array 传入

    第四个参数的联合类型应自定义
    */
    union senun {
    int val;
    struct semid_ds *buf;
    unsigned short *array;
    }
  • 信号量的 PV 操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    #include <sys/sem.h>
    int semop(int semid, struct sembuf *sops, size_t nsops);
    struct sembuf {
    unsigned short sem_num;
    short sem_op;
    short sem_flg;
    }
    /*
    返回值:成功返回 0,出错返回 -1
    功能:对信号量集中的信号量做 PV 操作,实现进程间的同步和互斥

    参数 sops 为 struct sembuf 的结构体数组,这个结构体类型无需自定义
    - 结构体中的成员 sem_num 表示要操作的信号量编号
    - sem_op 为正数表示 V 操作(加操作),为负数表示 P 操作(减操作)
    加或减的值不一定为 1
    一般地,当进行一次 P 操作后信号量的值为负,则阻塞,功能同线程信号量
    - sem_flg 可选 IPC_NOWAIT SEM_UNDO,前者为非阻塞,后者在进程没有释放共享资源就
    退出时,由内核代为释放

    参数 nsops 为 sops 数组长度,即要操作的信号量的个数
    */

三、案例一:使用信号量实现互斥(银行取款模拟)

注:相关函数忽略不合法参数的判断,如存款为负数。假设参数取值均正确。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <assert.h>
#include <sys/shm.h>
#include <sys/sem.h>

typedef struct {
int blc; // 账户余额
// -------
int semid; // 绑定一个信号量集
// -------
} Act;

void save(Act *a, int amt) // 存款
{
assert(a != NULL);
// ------------------------------------------
struct sembuf sop_p[] = {{0, -1, SEM_UNDO}};
struct sembuf sop_v[] = {{0, 1, SEM_UNDO}};
semop(a->semid, sop_p, 1); // 对编号为 0 的信号量做 -1 操作 P(1)
// ------------------------------------------
int amount = amt;
sleep(1); // 睡眠,模拟真实情况下的延迟
a->blc = amount;
// ------------------------
semop(a->semid, sop_v, 1); // 对编号为 0 的信号量做 +1 操作 V(1)
// ------------------------
}

int deposit(Act *a, int amt) // 取款
{
assert(a != NULL);
// ------------------------------------------
struct sembuf sop_p[] = {{0, -1, SEM_UNDO}};
struct sembuf sop_v[] = {{0, 1, SEM_UNDO}};
semop(a->semid, sop_p, 1); // 对编号为 0 的信号量做 -1 操作 P(1)
// ------------------------------------------
int balance = a->blc;
if (balance < amt) {
// ------------------------
semop(a->semid, sop_v, 1); // 对编号为 0 的信号量做 +1 操作 V(1)
// ------------------------
return 0;
}
balance -= amt;
sleep(1); // 睡眠,模拟真实情况下的延迟
a->blc = balance;
// ------------------------
semop(a->semid, sop_v, 1); // 对编号为 0 的信号量做 +1 操作 V(1)
// ------------------------
return amt;
}

int get_balance(Act *a) // 查看余额
{
assert(a != NULL);
// ------------------------------------------
struct sembuf sop_p[] = {{0, -1, SEM_UNDO}};
struct sembuf sop_v[] = {{0, 1, SEM_UNDO}};
semop(a->semid, sop_p, 1); // 对编号为 0 的信号量做 -1 操作 P(1)
// ------------------------------------------
int balance = a->blc;
sleep(1); // 睡眠,模拟真实情况下的延迟
// ------------------------
semop(a->semid, sop_v, 1); // 对编号为 0 的信号量做 +1 操作 V(1)
// ------------------------
return balance;
}

// ---------------------------------------------------------------
void init_sem(Act *a) // 初始化信号量集
{
assert(a != NULL);
// 获取有一个信号量的信号量集
if ((a->semid = semget(IPC_PRIVATE, 1,
IPC_CREAT | IPC_EXCL | 0777)) < 0) {
perror("semget error");
exit(1);
}
// 自定义联合
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
union semun un;
// 设置单个信号量的值,取联合的 val 成员,且设置初值为 1,表示只有一个共享资源
un.val = 1;
if (semctl(a->semid, 0, SETVAL, un) < 0) {
perror("semctl error");
exit(1);
}
}

void destroy_sem(Act *a) // 销毁信号量集
{
assert(a != NULL);
semctl(a->semid, 0, IPC_RMID, NULL);
}
// ---------------------------------------------------------------

int main()
{
// 父进程创建共享内存并存款 10000
int shmid = shmget(IPC_PRIVATE, sizeof(Act),
IPC_CREAT | IPC_EXCL | 0777);
if (shmid < 0) {
perror("shmget error");
exit(1);
}
Act *a = (Act *)shmat(shmid, 0, 0);
if (a == (Act *) -1) {
perror("shmat error");
exit(1);
}
save(a, 10000);

// ---------
init_sem(a); // 初始化信号量集
// ---------

pid_t pid;
if ((pid = fork()) < 0) {
perror("fork error");
exit(1);
} else if (pid > 0) {
// 父进程取款 10000
int amt = deposit(a, 10000);
int bcl = get_balance(a);
printf("%d get money %d, balance %d\n", getpid(), amt, bcl);
// 父进程等待子进程结束
wait(NULL);
// ------------
destroy_sem(a); // 待子进程结束后销毁信号量集
// ------------
// 解除与共享内存的映射
shmdt(a);
// 待子进程结束后释放共享内存
shmctl(shmid, IPC_RMID, NULL);
} else {
// 子进程取款 10000
int amt = deposit(a, 10000);
int bcl = get_balance(a);
printf("%d get money %d, balance %d\n", getpid(), amt, bcl);
// 解除与共享内存的映射
shmdt(a);
}

return 0;
}

说明:将例程中// ----之间的语句全部注释,编译运行,大概率会出现账户总额只有 10000,而父子进程均取出 10000 的情况,这是取款时sleep(1)的延迟导致的。加上信号量集相关操作后,当一个进程存款 / 取款 / 查看余额时,会进行 P(1) 操作,将信号量的值变为 0;另一个进程如果要访问账户,再次 P(1) 操作,信号量的值变成负数,将会阻塞。只有当前一个进程访问完毕后,进行 V(1) 操作,先前阻塞的进程才能继续访问。

四、案例二:利用信号量实现同步(读者写者问题)

思路

截屏2023-01-12 11.38.46

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/wait.h>
#include <assert.h>

typedef struct {
int val;
int semid; // 共享资源绑定信号量集
} Storage;

void writenum(Storage *s, int v) // 写入数据
{
assert(s != NULL);

s->val = v;
printf("process %d writes %d\n", getpid(), v);

struct sembuf v1[] = {{1, 1, SEM_UNDO}};
struct sembuf p0[] = {{0, -1, SEM_UNDO}};

semop(s->semid, v1, 1); // 对 s1 做 V(1) 操作
semop(s->semid, p0, 1); // 对 s0 做 P(1) 操作
}

int readnum(Storage *s) // 读取数据
{
assert(s != NULL);

struct sembuf p1[] = {{1, -1, SEM_UNDO}};
struct sembuf v0[] = {{0, 1, SEM_UNDO}};

semop(s->semid, p1, 1); // 对 s1 做 P(1) 操作
int num = s->val;
printf("process %d reads %d\n", getpid(), num);
semop(s->semid, v0, 1); // 对 s0 做 V(1) 操作

}

void initsem(Storage *s) // 初始化信号量集
{
assert(s != NULL);
if ((s->semid = semget(IPC_PRIVATE, 2, /* 两个信号量 */
IPC_CREAT | IPC_EXCL | 0777)) < 0) {
perror("semget error");
exit(1);
}
union semun { // 自定义联合
int val;
struct semid_ds *buf;
unsigned short *array;
};
unsigned short array[] = {0, 0};
union semun un;
un.array = array; // 设置所有信号量的初值,使用联合的 array 成员
if (semctl(s->semid, 0, SETALL, un) < 0) {
perror("semctl error");
exit(1);
}
}

void destroysem(Storage *s) // 销毁信号量集
{
assert(s != NULL);
if (semctl(s->semid, 0, IPC_RMID, NULL) < 0) {
perror("semctl error");
exit(1);
}
}

int main()
{
// 创建共享内存,内容为一个 Storage 类型
int shmid = shmget(IPC_PRIVATE, sizeof(Storage),
IPC_CREAT | IPC_EXCL | 0777);
if (shmid < 0) {
perror("shmget error");
exit(1);
}
// 将共享内存映射到虚拟内存,父子进程共享 s 指针
Storage *s = (Storage *)shmat(shmid, 0, 0);
if (s == (Storage *)-1) {
perror("shmat error");
exit(1);
}
// 父进程初始化信号量集
initsem(s);

pid_t pid;
if ((pid = fork()) < 0) {
perror("fork error");
exit(1);
} else if (pid > 0) {
for (int i = 1; i <= 10; i++) {
writenum(s, i * i);
}
wait(NULL); // 父进程等待子进程结束
destroysem(s); // 父进程销毁信号量集(这一步用到 s 指针,必须在 shmdt 之前!!)
shmdt(s); // 父进程解除映射
shmctl(shmid, IPC_RMID, NULL); // 父进程释放共享内存
} else {
for (int i = 1; i <= 10; i++) {
readnum(s);
}
shmdt(s); // 子进程解除映射
}

return 0;
}