進(jìn)行多線(xiàn)程編程,最頭疼的就是那些共享的數據。因為你無(wú)法知道哪個(gè)線(xiàn)程會(huì )在哪個(gè)時(shí)候對它進(jìn)行操作,你也無(wú)法得知那個(gè)線(xiàn)程會(huì )先運行,哪個(gè)線(xiàn)程會(huì )后運行。下面介紹一些技術(shù),通過(guò)他們,你會(huì )合理安排你的線(xiàn)程之間對資源的競爭。
l 互斥體Mutex
l 信號燈Semophore
l 條件變量Conditions
先說(shuō)一下互斥量。
什么時(shí)候會(huì )用上互斥量了?比如你現在有一全局鏈表,你有幾個(gè)工作線(xiàn)程。每一個(gè)線(xiàn)程從該鏈表中取出頭節點(diǎn),然后對該頭節點(diǎn)進(jìn)行處理。比如現在線(xiàn)程1正在取出頭節點(diǎn),他的操作如下:
Item * p =queue_list;
Queue_list=queue_list->next;
Process_job(p);
Free(p);
當線(xiàn)程1處理完第一步,也就是Item *p=queue_list后,這時(shí)候系統停止線(xiàn)程1的運行,改而運行線(xiàn)程2。線(xiàn)程2照樣取出頭節點(diǎn),然后進(jìn)行處理,最后釋放了該節點(diǎn)。過(guò)了段時(shí)間,線(xiàn)程1重新得到運行。而這個(gè)時(shí)候,其實(shí)p所指向的節點(diǎn)已經(jīng)被線(xiàn)程2釋放掉,而線(xiàn)程1對此毫無(wú)知曉。他會(huì )接著(zhù)運行process_job(p)。而這將導致無(wú)法預料的后果!
對于這種情況,系統給我們提供了互斥量。你在取出頭節點(diǎn)前必須要等待互斥量,如果此時(shí)有其他線(xiàn)程已經(jīng)獲得該互斥量,那么線(xiàn)程將會(huì )阻塞在這個(gè)地方。只有等到其他線(xiàn)程釋放掉該互斥量后,你的線(xiàn)程才有可能得到該互斥量。為什么是可能了?因為可能此時(shí)有不止你一個(gè)線(xiàn)程在等候該互斥量,而系統無(wú)法保證你的線(xiàn)程將會(huì )優(yōu)先運行。
互斥量的類(lèi)型為pthread_mutex_t。你可以聲明多個(gè)互斥量。在聲明該變量后,你需要調用pthread_mutex_init()來(lái)創(chuàng )建該變量。pthread_mutex_init的格式如下:
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutex-
attr_t *mutexattr);
第一個(gè)參數,mutext,也就是你之前聲明的那個(gè)互斥量,第二個(gè)參數為該互斥量的屬性。這個(gè)將在后面詳細討論。
在創(chuàng )建該互斥量之后,你便可以使用它了。要得到互斥量,你需要調用下面的函數:
int pthread_mutex_lock(pthread_mutex_t *mutex);
該函數用來(lái)給互斥量上鎖,也就是我們前面所說(shuō)的等待操作?;コ饬恳坏┍簧湘i后,其他線(xiàn)程如果想給該互斥量上鎖,那么就會(huì )阻塞在這個(gè)操作上。如果在此之前該互斥量已經(jīng)被其他線(xiàn)程上鎖,那么該操作將會(huì )一直阻塞在這個(gè)地方,直到獲得該鎖為止。
在得到互斥量后,你就可以進(jìn)入關(guān)鍵代碼區了。
同樣,在操作完成后,你必須調用下面的函數來(lái)給互斥量解鎖,也就是前面所說(shuō)的釋放。這樣其他等待該鎖的線(xiàn)程才有機會(huì )獲得該鎖,否則其他線(xiàn)程將會(huì )永遠阻塞。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
下面給出一個(gè)簡(jiǎn)單的例子:
#include <malloc.h>
#include <pthread.h>
struct job {
/* Link field for linked list. */
struct job* next;
/* Other fields describing work to be done... */
};
/* A linked list of pending jobs. */
struct job* job_queue;
/* A mutex protecting job_queue. */
pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Process queued jobs until the queue is empty. */
void* thread_function (void* arg)
{
while (1) {
struct job* next_job;
/* Lock the mutex on the job queue. */
pthread_mutex_lock (&job_queue_mutex);
/* Now it’s safe to check if the queue is empty. */
if (job_queue == NULL)
next_job = NULL;
else {
/* Get the next available job. */
next_job = job_queue;
/* Remove this job from the list. */
job_queue = job_queue->next;
}
/* Unlock the mutex on the job queue because we’re done with the
queue for now. */
pthread_mutex_unlock (&job_queue_mutex);
/* Was the queue empty? If so, end the thread. */
if (next_job == NULL)
break;
/* Carry out the work. */
process_job (next_job);
/* Clean up. */
free (next_job);
}
return NULL;
}
在這個(gè)例子中我們使用了下面一條語(yǔ)句:
pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
他的作用和調用pthread_mutex_init()函數一樣。
如果一個(gè)線(xiàn)程已經(jīng)給一個(gè)互斥量上鎖了,后來(lái)在操作的過(guò)程中又再次調用了該上鎖的操作,那么該線(xiàn)程將會(huì )無(wú)限阻塞在這個(gè)地方,從而導致死鎖。怎么變了?這就需要我們之前所提到的互斥量的屬性。
互斥量分為下面三種:
l 快速型。這種類(lèi)型也是默認的類(lèi)型。該線(xiàn)程的行為正如上面所說(shuō)的。
l 遞歸型。如果遇到我們上面所提到的死鎖情況,同一線(xiàn)程循環(huán)給互斥量上鎖,那么系統將會(huì )知道該上鎖行為來(lái)自同一線(xiàn)程,那么就會(huì )同意線(xiàn)程給該互斥量上鎖。
l 錯誤檢測型。如果該互斥量已經(jīng)被上鎖,那么后續的上鎖將會(huì )失敗而不會(huì )阻塞,pthread_mutex_lock()操作將會(huì )返回EDEADLK。
互斥量的屬性類(lèi)型為pthread_mutexattr_t。聲明后調用pthread_mutexattr_init()來(lái)創(chuàng )建該互斥量。然后調用int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind);來(lái)設置屬性。int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind);格式如下:
int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind);
第一個(gè)參數,attr,就是前面聲明的屬性變量,第二個(gè)參數,kind,就是我們要設置的屬性類(lèi)型。他有下面幾個(gè)選項:
l PTHREAD_MUTEX_FAST_NP
l PTHREAD_MUTEX_RECURSIVE_NP
l PTHREAD_MUTEX_ERRORCHECK_NP
下面給出一個(gè)使用屬性的簡(jiǎn)單過(guò)程:
pthread_mutex_t mutex;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&mutex,&attr);
pthread_mutex_destroy(&attr);
前面我們提到在調用pthread_mutex_lock()的時(shí)候,如果此時(shí)mutex已經(jīng)被其他線(xiàn)程上鎖,那么該操作將會(huì )一直阻塞在這個(gè)地方。如果我們此時(shí)不想一直阻塞在這個(gè)地方,那么可以調用下面函數:
pthread_mutex_trylock()
如果此時(shí)互斥量沒(méi)有被上鎖,那么pthread_mutex_trylock()將會(huì )返回0,并會(huì )對該互斥量上鎖。如果互斥量已經(jīng)被上鎖,那么會(huì )立刻返回EBUSY。
上面談到的是使用互斥量。如果碰到下面這種情況,該怎么辦了?
還是上面程序中提到的工作鏈表。此時(shí)必然有一個(gè)生產(chǎn)者線(xiàn)程,用于往鏈表里添加節點(diǎn)。如果這一段時(shí)間沒(méi)有工作,那么工作線(xiàn)程將會(huì )不停的調用lock,unlock操作。而這樣的操作毫無(wú)疑義。
在這里系統給我們提供了另外一種同步機制,信號燈,Semaphore。
信號燈其實(shí)就是一個(gè)計數器,也是一個(gè)整數。每一次調用wait操作將會(huì )使semaphore值減一,而如果semaphore值已經(jīng)為0,則wait操作將會(huì )阻塞。每一次調用post操作將會(huì )使semaphore值加一。將這些操作用到上面的問(wèn)題中。工作線(xiàn)程每一次調用wait操作,如果此時(shí)鏈表中沒(méi)有節點(diǎn),則工作線(xiàn)程將會(huì )阻塞,直到鏈表中有節點(diǎn)。生產(chǎn)者線(xiàn)程在每次往鏈表中添加節點(diǎn)后調用post操作,信號燈值會(huì )加一。這樣阻塞的工作線(xiàn)程就會(huì )停止阻塞,繼續往下執行。
信號燈的類(lèi)型為sem_t。在聲明后必須調用sem_init()。需要傳遞兩個(gè)參數,第一個(gè)參數就是你之前聲明的sem_t變量,第二個(gè)必須為0。當你不再需要信號燈時(shí),你必須調用sem_destroy()來(lái)釋放資源。
等待信號燈的操作為sem_wait()。投遞一個(gè)信號的操作為sem_wait()。和互斥量一樣,等待信號燈也有一個(gè)非阻塞的操作,sem_trywait()。該操作在沒(méi)有信號燈的時(shí)候返回EAGAIN。
下面是一個(gè)結合了互斥量和信號燈的例子:
#include <malloc.h>
#include <pthread.h>
#include <semaphore.h>
struct job {
/* Link field for linked list. */
struct job* next;
/* Other fields describing work to be done... */
};
/* A linked list of pending jobs. */
struct job* job_queue;
/* A mutex protecting job_queue. */
pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
/* A semaphore counting the number of jobs in the queue. */
sem_t job_queue_count;
/* Perform one-time initialization of the job queue. */
void initialize_job_queue ()
{
/* The queue is initially empty. */
job_queue = NULL;
/* Initialize the semaphore which counts jobs in the queue. Its
initial value should be zero. */
sem_init (&job_queue_count, 0, 0);
}
/* Process queued jobs until the queue is empty. */
void* thread_function (void* arg)
{
while (1) {
struct job* next_job;
/* Wait on the job queue semaphore. If its value is positive,
indicating that the queue is not empty, decrement the count by
1. If the queue is empty, block until a new job is enqueued. */
sem_wait (&job_queue_count);
/* Lock the mutex on the job queue. */
pthread_mutex_lock (&job_queue_mutex);
/* Because of the semaphore, we know the queue is not empty. Get
the next available job. */
next_job = job_queue;
/* Remove this job from the list. */
job_queue = job_queue->next;
/* Unlock the mutex on the job queue because we’re done with the
queue for now. */
pthread_mutex_unlock (&job_queue_mutex);
/* Carry out the work. */
process_job (next_job);
/* Clean up. */
free (next_job);
}
return NULL;
}
/* Add a new job to the front of the job queue. */
void enqueue_job (/* Pass job-specific data here... */)
{
struct job* new_job;
/* Allocate a new job object. */
new_job = (struct job*) malloc (sizeof (struct job));
/* Set the other fields of the job struct here... */
/* Lock the mutex on the job queue before accessing it. */
pthread_mutex_lock (&job_queue_mutex);
/* Place the new job at the head of the queue. */
new_job->next = job_queue;
job_queue = new_job;
/* Post to the semaphore to indicate that another job is available. If
threads are blocked, waiting on the semaphore, one will become
unblocked so it can process the job. */
sem_post (&job_queue_count);
/* Unlock the job queue mutex. */
pthread_mutex_unlock (&job_queue_mutex);
}
下面說(shuō)一下第三種同步機制—條件變量。
如果現在在等待一個(gè)信號。如果該信號被設置,則繼續運行。如果沒(méi)有條件變量,我們將會(huì )不停的去查詢(xún)該信號是否被設置,這樣就會(huì )浪費大量的cpu。而通過(guò)使用條件變量,我們就可以將等待信號的線(xiàn)程阻塞,直到有信號的時(shí)候再去喚醒它。
條件變量的類(lèi)型是pthread_cond_t。
下面簡(jiǎn)單說(shuō)一下如何使用條件變量。
l 聲明pthread_cond_t變量后,調用pthread_cond_init()函數,第一個(gè)參數為之前聲明的變量。第二個(gè)參數在Linux中不起作用。
l 聲明一個(gè)pthread_mutex_t變量,并調用pthread_mutex_init()初始化。
l 調用pthread_cond_signal(),發(fā)出信號。如果此時(shí)有線(xiàn)程在等待該信號,那么該線(xiàn)程將會(huì )喚醒。如果沒(méi)有,該信號就會(huì )別忽略。
l 如果想喚醒所有等待該信號的線(xiàn)程,調用pthread_cond_broadcast()。
l 調用pthread_cond_wait()等待信號。如果沒(méi)有信號,線(xiàn)程將會(huì )阻塞,直到有信號。該函數的第一個(gè)參數是條件變量,第二個(gè)參數是一個(gè)mutex。在調用該函數之前必須先獲得互斥量。如果線(xiàn)程阻塞,互斥量將立刻會(huì )被釋放。
下面給出一個(gè)簡(jiǎn)單的使用例子。
#include <pthread.h>
#include <stdio.h>
pthread_mutex_t mutex;
pthread_cond_t cond;
int flag;
void init()
{
pthread_mutex_init(&mutex,NULL);
pthread_cond_init(&cond,NULL);
flag=0;
}
void * Thread_Function(void * arg)
{
//loop infinitely
while(1)
{
pthread_mutex_lock(&mutex);
while(!flag)
pthread_cond_wait(&cond,&mutex);
pthread_mutex_unlock(&mutex);
do_some_work();
}
}
void SetFlag()
{
pthread_mutex_lock(&mutex);
flag=1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
關(guān)于線(xiàn)程同步的技術(shù)先說(shuō)到這個(gè)地方。