管道

创建匿名管道

实际管道的创建调用的是系统调用pipe(),该函数建了一个管道 pipe,返回了两个文件描述符,这表示管道的两端,一个是管道的读取端描述符 fd[0],另一个是管道的写入端描述符 fd[1]fd[2]为管道标号。

1
int pipe(int fd[2])

如果对于 fd[1]写入,调用的是 write(),向 pipe_buffer 里面写入数据;如果对于 fd[0]的读入,调用的是 read(),也就是从 pipe_buffer 里面读取数据。至此,我们在一个进程内创建了管道,但是尚未实现进程间通信。

要注意,在管道中没有数据的情况下,对管道的读操作会阻塞,直到管道内有数据为止。这就是为什么示例实验中的父子进程之间的执行是交替的。

当然写操作也不会再所有情况下都不阻塞。这里我们要先来了解一下管道的内核实现。管道实际上就是内核控制的一个内存缓冲区,既然是缓冲区,就有容量上限。我们把管道一次最多可以缓存的数据量大小叫做PIPESIZE。内核在处理管道数据的时候,底层也要调用类似read和write这样的方法进行数据拷贝,这种内核操作每次可以操作的数据量也是有限的,一般的操作长度为一个page,即默认为4k字节。我们把每次可以操作的数据量长度叫做PIPEBUF。POSIX标准中,对PIPEBUF有长度限制,要求其最小长度不得低于512字节。PIPEBUF的作用是,内核在处理管道的时候,如果每次读写操作的数据长度不大于PIPEBUF时,保证其操作是原子的。而PIPESIZE的影响是,大于其长度的写操作会被阻塞,直到当前管道中的数据被读取为止。

匿名管道通信

匿名管道实现A|B进程间的通信的原理如下:

  • 利用fork创建子进程,复制file_struct会同样复制fd输入输出数组,但是fd指向的文件仅有一份,即两个进程间可以通过fd数组实现对同一个管道文件的跨进程读写操作
  • 禁用父进程的读,禁用子进程的写,即从父进程写入从子进程读出,从而实现了单向管道,避免了混乱
  • 对于A|B来说,shell首先创建子进程A,接着创建子进程B,由于二者均从shell创建,因此共用fd数组。shell关闭读写,A开写B开读,从而实现了AB之间的通信。

接着我们需要调用dup2()实现输入输出和管道两端的关联,该函数会将fd赋值给fd2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Duplicate FD to FD2, closing the old FD2 and making FD2 be
open the same file as FD is. Return FD2 or -1. */
int __dup2 (int fd, int fd2)
{
if (fd < 0 || fd2 < 0)
{
__set_errno (EBADF);
return -1;
}
if (fd == fd2)
/* No way to check that they are valid. */
return fd2;
__set_errno (ENOSYS);
return -1;
}

files_struct 里面,有这样一个表,下标是 fd,内容指向一个打开的文件 struct file。在这个表里面,前三项是定下来的,其中第零项 STDIN_FILENO 表示标准输入,第一项 STDOUT_FILENO 表示标准输出,第三项 STDERR_FILENO 表示错误输出。

1
2
3
struct files_struct {
struct file __rcu * fd_array[NR_OPEN_DEFAULT];
}
  • 在 A 进程写入端通过dup2(fd[1],STDOUT_FILENO)STDOUT_FILENO(也即第一项)不再指向标准输出,而是指向创建的管道文件,那么以后往标准输出写入的任何东西,都会写入管道文件。
  • 在 B 进程中读取端通过dup2(fd[0],STDIN_FILENO)STDIN_FILENO 也即第零项不再指向标准输入,而是指向创建的管道文件,那么以后从标准输入读取的任何东西,都来自于管道文件。

至此,我们将 A|B 的功能完成。

示例实验与上面的过程略有不同(没有上面那么麻烦),因为不会涉及shell进程,所以是直接在父子进程中进行消息传递,而不是创建两个子进程。

独立实验

与示例实验思路相近,使用了两个管道。不同的是,两个管道都是由子进程向父进程发送数据。所以子进程直接关闭读,父进程直接关闭写。

出现的一些问题:不能在父进程中并列地进行进程创建,或者说,不能在父进程中同时声明子进程的创建,这会导致第二个子进程会被第一个子进程再创建一遍。所以第二个子进程的创建应该在第一个子进程内进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
int pid1;
int pid2;
int pipe1[2]; //存放第一个无名管道标号
int pipe2[2]; //存放第二个无名管道标号
int x,y;
printf("输入x:\n");
scanf("%d",&x);
while(x<1){
printf("输入非法,重新输入x:\n");
scanf("%d",&x);
}

printf("输入y:\n");
scanf("%d",&y);
while(y<1){
printf("输入非法,重新输入y:\n");
scanf("%d",&y);
}

if (pipe(pipe1) < 0) {
perror("pipe not create");
exit(EXIT_FAILURE);
}
if (pipe(pipe2) < 0) {
perror("pipe not create");
exit(EXIT_FAILURE);
}

if ((pid1 = fork()) < 0) {
perror("process not create");
exit(EXIT_FAILURE);
}else if (pid1 > 0) {
pid2 = fork();
if (pid2 < 0){
perror("process not create");
exit(EXIT_FAILURE);
}else if (pid2 > 0){
printf("子进程创建成功!\n");
}
}

if (pid1 == 0) {//f(x)
close(pipe1[0]);

printf("child %d 读入x: %d\n", getpid(), x);
if(x > 1){
int a = 1;
for(int i = 2;i<=x;i++){
a = a * i;
}
x = a;
}
printf("f(x) = %d\n",x);
write(pipe1[1], &x, sizeof(int));

close(pipe1[1]);
exit(EXIT_SUCCESS);
}else if (pid2 == 0) {//f(y)
close(pipe2[0]);

printf("child %d 读入y: %d\n", getpid(), y);
if(y>2){
int a = 1,b = 1;
for(int i = 2;i<y;i++){
b = b + a;
a = b - a;
}
y = b;
}
printf("f(y) = %d\n",y);
write(pipe2[1], &y, sizeof(int));

close(pipe2[1]);
exit(EXIT_SUCCESS);
} else {//f(x,y)
close(pipe1[1]);
close(pipe2[1]);
int a,b;
read(pipe1[0], &a, sizeof(int));
read(pipe2[0], &b, sizeof(int));
printf("Parent %d 计算完成: %d\n", getpid(), a+b);
close(pipe1[0]);
close(pipe2[0]);
}
return EXIT_SUCCESS;
}