作者:武特
學習了信號量以及共享內存後,我們就可以實現進程的同步與互斥了。說到這裡,最經典的例子莫過於生產者和消費者模型。下面就和大家一起分析,如何一步步實現這個經典模型。完整代碼可以在這裡下載。
下面程序,實現的是多個生產者和多個消費者對N個緩沖區(N個貨架)進行訪問的例子。現在先想想我們以前的偽代碼是怎麼寫的?是不是這樣:
//生產者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
while(1)
{
p(semid,1);
sleep(3);
p(semid,0);
//producer is producing a product
goods=rand()%10;//product a goods
shmaddr[indexaddr[0]]=goods;//The goods is placed on a shelf
printf("producer:%d produces a product[%d]:%d
",getpid(),indexaddr[0],goods);
indexaddr[0]=(indexaddr[0]+1)%10;
v(semid,0);
sleep(3);
v(semid,2);
}
//消費者:
1
2
3
4
5
6
7
8
9
10
11
12
13
while(1)
{
p(semid,2);
sleep(1);
p(semid,0);
//consumer is consuming a product
goods=shmaddr[indexaddr[1]];//The goods on the shelf is taken down
printf("consumer:%d consumes a product[%d]:%d
",getpid(),indexaddr[1],goods);
indexaddr[1]=(indexaddr[1]+1)%num;
v(semid,0);
sleep(1);
v(semid,1);
}
可能上面的代碼你有些眼熟,又有些困惑,因為它和課本上的代碼不完全一樣,其實上面的代碼就是偽代碼的linuxC語言具體實現。我們從上面的代碼中慢慢尋找偽代碼的蹤跡:p(semid,0)和v(semid,0)的作用是讓進程互斥訪問臨界區。臨界區中包含的數據indexaddr[0],indexaddr[1],以及shmaddr數組分別對應偽代碼中的in,out,buffer。p(semid,1)和v(semid,2)以及p(semid,2)和v(semid,1)實現的是同步作用。
並且,在生產者中,生產者生產了一個貨物(goods=rand()%10;),然後將這個貨物放上貨架(shmaddr[indexaddr[0]]=goods;)。在消費者中,消費和從貨架上取下貨物(goods=shmaddr[indexaddr[1]];)。
好了,現在再看一邊上面的代碼,我想你的思路就清晰了。
了解了核心代碼,並不能算就完成了生產者和消費者模型,因為生產者和消費者核心代碼前還得做一些些准備工作,具體要准備些什麼,我們具體來分析。
首先申請一塊共享內存,這塊共享內存用於存放生產者所生產的貨物。同時我們可以看到這塊共享內存大小為10字節。這裡需要注意,每個生產著或消費者運行後,都要去試著分配這樣的一塊共享內存。如果在當前進程運行前已經有某個進程已經創建了這塊共享內存,那麼這個進程就不再創建(此時createshm會返回-1並且錯誤代碼為EEXIST),只是打開這塊共享內存。創建後,再將這塊共享內存添加到當前進程的地址空間。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
num=10;
//create a shared memory as goods buffer
if((shmid_goods=createshm(".",s,num))==-1)
{
if(errno==EEXIST)
{
if((shmid_goods=openshm(".",s))==-1)
{
exit(1);
}
}
else
{
perror("create shared memory failed
");
exit(1);
}
}
//attach the shared memory to the current process
if((shmaddr=shmat(shmid_goods,(char*)0,0))==(char*)-1)
{
perror("attach shared memory error
");
exit(1);
}
接下來還要再申請一塊共享內存,用於存放兩個整形變量in和out(其實就是申請一個含有2個整形變量的數組而已)。他們記錄的是生產和消費貨物時“貨架”的索引。與上面情況相同,如果已經有其他進程創建了此塊共享內存,那麼當前進程只是打開它而已。
注意這裡對兩個整形變量的初始化時的值均為0。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//create a shared memory as index
if((shmid_index=createshm(".",z,8))==-1)
{
if(errno==EEXIST)
{
if((shmid_index=openshm(".",z))==-1)
{
exit(1);
}
}
else
{
perror("create shared memory failed
");
exit(1);
}
}
else
{
is_noexist=1;
}
//attach the shared memory to the current process
if((indexaddr=shmat(shmid_index,(int*)0,0))==(int*)-1)
{
perror("attach shared memory error
");
exit(1);
}
if(is_noexist)
{
indexaddr[0]=0;
indexaddr[1]=0;
}
接下來就是創建一個信號量集,這個信號量集中包含三個信號量。第一個信號量實現的互斥作用,即進程對臨界區的互斥訪問。剩下兩個均實現的是同步作用,協調生產者和消費者的合理運行,即貨架上沒有空位時候生產者不再生產,貨架上無商品時消費者不再消費。
注意下面對每個信號量的賦值情況。互斥信號量當然初值為1。而同步信號量兩者之和不能大於num的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//create a semaphore set including 3 semaphores
if((semid=createsem(".",t,3,0))==-1)
{
if(errno==EEXIST)
{
if((semid=opensem(".",t))==-1)
{
exit(1);
}
}
else
{
perror("semget error:");
exit(1);
}
}
else
{
union semun arg;
//seting value for mutex semaphore
arg.val=1;
if(semctl(semid,0,SETVAL,arg)==-1)
{
perror("setting semaphore value failed
");
return -1;
}
//set value for synchronous semaphore
arg.val=num;
//the num means that the producer can continue to produce num products
if(semctl(semid,1,SETVAL,arg)==-1)
{
perror("setting semaphore value failed
");
return -1;
}
//the last semaphores value is default
//the default value 0 means that the consumer is not use any product now
}
基本上這樣,就算完成了生產者和消費者的前期工作。我們可以看到,在核心代碼中,我們只需要“裝模作樣”的將代碼“各就各位”即可,當然這需要你理解生產者消費者這個基本模型。而在下面的准備代碼中,則需要我們理解關於信號量和共享內存的一些基本函數。
最後再說說使用,建議先運行一個生產者和一個消費者,觀察兩者是如何協調工作的。然後再只運行一個生產者或一個消費者,看其是否會阻塞。了解了以上情況後,你就可以同時運行多個生產者和消費者了