POSIX Threads Programming
Table of Contents
What is a Thread?
Process-Thread Relationship:
What are Pthreads?
Why Pthreads?
For example, the following table compares timing results for the fork() subroutine and the pthreads_create() subroutine. Timings reflect 50,000 process/thread creations, were performed with the time utility, and units are in seconds.
However, even with MP_SHARED_MEMORY set to "yes", on-node MPI communications can not compete with Pthreads:
Designing Threaded Programs
Creating and Terminating Threads
Routines:
pthread_exit (status)
pthread_attr_init (attr)
pthread_attr_destroy (attr)
Creating Threads:
Thread Attributes:
Terminating Threads:
Example: Pthread Creation and Termination
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS 5

/*
 * Thread start routine: prints a greeting tagged with the thread's index.
 *
 * threadid carries the index BY VALUE (an integer stored inside the pointer),
 * not the address of an object, so it is converted back through intptr_t
 * and never dereferenced.
 */
void *PrintHello(void *threadid)
{
   int tid = (int)(intptr_t)threadid;

   printf("\n%d: Hello World!\n", tid);
   pthread_exit(NULL);
}

/*
 * Creates NUM_THREADS threads running PrintHello, giving each its loop index.
 * Exits with -1 as soon as a pthread_create() call reports an error.
 */
int main (int argc, char *argv[])
{
   pthread_t threads[NUM_THREADS];
   int rc, t;

   (void)argc;
   (void)argv;

   for (t = 0; t < NUM_THREADS; t++) {
      printf("Creating thread %d\n", t);
      /* Go through intptr_t so the int <-> pointer conversion is explicit
         and does not truncate or warn where pointers are wider than int. */
      rc = pthread_create(&threads[t], NULL, PrintHello, (void *)(intptr_t)t);
      if (rc) {
         printf("ERROR; return code from pthread_create() is %d\n", rc);
         exit(-1);
      }
   }

   /* Terminate only the main thread; returning from main() would end the
      process and could cut the still-running threads short. */
   pthread_exit(NULL);
}
Passing Arguments to Threads
/* One heap-allocated int per thread: each thread is handed a pointer to its
   own private copy of the loop index instead of a pointer to t itself. */
int *taskids[NUM_THREADS];

for(t=0;t < NUM_THREADS;t++) {
   /* NOTE(review): the malloc() result is dereferenced without a NULL check,
      and no matching free() is visible in this fragment. */
   taskids[t] = (int *) malloc(sizeof(int));
   *taskids[t] = t;
   printf("Creating thread %d\n", t);
   /* The argument is the address of this iteration's own int. */
   rc = pthread_create(&threads[t], NULL, PrintHello, (void *) taskids[t]);
   ...
}
/* Values handed to one thread through pthread_create()'s single void *
   argument. */
struct thread_data{
   int thread_id;
   int sum;
   char *message;
};

/* One slot per thread, at file scope, indexed by t in main(). */
struct thread_data thread_data_array[NUM_THREADS];

/* Thread start routine: threadarg is the address of one thread_data_array
   element; its three fields are copied into locals (taskid, sum, hello_msg),
   whose declarations are in the elided part of this fragment. */
void *PrintHello(void *threadarg)
{
   struct thread_data *my_data;
   ...
   my_data = (struct thread_data *) threadarg;
   taskid = my_data->thread_id;
   sum = my_data->sum;
   hello_msg = my_data->message;
   ...
}

int main (int argc, char *argv[])
{
   ...
   /* Fill slot t before creating the thread that receives its address.
      The loop over t, and the definitions of sum and messages, are elided. */
   thread_data_array[t].thread_id = t;
   thread_data_array[t].sum = sum;
   thread_data_array[t].message = messages[t];
   rc = pthread_create(&threads[t], NULL, PrintHello,
        (void *) &thread_data_array[t]);
   ...
}
int rc, t;

for(t=0;t < NUM_THREADS;t++)
{
   printf("Creating thread %d\n", t);
   /* Every thread receives the address of the SAME variable t, which this
      loop keeps incrementing; a thread that reads through the pointer later
      may therefore see a value other than the one t had at creation time. */
   rc = pthread_create(&threads[t], NULL, PrintHello, (void *) &t);
   ...
}
Joining and Detaching Threads
pthread_detach (threadid,status)
pthread_attr_setdetachstate (attr,detachstate)
pthread_attr_getdetachstate (attr,detachstate)
Joining:
Joinable or Not?
Recommendations:
Example: Pthread Joining
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600   /* random() is an XSI function; request it explicitly
                               so it is declared under strict -std=c99/-std=c11 */
#endif

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS 3

/*
 * Thread start routine: burns CPU time by summing one million random()
 * values, prints the sum, and exits with status 0.
 * The argument is unused.
 */
void *BusyWork(void *null)
{
   int i;
   double result = 0.0;

   (void)null;

   for (i = 0; i < 1000000; i++) {
      result = result + (double)random();
   }
   printf("result = %e\n", result);
   pthread_exit((void *) 0);
}

/*
 * Creates NUM_THREADS explicitly joinable threads running BusyWork, then
 * joins each one in creation order and prints its exit status.
 * Exits with -1 if pthread_create() or pthread_join() reports an error.
 */
int main (int argc, char *argv[])
{
   pthread_t thread[NUM_THREADS];
   pthread_attr_t attr;
   int rc, t;
   /* pthread_join() stores a void * through its second argument, so the
      receiving object must be a full pointer. Passing the address of an int
      overruns it wherever pointers are wider than int. */
   void *status;

   (void)argc;
   (void)argv;

   /* Initialize and set thread joinable attribute */
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

   for (t = 0; t < NUM_THREADS; t++) {
      printf("Creating thread %d\n", t);
      rc = pthread_create(&thread[t], &attr, BusyWork, NULL);
      if (rc) {
         printf("ERROR; return code from pthread_create() is %d\n", rc);
         exit(-1);
      }
   }

   /* Free attribute and wait for the other threads */
   pthread_attr_destroy(&attr);
   for (t = 0; t < NUM_THREADS; t++) {
      rc = pthread_join(thread[t], &status);
      if (rc) {
         printf("ERROR; return code from pthread_join() is %d\n", rc);
         exit(-1);
      }
      /* BusyWork encodes its status as an integer inside the pointer. */
      printf("Completed join with thread %d status= %d\n", t,
             (int)(intptr_t)status);
   }

   pthread_exit(NULL);
}
Stack Management
pthread_attr_setstacksize (attr, stacksize)
pthread_attr_getstackaddr (attr, stackaddr)
pthread_attr_setstackaddr (attr, stackaddr)
Preventing Stack Problems:
Example: Stack Management
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#define NTHREADS 4
#define N 1000
#define MEGEXTRA 1000000

/* Attribute object shared by main() (which configures it) and the worker
   threads (which read the configured stack size back from it). */
pthread_attr_t attr;

/*
 * Thread start routine: reports the stack size recorded in the shared
 * attribute object, then fills an N x N array of doubles that lives on this
 * thread's own stack. The array is what makes a larger stack necessary.
 *
 * threadid carries the thread index by value inside the pointer.
 */
void *dowork(void *threadid)
{
   double A[N][N];
   int i, j;
   size_t mystacksize;
   long tid = (long)(intptr_t)threadid;

   pthread_attr_getstacksize(&attr, &mystacksize);
   /* %zu is the conversion for size_t; the id is printed as a long. */
   printf("Thread %ld: stack size = %zu bytes \n", tid, mystacksize);

   for (i = 0; i < N; i++) {
      /* The inner loop must test and advance j. Testing and advancing i
         here would fill only column 0 and end the outer loop after a
         single pass. */
      for (j = 0; j < N; j++) {
         A[i][j] = ((i*j)/3.452) + (N-i);
      }
   }
   pthread_exit(NULL);
}

/*
 * Prints the default stack size, raises the stack size in the shared
 * attribute object to hold one N x N array of doubles plus MEGEXTRA bytes,
 * and creates NTHREADS threads with that attribute.
 * Exits with -1 if the stack size cannot be set or a thread cannot be created.
 */
int main(int argc, char *argv[])
{
   pthread_t threads[NTHREADS];
   size_t stacksize;
   int rc, t;

   (void)argc;
   (void)argv;

   pthread_attr_init(&attr);
   pthread_attr_getstacksize(&attr, &stacksize);
   printf("Default stack size = %zu\n", stacksize);

   stacksize = sizeof(double)*N*N+MEGEXTRA;
   printf("Amount of stack needed per thread = %zu\n", stacksize);

   /* If the request is rejected the threads would keep the default size and
      overflow their stacks, so stop here instead. */
   rc = pthread_attr_setstacksize(&attr, stacksize);
   if (rc) {
      printf("ERROR; return code from pthread_attr_setstacksize() is %d\n", rc);
      exit(-1);
   }
   printf("Creating threads with stack size = %zu bytes\n", stacksize);

   for (t = 0; t < NTHREADS; t++) {
      rc = pthread_create(&threads[t], &attr, dowork, (void *)(intptr_t)t);
      if (rc) {
         printf("ERROR; return code from pthread_create() is %d\n", rc);
         exit(-1);
      }
   }
   printf("Created %d threads.\n", t);

   /* attr is intentionally not destroyed: the worker threads still read it,
      and main() does not join them. */
   pthread_exit(NULL);
}
Miscellaneous Routines
pthread_equal (thread1,thread2)
/* Control object statically initialized with PTHREAD_ONCE_INIT.
   NOTE(review): presumably the once_control argument for pthread_once();
   the call itself is not shown here. */
pthread_once_t once_control = PTHREAD_ONCE_INIT;
Overview
Creating and Destroying Mutexes
pthread_mutex_destroy (mutex)
pthread_mutexattr_init (attr)
pthread_mutexattr_destroy (attr)
Usage:
The mutex is initially unlocked.
Note that not all implementations may provide the three optional mutex attributes.
Locking and Unlocking Mutexes
pthread_mutex_trylock (mutex)
pthread_mutex_unlock (mutex)
Thread 1 Thread 2 Thread 3 Lock Lock A = 2 A = A+1 A = A*B Unlock Unlock
Example: Using Mutexes
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/*
 * The following structure contains the necessary information to allow the
 * function "dotprod" to access its input data and place its output into
 * the structure.
 */
typedef struct
{
   double *a;       /* first input vector  */
   double *b;       /* second input vector */
   double  sum;     /* running dot product, updated under mutexsum */
   int     veclen;  /* number of elements each thread processes */
} DOTDATA;

/* Define globally accessible variables and a mutex */

#define NUMTHRDS 4
#define VECLEN 100

DOTDATA dotstr;
pthread_t callThd[NUMTHRDS];
pthread_mutex_t mutexsum;

/*
 * Thread start routine: computes the partial dot product of one
 * veclen-sized slice of dotstr.a and dotstr.b and adds it to dotstr.sum.
 *
 * arg carries the slice number by value inside the pointer; the slice covers
 * elements [arg*veclen, arg*veclen + veclen). All other input is read from
 * the globally accessible structure, and the only shared write (dotstr.sum)
 * is made while holding mutexsum.
 */
void *dotprod(void *arg)
{
   /* Define and use local variables for convenience */
   int i, start, end, offset, len;
   double mysum, *x, *y;

   /* Go through intptr_t: a direct pointer -> int cast truncates (and
      warns) where pointers are wider than int. */
   offset = (int)(intptr_t)arg;

   len = dotstr.veclen;
   start = offset*len;
   end = start + len;
   x = dotstr.a;
   y = dotstr.b;

   /* Accumulate privately so the mutex is taken once per thread rather
      than once per element. */
   mysum = 0;
   for (i = start; i < end; i++) {
      mysum += (x[i] * y[i]);
   }

   /* Lock a mutex prior to updating the value in the shared structure,
      and unlock it upon updating. */
   pthread_mutex_lock(&mutexsum);
   dotstr.sum += mysum;
   pthread_mutex_unlock(&mutexsum);

   pthread_exit((void *) 0);
}

/*
 * The main program creates threads which do all the work and then prints
 * out the result upon completion. Before creating the threads, the input
 * data is created. Since all threads update a shared structure, we need a
 * mutex for mutual exclusion. The main thread waits for each one of the
 * threads to complete.
 *
 * We specify a thread attribute value that allows the main thread to join
 * with the threads it creates. Note also that we free up handles when they
 * are no longer needed.
 *
 * Exits with -1 if the vectors cannot be allocated or a thread cannot be
 * created or joined.
 */
int main (int argc, char *argv[])
{
   int i, rc;
   double *a, *b;
   /* pthread_join() stores a void * through its second argument; an int
      here would be overrun wherever pointers are wider than int. */
   void *status;
   pthread_attr_t attr;

   (void)argc;
   (void)argv;

   /* Assign storage and initialize values */
   a = malloc(NUMTHRDS * VECLEN * sizeof *a);
   b = malloc(NUMTHRDS * VECLEN * sizeof *b);
   if (a == NULL || b == NULL) {
      printf("ERROR; unable to allocate the input vectors\n");
      free(a);
      free(b);
      exit(-1);
   }

   for (i = 0; i < VECLEN*NUMTHRDS; i++) {
      a[i] = 1.0;
      b[i] = a[i];
   }

   dotstr.veclen = VECLEN;
   dotstr.a = a;
   dotstr.b = b;
   dotstr.sum = 0;

   pthread_mutex_init(&mutexsum, NULL);

   /* Create threads to perform the dotproduct */
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

   for (i = 0; i < NUMTHRDS; i++) {
      /* Each thread works on a different set of data. The offset is
         specified by 'i'. The size of the data for each thread is
         indicated by VECLEN. */
      rc = pthread_create(&callThd[i], &attr, dotprod, (void *)(intptr_t)i);
      if (rc) {
         printf("ERROR; return code from pthread_create() is %d\n", rc);
         exit(-1);
      }
   }

   pthread_attr_destroy(&attr);

   /* Wait on the other threads */
   for (i = 0; i < NUMTHRDS; i++) {
      rc = pthread_join(callThd[i], &status);
      if (rc) {
         printf("ERROR; return code from pthread_join() is %d\n", rc);
         exit(-1);
      }
   }

   /* After joining, print out the results and cleanup */
   printf ("Sum = %f \n", dotstr.sum);
   free (a);
   free (b);
   pthread_mutex_destroy(&mutexsum);
   pthread_exit(NULL);
}
Creating and Destroying Condition Variables
pthread_cond_destroy (condition)
pthread_condattr_init (attr)
pthread_condattr_destroy (attr)
Note that not all implementations may provide the process-shared attribute.
Waiting and Signaling on Condition Variables
pthread_cond_signal (condition)
pthread_cond_broadcast (condition)
Example: Using Condition Variables
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600   /* random() is an XSI function; request it explicitly
                               so it is declared under strict -std=c99/-std=c11 */
#endif

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define NUM_THREADS 3
#define TCOUNT 10
#define COUNT_LIMIT 12

int count = 0;                           /* shared counter, guarded by count_mutex */
int thread_ids[NUM_THREADS] = {0,1,2};   /* ids handed to the threads by address */
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;

/*
 * Thread start routine: increments the shared counter TCOUNT times and
 * signals count_threshold_cv on the increment that makes the counter equal
 * to COUNT_LIMIT.
 *
 * idp points to this thread's id (an int).
 */
void *inc_count(void *idp)
{
   int j, i;
   double result = 0.0;
   int *my_id = idp;

   for (i = 0; i < TCOUNT; i++) {
      pthread_mutex_lock(&count_mutex);
      count++;

      /* Check the value of count and signal waiting thread when condition
         is reached. Note that this occurs while mutex is locked. */
      if (count == COUNT_LIMIT) {
         pthread_cond_signal(&count_threshold_cv);
         printf("inc_count(): thread %d, count = %d Threshold reached.\n",
                *my_id, count);
      }
      printf("inc_count(): thread %d, count = %d, unlocking mutex\n",
             *my_id, count);
      pthread_mutex_unlock(&count_mutex);

      /* Do some work so threads can alternate on mutex lock */
      for (j = 0; j < 1000; j++) {
         result = result + (double)random();
      }
   }
   pthread_exit(NULL);
}

/*
 * Thread start routine: blocks on count_threshold_cv until the shared
 * counter has reached COUNT_LIMIT.
 *
 * idp points to this thread's id (an int).
 */
void *watch_count(void *idp)
{
   int *my_id = idp;

   printf("Starting watch_count(): thread %d\n", *my_id);

   /* Lock mutex and wait for signal. Note that the pthread_cond_wait
      routine will automatically and atomically unlock mutex while it waits.
      Also, note that if COUNT_LIMIT is reached before this routine is run by
      the waiting thread, the loop will be skipped to prevent
      pthread_cond_wait from never returning. */
   pthread_mutex_lock(&count_mutex);
   while (count < COUNT_LIMIT) {
      pthread_cond_wait(&count_threshold_cv, &count_mutex);
      printf("watch_count(): thread %d Condition signal received.\n", *my_id);
   }
   pthread_mutex_unlock(&count_mutex);
   pthread_exit(NULL);
}

/* Reports a failed pthread call by name and return code, then exits with -1. */
static void die_on_error(int rc, const char *what)
{
   if (rc) {
      printf("ERROR; return code from %s is %d\n", what, rc);
      exit(-1);
   }
}

/*
 * Starts two inc_count threads and one watch_count thread, joins all three,
 * then releases the attribute, mutex and condition variable objects.
 * Exits with -1 if a thread cannot be created or joined.
 */
int main (int argc, char *argv[])
{
   int i, rc;
   pthread_t threads[NUM_THREADS];
   pthread_attr_t attr;

   (void)argc;
   (void)argv;

   /* Initialize mutex and condition variable objects */
   pthread_mutex_init(&count_mutex, NULL);
   pthread_cond_init (&count_threshold_cv, NULL);

   /* For portability, explicitly create threads in a joinable state
      so that they can be joined later. */
   pthread_attr_init(&attr);
   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

   rc = pthread_create(&threads[0], &attr, inc_count, (void *)&thread_ids[0]);
   die_on_error(rc, "pthread_create()");
   rc = pthread_create(&threads[1], &attr, inc_count, (void *)&thread_ids[1]);
   die_on_error(rc, "pthread_create()");
   rc = pthread_create(&threads[2], &attr, watch_count, (void *)&thread_ids[2]);
   die_on_error(rc, "pthread_create()");

   /* Wait for all threads to complete */
   for (i = 0; i < NUM_THREADS; i++) {
      rc = pthread_join(threads[i], NULL);
      die_on_error(rc, "pthread_join()");
   }
   printf ("Main(): Waited on %d threads. Done.\n", NUM_THREADS);

   /* Clean up and exit */
   pthread_attr_destroy(&attr);
   pthread_mutex_destroy(&count_mutex);
   pthread_cond_destroy(&count_threshold_cv);
   pthread_exit(NULL);
}
This section describes details specific to Livermore Computing's systems.
Implementations:
Compiling:
Mixing MPI with Pthreads:
Several features of the Pthreads API are not covered in this tutorial. These are listed below. See the Pthread Library Routines Reference section for more information.
This completes the tutorial.
Where would you like to go now?