CC BY 4.0 (除特别声明或转载文章外)
如果这篇博客帮助到你,可以请我喝一杯咖啡~
用 pthread 信号量实现同步函数
采用简单同步计数方法
- 忙等待
- 阻塞等待
这里实现同步函数时,实现了和 POSIX 函数库中pthread_barrier_t
相似的函数接口,这样做的好处是只要调整一些宏的定义,就可以简单地从 POSIX 库实现的同步函数和我下面实现的两个同步函数进行切换和性能比较了。
忙等待
去掉//define WK_BARRIER_BUSY
前的注释号,即可选用忙等待实现的Barrier
。
#ifdef WK_BARRIER_BUSY
#define pthread_barrier_t Barrier
#define pthread_barrier_init initBarrier
#define pthread_barrier_destroy destoryBarrier
#define pthread_barrier_wait waitBarrier
#define BARRIER_FLAG (1UL << 31)
#define PTHREAD_BARRIER_SERIAL_THREAD -1
typedef struct
{
unsigned cnt, numThreads;
pthread_mutex_t m;
pthread_cond_t cv;
} Barrier;
int initBarrier(Barrier *b, void *attr, int numThreads)
{
pthread_mutex_init(&b->m, NULL);
pthread_cond_init(&b->cv, NULL);
b->numThreads = numThreads;
b->cnt = BARRIER_FLAG;
return 0;
}
int destoryBarrier(Barrier *b)
{
pthread_mutex_lock(&b->m);
while (b->cnt > BARRIER_FLAG)
pthread_cond_wait(&b->cv, &b->m);
pthread_mutex_unlock(&b->m);
pthread_cond_destroy(&b->cv);
pthread_mutex_destroy(&b->m);
return 0;
}
int waitBarrier(Barrier *b)
{
pthread_mutex_lock(&b->m);
while (b->cnt > BARRIER_FLAG)
pthread_cond_wait(&b->cv, &b->m);
if (b->cnt == BARRIER_FLAG)
b->cnt = 0;
if (++b->cnt == b->numThreads)
{
b->cnt += BARRIER_FLAG - 1;
pthread_cond_broadcast(&b->cv);
pthread_mutex_unlock(&b->m);
return PTHREAD_BARRIER_SERIAL_THREAD;
}
else
{
while (b->cnt < BARRIER_FLAG)
pthread_cond_wait(&b->cv, &b->m);
--b->cnt;
if (b->cnt == BARRIER_FLAG)
pthread_cond_broadcast(&b->cv);
pthread_mutex_unlock(&b->m);
return 0;
}
}
#endif
阻塞等待
去掉//define WK_BARRIER_BUSY
前的注释号,即可选用阻塞等待实现的Barrier
。
#ifdef WK_BARRIER_BLOCK
#define pthread_barrier_t Barrier
#define pthread_barrier_init initBarrier
#define pthread_barrier_destroy destoryBarrier
#define pthread_barrier_wait waitBarrier
typedef struct
{
sem_t cntSem, barrierSem;
int numThreads, cnt;
} Barrier;
int initBarrier(Barrier *b, void *attr, int numThreads)
{
sem_init(&b->cntSem, 0, 1);
sem_init(&b->barrierSem, 0, 0);
b->numThreads = numThreads;
b->cnt = 0;
return 0;
}
int destoryBarrier(Barrier *b)
{
return sem_destroy(&b->cntSem), sem_destroy(&b->barrierSem), 0;
}
int waitBarrier(Barrier *b)
{
sem_wait(&b->cntSem);
if (++b->cnt == b->numThreads)
{
sem_post(&b->cntSem);
for (int i = 1; i < b->numThreads; ++i)
sem_post(&b->barrierSem);
return PTHREAD_BARRIER_SERIAL_THREAD;
}
else
{
sem_post(&b->cntSem);
sem_wait(&b->barrierSem);
return 0;
}
}
#endif
实现分块矩阵乘法
采用前述同步函数实现分块矩阵乘法
- 设$A=(A_{ij})$,$B=(B_{ij})$,$C=(C_{ij})$,$C=AB$
- 分块大小根据线程数 p
- 任务按中间数据划分
- 每个线程负责计算一个$A_{i,k}B_{k,j}$
- 对$C_{i,j}=\sum_{k}A_{i,k}B_{k,j}$计算采用求和两种方法
- 各由一个线程直接求和
- 多线程协作树形求和
- 矩阵元素:
- $a_{ij}=(i-0.1*j+1)/(i+j+1)$,
- $b_{ij}=(j-0.2i+1)(i+j+1)/(ii+jj+1)$
为了简化代码实现,这里分块选择按a
的列、b
的行进行分块(可以想象成把原先的矩阵分成了一个个长条)。
事实上,最优的方法其实应该是按照
a
的行或者b
的列之一进行分块,这样做的好处是每个线程对应答案矩阵的元素没有重复,也就是不需要最后的求和操作。然而这一次本身就要求我们实现树形求和,这样做的话无法满足要求。
去掉//define WK_SUM_TREE
前的注释号,即可选用多线程协作的树形求和。
void *worker(void *para)
{
int rank = (int)para;
local_c[rank] = rank ? (lf *)malloc(sizeof(lf) * n * m) : c;
for (int i = 0; i < n; ++i)
for (int j = 0; j < m; ++j)
{
local_c[rank][i * m + j] = 0;
for (int block = (p + numThreads - 1) / numThreads,
k = block * rank,
ke = k + block < p ? k + block : p;
k < ke; ++k)
local_c[rank][i * m + j] += a[i * p + k] * b[k * m + j];
}
pthread_barrier_wait(&barrier);
#ifdef WK_SUM_TREE
for (int k = 1; k < numThreads; k <<= 1)
{
if (rank & ((k << 1) - 1) == 0)
for (int i = 0; i < n; ++i)
for (int j = 0; j < m; ++j)
local_c[rank][i * m + j] += local_c[rank + k][i * m + j];
pthread_barrier_wait(&barrier);
}
#else
if (rank == 0)
for (int i = 0; i < n; ++i)
for (int j = 0; j < m; ++j)
for (int k = 1; k < numThreads; ++k)
c[i * m + j] += local_c[k][i * m + j];
#endif
pthread_barrier_wait(&barrier);
if (rank)
free(local_c[rank]);
}
按下表给出加速比和效率
计算算法的 S,E,并填写下表,假设矩阵阶$n=Ai=2^i , i=7,8,9,10,\dots$。
运行结果如下。在某些极端参数下(例如问题很大)时,某些数据出现了大于 1 的运行效率,这里可能是由于系统运行时的一些抖动产生的。此外,得到的结果还是比较符合经验的,问题规模增加时效率和加速比都有提升,而单纯提高并行线程数会增加加速比但是减小运行效率。
运行时间 T(单位 s)
详见附件中的输出文件mul.o666
。
线程数\N | $2^7$ | $2^8$ | $2^9$ | $2^{10}$ | $2^{11}$ |
---|---|---|---|---|---|
1 | 0.037599 | 0.216247 | 1.840997 | 12.817497 | 235.738514 |
2 | 0.019391 | 0.120468 | 0.645917 | 6.617959s | 51.516046 |
4 | 0.010033 | 0.066861 | 0.342896 | 3.398657 | 26.097873 |
8 | 0.005668 | 0.033389 | 0.198120 | 1.153888 | 12.976957 |
16 | 0.004929 | 0.021350 | 0.131165 | 0.620297 | 6.606665 |
32 | 0.005698 | 0.023265 | 0.090976 | 0.355688 | 2.259997 |
64 | 0.012130 | 0.023060 | 0.084289 | 0.329493 | 1.826968 |
加速比 S
加速比 S=同等规模下的串行时间/并行时间。
线程数\N | $2^7$ | $2^8$ | $2^9$ | $2^{10}$ | $2^{11}$ |
---|---|---|---|---|---|
1 | 1.00 | 1.00 | 1.00 | 1.00 | 1.00 |
2 | 1.94 | 1.80 | 2.85 | 1.93 | 4.58 |
4 | 3.75 | 3.23 | 5.37 | 3.77 | 9.03 |
8 | 6.63 | 6.48 | 9.29 | 11.10 | 18.17 |
16 | 7.63 | 10.13 | 14.03 | 20.66 | 35.68 |
32 | 6.60 | 9.29 | 20.23 | 36.03 | 104.30 |
64 | 3.10 | 9.38 | 21.84 | 38.90 | 129.03 |
效率 E
运行效率 E=加速比 S/并行线程数。
线程数\N | $2^7$ | $2^8$ | $2^9$ | $2^{10}$ | $2^{11}$ |
---|---|---|---|---|---|
1 | 1.00 | 1.00 | 1.00 | 1.00 | 1.00 |
2 | 0.97 | 0.90 | 1.43 | 0.97 | 2.29 |
4 | 0.94 | 0.81 | 1.34 | 0.94 | 2.26 |
8 | 0.83 | 0.81 | 1.16 | 1.39 | 2.27 |
16 | 0.48 | 0.63 | 0.88 | 1.30 | 2.23 |
32 | 0.21 | 0.29 | 0.63 | 1.13 | 3.26 |
64 | 0.05 | 0.15 | 0.34 | 0.61 | 2.01 |
源代码
mul.c
#define WK_BARRIER_BUSY
//#define WK_BARRIER_BLOCK
#define WK_SUM_TREE
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <omp.h>
#ifdef WK_BARRIER_BUSY
#define pthread_barrier_t Barrier
#define pthread_barrier_init initBarrier
#define pthread_barrier_destroy destoryBarrier
#define pthread_barrier_wait waitBarrier
#define BARRIER_FLAG (1UL << 31)
#define PTHREAD_BARRIER_SERIAL_THREAD -1
typedef struct
{
unsigned cnt, numThreads;
pthread_mutex_t m;
pthread_cond_t cv;
} Barrier;
int initBarrier(Barrier *b, void *attr, int numThreads)
{
pthread_mutex_init(&b->m, NULL);
pthread_cond_init(&b->cv, NULL);
b->numThreads = numThreads;
b->cnt = BARRIER_FLAG;
return 0;
}
int destoryBarrier(Barrier *b)
{
pthread_mutex_lock(&b->m);
while (b->cnt > BARRIER_FLAG)
pthread_cond_wait(&b->cv, &b->m);
pthread_mutex_unlock(&b->m);
pthread_cond_destroy(&b->cv);
pthread_mutex_destroy(&b->m);
return 0;
}
int waitBarrier(Barrier *b)
{
pthread_mutex_lock(&b->m);
while (b->cnt > BARRIER_FLAG)
pthread_cond_wait(&b->cv, &b->m);
if (b->cnt == BARRIER_FLAG)
b->cnt = 0;
if (++b->cnt == b->numThreads)
{
b->cnt += BARRIER_FLAG - 1;
pthread_cond_broadcast(&b->cv);
pthread_mutex_unlock(&b->m);
return PTHREAD_BARRIER_SERIAL_THREAD;
}
else
{
while (b->cnt < BARRIER_FLAG)
pthread_cond_wait(&b->cv, &b->m);
--b->cnt;
if (b->cnt == BARRIER_FLAG)
pthread_cond_broadcast(&b->cv);
pthread_mutex_unlock(&b->m);
return 0;
}
}
#endif
#ifdef WK_BARRIER_BLOCK
#define pthread_barrier_t Barrier
#define pthread_barrier_init initBarrier
#define pthread_barrier_destroy destoryBarrier
#define pthread_barrier_wait waitBarrier
typedef struct
{
sem_t cntSem, barrierSem;
int numThreads, cnt;
} Barrier;
int initBarrier(Barrier *b, void *attr, int numThreads)
{
sem_init(&b->cntSem, 0, 1);
sem_init(&b->barrierSem, 0, 0);
b->numThreads = numThreads;
b->cnt = 0;
return 0;
}
int destoryBarrier(Barrier *b)
{
return sem_destroy(&b->cntSem), sem_destroy(&b->barrierSem), 0;
}
int waitBarrier(Barrier *b)
{
sem_wait(&b->cntSem);
if (++b->cnt == b->numThreads)
{
sem_post(&b->cntSem);
for (int i = 1; i < b->numThreads; ++i)
sem_post(&b->barrierSem);
return PTHREAD_BARRIER_SERIAL_THREAD;
}
else
{
sem_post(&b->cntSem);
sem_wait(&b->barrierSem);
return 0;
}
}
#endif
typedef double lf;
lf *a, *b, *c, **local_c;
int numThreads, n, p, m;
pthread_barrier_t barrier;
void *worker(void *para)
{
int rank = (int)para;
local_c[rank] = rank ? (lf *)malloc(sizeof(lf) * n * m) : c;
for (int i = 0; i < n; ++i)
for (int j = 0; j < m; ++j)
{
local_c[rank][i * m + j] = 0;
for (int block = (p + numThreads - 1) / numThreads,
k = block * rank,
ke = k + block < p ? k + block : p;
k < ke; ++k)
local_c[rank][i * m + j] += a[i * p + k] * b[k * m + j];
}
pthread_barrier_wait(&barrier);
#ifdef WK_SUM_TREE
for (int k = 1; k < numThreads; k <<= 1)
{
if (rank & ((k << 1) - 1) == 0)
for (int i = 0; i < n; ++i)
for (int j = 0; j < m; ++j)
local_c[rank][i * m + j] += local_c[rank + k][i * m + j];
pthread_barrier_wait(&barrier);
}
#else
if (rank == 0)
for (int i = 0; i < n; ++i)
for (int j = 0; j < m; ++j)
for (int k = 1; k < numThreads; ++k)
c[i * m + j] += local_c[k][i * m + j];
#endif
pthread_barrier_wait(&barrier);
if (rank)
free(local_c[rank]);
}
int main(int argc, char **argv)
{
numThreads = atoi(argv[1]);
n = p = m = 1 << atoi(argv[2]);
a = (lf *)malloc(sizeof(lf) * n * p);
b = (lf *)malloc(sizeof(lf) * p * m);
c = (lf *)malloc(sizeof(lf) * n * m);
for (int i = 0; i < n; ++i)
for (int j = 0; j < p; ++j)
a[i * p + j] = (i - 0.1 * j + 1) / (i + j + 1);
for (int i = 0; i < p; ++i)
for (int j = 0; j < m; ++j)
b[i * m + j] = (j - 0.2 * i + 1) * (i + j + 1) / (i * i + j * j + 1);
local_c = (lf **)malloc(sizeof(lf *) * numThreads);
double start = omp_get_wtime();
pthread_t *thread = (pthread_t *)malloc(sizeof(pthread_t) * numThreads);
pthread_barrier_init(&barrier, NULL, numThreads);
for (int i = 0; i < numThreads; ++i)
pthread_create(&thread[i], 0, worker, (void *)i);
for (int i = 0; i < numThreads; ++i)
pthread_join(thread[i], 0);
printf("elapsed time: %fs with problem size %d, %d threads\n", omp_get_wtime() - start, n, numThreads);
pthread_barrier_destroy(&barrier);
free(thread), free(a), free(b), free(c), free(local_c);
}
mul.pbs
作业脚本。
#PBS -N mul
#PBS -l nodes=1:ppn=1
#PBS -j oe
gcc mul.c -o mul -lpthread -fopenmp -w -std=c11
for numThreads in 1 2 4 8 16 32 64
do
for logn in 7 8 9 10 11
do
./mul $numThreads $logn
done
done
mul.o666
最终得到的输出文件。顺便,这里的任务序号是蛮吉利的。
elapsed time: 0.037599s with problem size 128, 1 threads
elapsed time: 0.216247s with problem size 256, 1 threads
elapsed time: 1.840997s with problem size 512, 1 threads
elapsed time: 12.817497s with problem size 1024, 1 threads
elapsed time: 235.738514s with problem size 2048, 1 threads
elapsed time: 0.019391s with problem size 128, 2 threads
elapsed time: 0.120468s with problem size 256, 2 threads
elapsed time: 0.645917s with problem size 512, 2 threads
elapsed time: 6.617959s with problem size 1024, 2 threads
elapsed time: 51.516046s with problem size 2048, 2 threads
elapsed time: 0.010033s with problem size 128, 4 threads
elapsed time: 0.066861s with problem size 256, 4 threads
elapsed time: 0.342896s with problem size 512, 4 threads
elapsed time: 3.398657s with problem size 1024, 4 threads
elapsed time: 26.097873s with problem size 2048, 4 threads
elapsed time: 0.005668s with problem size 128, 8 threads
elapsed time: 0.033389s with problem size 256, 8 threads
elapsed time: 0.198120s with problem size 512, 8 threads
elapsed time: 1.153888s with problem size 1024, 8 threads
elapsed time: 12.976957s with problem size 2048, 8 threads
elapsed time: 0.004929s with problem size 128, 16 threads
elapsed time: 0.021350s with problem size 256, 16 threads
elapsed time: 0.131165s with problem size 512, 16 threads
elapsed time: 0.620297s with problem size 1024, 16 threads
elapsed time: 6.606665s with problem size 2048, 16 threads
elapsed time: 0.005698s with problem size 128, 32 threads
elapsed time: 0.023265s with problem size 256, 32 threads
elapsed time: 0.090976s with problem size 512, 32 threads
elapsed time: 0.355688s with problem size 1024, 32 threads
elapsed time: 2.259997s with problem size 2048, 32 threads
elapsed time: 0.012130s with problem size 128, 64 threads
elapsed time: 0.023060s with problem size 256, 64 threads
elapsed time: 0.084289s with problem size 512, 64 threads
elapsed time: 0.329493s with problem size 1024, 64 threads
elapsed time: 1.826968s with problem size 2048, 64 threads