主线程启动用户指定数量的线程,这些线程进入条件等待状态。
主线程生成一些任务(一定计算量),每生成一个新的任务,就用条件变量唤醒一个线程,当这个唤醒线程执行完任务时,回到条件等待状态。
当主线程生成完所有任务,设置全局变量表示再没有要生成的任务了,并用一个广播唤醒所有线程。为了清晰起见,建议任务采用链表操作。
#include <iostream>
#include <stdlib.h>
#include <pthread.h>
using namespace std;
const int N=1e4,N1=1e6;
struct Node {int id;int a[N1],b[N1];Node* next;
};
struct Queue {Node* front;Node* tail;
};
int InitQueue(Queue* T);
int Add(Queue* T, int e);
int DoTask(Queue* T, int* ep);
int thread_count, finished = 0;
pthread_mutex_t mutex,actprint;
pthread_cond_t cond;
void* task(void* rank);
Queue Q;
int main(int argc, char* argv[])
{ srand(0);InitQueue(&Q);pthread_t *thread_handles;thread_count = strtol(argv[1], NULL, 10);//从命令行读取线程数thread_handles = new pthread_t[thread_count];//分配长度pthread_mutex_init(&mutex, NULL);pthread_mutex_init(&actprint, NULL);pthread_cond_init(&cond, NULL);int n;printf("Please input the task number:");cin>>n;for (int i = 0; i < thread_count; i++)pthread_create(&thread_handles[i], NULL, task, (void*)i);for (int i = 0; i < n; i++) {pthread_mutex_lock(&mutex);Add(&Q, i);pthread_cond_signal(&cond);pthread_mutex_unlock(&mutex);}finished = 1;pthread_cond_broadcast(&cond);for (int i = 0; i < thread_count; i++)pthread_join(thread_handles[i], NULL);pthread_mutex_destroy(&mutex);pthread_mutex_destroy(&actprint);pthread_cond_destroy(&cond);free(thread_handles);return 0;
}
int InitQueue(Queue* T) {T->front = (Node*)malloc(sizeof(Node));T->tail = T->front;T->front->id = 0;T->front->next = NULL;return 0;
}
int Add(Queue* T, int e) {Node* newnode = (Node*)malloc(sizeof(Node));newnode->id = e;newnode->next = 0;int t,k=0;for(int i=0;i<N1;i++){t=rand()%N;newnode->a[k++]=t;}k=0;for(int i=0;i<N;i++){t=rand()%N;newnode->b[k++]=t;}T->tail->next = newnode;T->tail = newnode;return 0;
}
int DoTask(Queue* T, int* e) {Node* dest;if (T->tail == T->front)return 0;//队列中已经没有任务dest = T->front->next;//从头取出一个任务if (dest == 0) return 0;for(int i=0;i<N1;i++){//做任务dest->a[i]=dest->a[i]*dest->b[i];}*e = dest->id;T->front->next = dest->next;free(dest);return 0;
}
void* task(void* rank) {long long my_rank = (long long)rank;pthread_mutex_lock(&actprint);cout<<"process "<<my_rank<<" has been activated"<<endl;pthread_mutex_unlock(&actprint);int my_task;Node** p = &(Q.front->next);while (1) {pthread_mutex_lock(&mutex);if (finished) {if (*p == NULL) {pthread_mutex_unlock(&mutex);break;}DoTask(&Q, &my_task);pthread_mutex_unlock(&mutex);printf("Thread %ld: Task no.%d\n", my_rank, my_task);}else {while(pthread_cond_wait(&cond, &mutex)!=0);DoTask(&Q, &my_task);pthread_mutex_unlock(&mutex);printf("Thread %ld: Task no.%d\n", my_rank, my_task);}}
}
参考资料
Pthreads实现任务队列