并行编程实验一
实验内容
实验内容一:以数据划分的方式并行计算PI值
实验内容二:CPU多核编程——线程池开发
要求基于生产者—消费者模式进行框架开发,具体工作需求可以简化,但需要有线程管理和同步。
计算Pi
思路简述
依据莱布尼兹公式 $\frac{\pi}{4}=\sum_{k=0}^{\infty}\frac{(-1)^k}{2k+1}$,累加足够多的项以逼近 $\pi$。采用多线程的方式进行数据划分,即每个线程分担处理其中一部分项,从而进行加速。
同时,由于多个线程同时访问全局结果可能产生冲突,因此使用互斥量和条件变量来组织线程,使其有序地将局部结果加到全局结果中。
代码实现
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
// Number of consecutive series terms summed by each worker thread.
const int BLOCK_SIZE = 100000;
// Shared accumulator for the partial sums; workers add to it while holding `mutex`.
double sum = 0;
// Number of worker threads; assigned in main() before the threads are created.
int num_threads;
// Mutex and condition variable definitions.
// `mutex` serializes updates to `sum`. `cond` is signalled by the workers;
// NOTE(review): no matching pthread_cond_wait is visible in this section.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/*
 * Thread entry point: sums one block of the Leibniz series
 *     (-1)^i / (2i + 1)
 * for i in [id * BLOCK_SIZE, (id + 1) * BLOCK_SIZE) and adds that partial
 * sum to the global `sum` while holding `mutex`.
 *
 * thread_id: the block index, passed by value inside the void * argument.
 * Returns NULL; the result is delivered through the global `sum`.
 */
void *calculate_block(void *thread_id)
{
    long id = (long)thread_id;
    /* Use long for the term index so id * BLOCK_SIZE and 2 * i + 1 do not
     * overflow int when more threads/blocks are used. */
    long start = id * BLOCK_SIZE;
    long end = start + BLOCK_SIZE;
    double block_sum = 0;

    for (long i = start; i < end; i++) {
        /* Alternating sign by parity: same value as pow(-1, i), without a
         * libm call per term. */
        double sign = (i % 2 == 0) ? 1.0 : -1.0;
        block_sum += sign / (2 * i + 1);
    }

    /* Only the final accumulation touches shared state, so the lock is
     * taken once per thread rather than once per term. */
    pthread_mutex_lock(&mutex);
    sum += block_sum;
    pthread_mutex_unlock(&mutex);
    pthread_cond_signal(&cond);

    /* A void * thread function must return a value. */
    return NULL;
}
/*
 * Starts num_threads workers, each summing one block of the series, waits
 * for all of them, and prints the approximation of pi (4 * sum).
 *
 * Returns 0 on success, EXIT_FAILURE if a worker thread could not be
 * created (in which case no result is printed).
 */
int main(void)
{
    num_threads = 8;
    pthread_t threads[num_threads];
    int started = 0;

    for (long i = 0; i < num_threads; i++) {
        int rc = pthread_create(&threads[i], NULL, calculate_block, (void *)i);
        if (rc != 0) {
            fprintf(stderr, "pthread_create failed for block %ld (error %d)\n", i, rc);
            break;
        }
        started++;
    }

    /* Join only the threads that were actually created; joining an
     * uninitialized pthread_t is undefined behavior. */
    for (int i = 0; i < started; i++)
        pthread_join(threads[i], NULL);

    /* A missing block would make the sum silently wrong, so report failure
     * instead of printing a partial result. */
    if (started != num_threads)
        return EXIT_FAILURE;

    printf("%lf", sum * 4);
    return 0;
}
线程池设计
线程池实现
使用一个任务队列作为生产者和消费者之间的缓冲区,任务队列中的每一个元素包含要执行的函数和函数参数,对应代码如下:
/* One unit of work stored in the pool's task queue. */
typedef struct task_t {
/* Function a worker thread calls to run the task. */
void (*func)(void *);
/* Argument handed to func unchanged when the task runs. */
void *arg;
} task_t;
线程池包含一个任务队列、若干线程、互斥量与条件变量以及其它关键属性,定义如下:
/*
 * Thread pool state: a fixed-capacity circular task queue shared between
 * the producer (thread_pool_enqueue) and the worker threads.
 */
typedef struct thread_pool_t {
/* Handles of the worker threads; the first num_threads entries are used. */
pthread_t threads[MAX_THREADS];
int num_threads;
/* Circular buffer of pending tasks. */
task_t queue[MAX_QUEUE];
/* front: index of the next task to dequeue; rear: index of the next free
 * slot; size: number of tasks currently queued. */
int front, rear, size;
/* Held while the queue fields and the shutdown flag are read or written. */
pthread_mutex_t mutex;
/* Signalled when a task is enqueued and broadcast on shutdown; workers
 * wait on it while the queue is empty. */
pthread_cond_t cond;
/* Set by thread_pool_shutdown; workers exit once it is set and the queue
 * is empty. */
bool shutdown;
} thread_pool_t;
在生产者方面,thread_pool_enqueue
函数用于将任务添加到任务队列中。当生产者生产了一个任务后,它首先会通过线程池的互斥锁pool->mutex
来保护任务队列,防止多个线程同时修改任务队列,然后使用条件变量pool->cond
来通知消费者有新的任务到达。
/*
 * Producer side: appends a task to the pool's circular queue and wakes one
 * waiting worker.
 *
 * pool: the pool to submit to.
 * func: function the worker will call.
 * arg:  argument passed to func; stored as-is.
 *
 * Returns true if the task was queued, false if the queue already holds
 * MAX_QUEUE tasks (nothing is stored in that case).
 */
bool thread_pool_enqueue(thread_pool_t *pool, void (*func)(void *), void *arg)
{
    bool accepted = false;

    pthread_mutex_lock(&pool->mutex);
    if (pool->size != MAX_QUEUE) {
        /* Write into the slot at rear, then advance rear with wrap-around. */
        pool->queue[pool->rear] = (task_t){ .func = func, .arg = arg };
        pool->rear = (pool->rear + 1) % MAX_QUEUE;
        pool->size++;
        /* Signal while still holding the lock, as the original does. */
        pthread_cond_signal(&pool->cond);
        accepted = true;
    }
    pthread_mutex_unlock(&pool->mutex);

    return accepted;
}
在消费者方面,thread_pool_worker
函数用于从任务队列中取出任务并执行。当消费者从任务队列中取出一个任务时,它会使用互斥锁pool->mutex
来保护任务队列。如果任务队列为空,消费者将会被条件变量pool->cond
阻塞,等待生产者添加新的任务到任务队列中。
/*
 * Consumer side: body of every worker thread.
 *
 * Repeatedly takes the task at the front of the queue and runs it outside
 * the lock. Blocks on the pool's condition variable while the queue is
 * empty; exits the thread once shutdown is set and the queue is empty.
 *
 * arg: pointer to the owning thread_pool_t.
 */
void *thread_pool_worker(void *arg)
{
    thread_pool_t *pool = (thread_pool_t *)arg;

    for (;;) {
        pthread_mutex_lock(&pool->mutex);

        /* Loop guards against spurious wake-ups. */
        while (pool->size == 0 && !pool->shutdown) {
            pthread_cond_wait(&pool->cond, &pool->mutex);
        }

        /* The wait loop only ends with an empty queue when shutdown is set,
         * so an empty queue here means it is time to exit. */
        if (pool->size == 0) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }

        task_t next = pool->queue[pool->front];
        pool->front = (pool->front + 1) % MAX_QUEUE;
        pool->size--;
        pthread_mutex_unlock(&pool->mutex);

        /* Run the task without holding the lock so other workers proceed. */
        next.func(next.arg);
    }
}
线程池的启动包括初始化任务队列、互斥量与条件变量,并启动消费者线程;线程池的关闭最重要的是等待所有线程运行结束;两者实现如下:
/*
 * Initializes the pool and starts its worker threads.
 *
 * pool:        pool to initialize.
 * num_threads: requested number of workers; clamped to [0, MAX_THREADS]
 *              so the threads array is never overrun.
 *
 * On return pool->num_threads holds the number of workers that were
 * actually started, which is what thread_pool_shutdown joins. It can be
 * smaller than requested if the request exceeded MAX_THREADS or if
 * pthread_create failed.
 */
void thread_pool_init(thread_pool_t *pool, int num_threads)
{
    if (num_threads < 0)
        num_threads = 0;
    if (num_threads > MAX_THREADS)
        num_threads = MAX_THREADS;

    pool->num_threads = 0;
    pool->front = pool->rear = pool->size = 0;
    pool->shutdown = false;
    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->cond, NULL);

    for (int i = 0; i < num_threads; i++) {
        /* Count only successful creations so shutdown never joins a
         * thread handle that was not initialized. */
        if (pthread_create(&pool->threads[pool->num_threads], NULL,
                           thread_pool_worker, pool) != 0)
            break;
        pool->num_threads++;
    }
}
/*
 * Stops the pool: sets the shutdown flag, wakes every waiting worker,
 * joins all worker threads, then destroys the mutex and condition variable.
 *
 * Workers finish the tasks still in the queue before exiting, because they
 * only exit once the queue is empty.
 */
void thread_pool_shutdown(thread_pool_t *pool)
{
    /* Set the flag under the lock so a worker cannot miss it between its
     * check and its wait. */
    pthread_mutex_lock(&pool->mutex);
    pool->shutdown = true;
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);

    int joined = 0;
    while (joined < pool->num_threads) {
        pthread_join(pool->threads[joined], NULL);
        joined++;
    }

    /* Safe to tear down only after every worker has exited. */
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
}
设计一个简单的任务,输出任务id与线程id,并放到线程池运行:
/*
 * Demo task: prints the task id and the id of the thread running it.
 *
 * arg: pointer to a heap-allocated int holding the task id. Ownership
 *      passes to this function, which frees it.
 */
void my_task(void *arg)
{
    int *num = (int *)arg;
    /* pthread_t is an opaque type; cast explicitly so the argument matches
     * the %lu conversion. */
    printf("Task %d by thread %lu\n", *num, (unsigned long)pthread_self());
    free(num);
}
/*
 * Demo driver: starts a pool with 8 workers, submits 20 my_task jobs,
 * prints the main thread id, and shuts the pool down.
 *
 * Each job's argument is a heap-allocated int that my_task frees. If a job
 * cannot be allocated or queued it is skipped and reported on stderr.
 */
int main(void)
{
    thread_pool_t pool;
    thread_pool_init(&pool, 8);

    for (int i = 0; i < 20; i++) {
        int *num = malloc(sizeof *num);
        if (num == NULL) {
            fprintf(stderr, "Task %d skipped: out of memory\n", i);
            continue;
        }
        *num = i;
        if (!thread_pool_enqueue(&pool, my_task, num)) {
            /* The queue was full, so no worker will ever free this
             * argument; release it here to avoid a leak. */
            fprintf(stderr, "Task %d skipped: queue full\n", i);
            free(num);
        }
    }

    /* pthread_t is opaque; cast to match the %lu conversion. */
    printf("main thread %lu\n", (unsigned long)pthread_self());
    thread_pool_shutdown(&pool);
    return 0;
}
运行结果
在 Ubuntu 20 中使用 `gcc tp.c -o tp -lpthread` 编译,运行结果如下:
Task 0 by thread 140422668744478
Task 4 by thread 140422668744456
Task 5 by thread 140422668744434
...
线程池
使用线程池完成矩阵相加$a+b=c$,相应的任务函数如下:
/*
 * Pool task: computes c[i] = a[i] + b[i] for the five elements starting at
 * the index stored in *arg, then logs which thread handled the chunk.
 *
 * arg: pointer to a heap-allocated int holding the chunk's start index.
 *      Ownership passes to this function, which frees it.
 *
 * The logged range is "st ~ ed" where ed = st + 5 is one past the last
 * element written.
 */
void add_matrix(void *arg)
{
    int *num = (int *)arg;
    int st = *num, ed = st + 5;

    for (int i = st; i < ed; i++)
        c[i] = a[i] + b[i];

    /* pthread_t is opaque; cast explicitly so the argument matches %lu. */
    printf("Task [add_matrix] %d by thread %lu, range = %d ~ %d \n", *num /5, (unsigned long)pthread_self(), st, ed);
    free(num);
}