Previous section   Next section

11.8 Thread Synchronization

It is a given that to fully utilize threads, we need to reliably coordinate their activities. A thread's access to critical sections of common code that modify shared data structures must be protected in a manner such that the integrity of the data referenced is not compromised. POSIX thread activities can be synchronized in a variety of ways.

11.8.1 Mutex Variables

One of the easiest methods to ensure coordination is to use a mutual exclusion lock, or mutex. Conceptually, a mutex can be thought of as a binary semaphore with ownership whose state either allows (0, unlocked)[16] or prohibits (nonzero, locked) access. Unlike a binary semaphore, any thread within the scope of a mutex can lock a mutex, but only the thread that locked the mutex should unlock it. Again, while it should not be done, unlocking a mutex that a thread did not lock does not generate an error. However, such action results in undefined behavior—forewarned is forearmed. Threads unable to obtain a lock are suspended. As actions on a mutex are atomic, and only one thread at a time can successfully modify the state of a mutex, it can be used to force the serialization of thread activities. If threads associated with multiple processes are to be coordinated, the mutex must be mapped to a common section of memory shared by the processes involved.

[16] I know, I know—thinking semaphore-wise, you might expect a 1 to indicate unlocked and 0 locked. If the mutex is owned, it is considered locked.

The manner in which a mutex is created and initialized determines how it will be used. A mutex is of data type pthread_mutex_t. Examining the include file <bits/pthreadtypes.h>, we find this data type to be the following structure:

typedef struct {
  int __m_reserved;                  /* Reserved for future use          */
  int __m_count;                     /* Depth of recursive locking       */
  _pthread_descr __m_owner;          /* Owner thread (if recursive or
                                        errcheck)                        */
  int __m_kind;                      /* Mutex kind: fast, recursive or
                                        errcheck                         */
  struct _pthread_fastlock __m_lock; /* Underlying fast lock             */
} pthread_mutex_t;

Keep in mind that pthread_mutex_t is what is known as an opaque data type. That is, its specific structure is implementation-dependent. Thus, a POSIX threads implementation for the pthread_mutex_t data type for Linux might well be different from Sun's implementation. When in doubt, check it out!

A mutex, like a thread, can have its attributes specified via an attribute object, which is then passed to the pthread_mutex_init library function. Not to be outdone, the mutex attribute object also has its own initialization function, pthread_mutexattr_init, a deallocation function, and library functions (such as pthread_mutexattr_settype) for modification of the attribute settings once the object has been created. Changing the settings of the mutex attribute object will not change the settings of those mutexes previously allocated. We will restrict this section of our discussion to the following library calls: pthread_mutexattr_init, pthread_mutexattr_ settype, and pthread_mutex_init (tables 11.13, 11.14, and 11.15).

Table 11.13. The pthread_mutexattr_init Library Function

Include File(s)

<pthread.h>

Manual Section

3

Summary

int pthread_mutexattr_init( pthread_mutexattr_t *attr );

Return

Success

Failure

Sets errno

0

Nonzero

 

The pthread_mutexattr_init call, which initializes the mutex attribute object, is passed a reference to a pthread_mutexattr_t structure. The definition of this structure, shown below, is found in the /usr/include/bits/pthreadtypes.h file.

typedef struct {
 int __mutexkind;
} pthread_mutexattr_t;

At present, the LinuxThreads implementation of POSIX threads provides support only for the attribute specifying the mutex kind. In LinuxThreads a mutex can be one of the following three kinds, with the default being the type fast. The kind (type) of a mutex determines the system's behavior when a thread attempts to lock the mutex (Table 11.14).

Table 11.14.

Mutex Kind

Constant

Behavior

fast

PTHREAD_MUTEX_FAST

If the mutex is already locked by another thread, the calling thread blocks. If the thread that owns (locked) the mutex attempts to lock it a second time, the thread will deadlock! The thread that unlocks the mutex is assumed to be the owner of the mutex. Unlocking a nonlocked mutex will result in undefined behavior.

recursive

PTHREAD_MUTEX_RECURSIVE

The system will the record number of lock requests for the mutex. The mutex is unlocked only when an equal number of unlock operations have been performed.

error-checking

PTHREAD_MUTEX_ERRORCHECK

If a thread attempts to lock a locked mutex, an error (EDEADLK) is returned.

If the pthread_mutexattr_init call is successful, it returns a 0 and a reference to a default pthread_mutexattr_t object; otherwise, it returns the value ENOMEM (12) to indicate there was insufficient memory to perform the initialization. One final note: A fast mutex (the default) is POSIX-based and thus portable. The mutex kinds recursive and error-checking are nonstandard and thus nonportable.

The pthread_mutexattr_settype library function is used to modify a mutex attribute object.

Table 11.15. The pthread_mutexattr_settype Library Function

Include File(s)

<pthread.h>

Manual Section

3

Summary

int
pthread_mutexattr_settype(pthread_mutexattr_t 
graphics/ccc.gif*attr,
                          int               kind );

Return

Success

Failure

Sets errno

0

Nonzero

 

The pthread_mutexattr_settype library call is passed a valid reference to a pthread_mutexattr_t object (presumably previously initialized by a successful call to pthread_mutexattr_init) and an integer argument (defined constant) indicating the mutex kind. The mutex kind is specified by one of the previously discussed PTHREAD_MUTEX_xxx constants. For example, the code sequence

pthread_mutexattr_t      my_attributes;
pthread_mutexattr_init( &my_attributes );
. . .
pthread_mutexattr_settype( &my_attributes, PTHREAD_MUTEX_RECURSIVE);

would allocate a mutex attribute object, set the default attributes, and then at a later point change the mutex kind to recursive.[17] If the pthread_muexattr_settype call is successful, it returns a 0; otherwise, it returns the value EINVAL (22), indicating it has found an invalid argument.

[17] Older version LinuxThreads may not support the pthread_mutexattr_settype call. An equivalent but deprecated call would be


thread_mutexattr_setkind_np(&my_attributes, PTHREAD_MUTEX_RECURSIVE_NP);

At this point, we must keep several things in mind. First, initializing a mutex attribute object does not create the actual mutex. Second, if the mutex attribute object is to be shared by threads in separate address spaces, the user is responsible for setting up the mapping of the mutex attribute object to a common shared memory location. Third, if a mutex is shared across processes, it must be allocated dynamically, and therefore a call to pthread_mutex_init and/or pthread_init would be needed. The mechanics of how to set up a shared mutex for threads sharing the same process space and those in separate process spaces can be found in Program 11.5.

Next, let's look at initializing a mutex using the pthread_mutex_init library call. Table 11.16 provides the details for pthread_mutex_init.

Table 11.16. The pthread_mutex_init Library Function

Include File(s)

<pthread.h>

Manual Section

3

Summary

int
pthread_mutex_init(pthread_mutex_t  *mutex,
                   const pthread_mutexattr_t
                   *mutexattr);

Return

Success

Failure

Sets errno

0

Nonzero

 

The pthread_mutex_init library function initializes a mutex. Its first argument, *mutex, is a reference to the mutex, and the second argument, *mutexattr, is a reference to a previously initialized mutex attribute object. If the second argument is NULL, the mutex will be initialized with the default attributes. Thus, with pthread_mutex_init, we can generate a mutex with the default characteristics. For example, with the statements

. . .
pthread_mutex_t      my_lock;
. . .
pthread_mutex_init( &my_lock, NULL );

pthread_mutex_init returns a 0 and a reference to the mutex if successful. If the pthread_mutex_init call fails, it returns the value EINVAL (22) to indicate an invalid value for either the mutex or mutexattr argument.

While this approach (if we use an attribute object) is somewhat circuitous, it does allow greater freedom over the allocation of the mutex object. An alternate approach is to use a predefined constant PTHREAD_MUTEX_ INIIALIZER[18] to initialize the mutex. The code sequence for this is:

[18] In <pthread.h> we find this constant to be defined as {0, 0, 0, PTHREAD_MUTEX_TIMED_NP, __LOCK_INITIALIZER}.

pthread_mutex_t    my_lock = PTHREAD_MUTEX_INITIALIZER;

Additionally, LinuxThreads supports similar initializers for its two nonstandard mutex types: PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP and PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP.

Once the mutex has been created and initialized, there are four library functions that operate on the mutex, listed in Table 11.17.

Table 11.17. The mutex Manipulation Library Functions.

Include File(s)

<pthread.h>

Manual Section

3

Summary

int pthread_mutex_lock(    pthread_mutex_t *mutex );
int pthread_mutex_trylock( pthread_mutex_t *mutex );
int pthread_mutex_unlock(  pthread_mutex_t *mutex );
int pthread_mutex_destroy( pthread_mutex_t *mutex );

Return

Success

Failure

Sets errno

0

Nonzero

 

Each function takes a single argument, *mutex, a reference to the mutex. The library functions pthread_mutex_lock and pthread_mutex_unlock are used to lock and unlock a mutex. A pthread_mutex_lock, issued on a previously locked mutex, causes the issuing thread to block until the lock is free. If the mutex is unlocked, pthread_mutex_lock locks the mutex and changes the ownership of the mutex to the thread issuing the lock. The manual pages on this function note that should the owner of a mutex issue a second lock on mutex that it has previously locked, deadlock will result. The pthread_mutex_unlock library function is used to unlock a mutex. The thread issuing the unlock call should be the same as the thread that locked the mutex; otherwise, the resulting behavior is unspecified. Again, it is very easy to break the rules, as with the default fast type mutex, the concept of ownership is not enforced. The call pthread_mutex_ trylock is similar to pthread_mutex_lock (i.e., it will lock an unlocked mutex); however, it will not cause the thread to block if the indicated mutex is already locked. The library function pthread_mutex_destroy causes the referenced mutex to become uninitialized. However, the user must explicitly free the memory referenced by the mutex for it to be reclaimed by the operating system. In general, if two or more resources are to be acquired by multiple threads and each resource is protected by a mutex, a locking hierarchy should be established to reduce the chance of deadlock. That is, each thread should attempt to gain access to each resource in the same order, and when done, release the associated mutexes in the reverse order of their acquisition.

When these functions are successful, they return a 0; otherwise, they return a nonzero value. Both pthread_mutex_trylock and pthread_mutex_destroy return the value EBUSY (16) if the mutex is already locked. All functions return the value EINVAL (22) if the mutex argument is invalid.

We use two programs to demonstrate the use of a mutex. The programs are adaptations of an earlier producer—consumer example described in some detail in the chapter on semaphores (Chapter 7). As both programs use the same section of produce and consume logic, this code along with a small driver routine called do_work has been placed in a separate file. For similar reasons, code for includes and declarations have been placed in a common header file called local_mutex.h. Program 11.5 uses a mutex to coordinate the activities of multiple nondetached threads. Each thread executes a series of calls to the produce and consume routines. These routines access a shared file that acts as a common buffer. Access to the buffer is controlled by the mutex. The include file for Program 11.5 is shown below.

Program 11.5 Header file for mutex example.
File : local_mutex.h
  |     /*
  |          Common local header file: local_mutex.h
  |     */
  |     #ifndef LOCAL_MUTEX_H
  +     #define LOCAL_MUTEX_H
  |     #define _GNU_SOURCE
  |     #define _REENTRANT
  |     #include <iostream>
  |     #include <cstdio>
 10     #include <pthread.h>
  |     #include <fstream>
  |     #include <stdlib.h>
  |     #include <unistd.h>
  |     #include <sys/types.h>
  +     #include <sys/wait.h>
  |     #include <sys/time.h>
  |                                                // When we share a mutex
  |     #include <sys/ipc.h>                       // we will need these.
  |     #include <sys/shm.h>
 20     static const char *BUFFER="./buffer";
  |     static const int MAX=99;
  |     void do_work( void );
  |     using namespace std;
  |     #endif

Most of what we see in the header file is self-explanatory or has been covered in previous sections. However, there are some items of note. The first mutex program example contains multiple threads within one process space; the second example uses single threads, each associated with its own heavyweight process. When interprocess coordination is required, the mutex and two additional common variables must be mapped to a shared memory location. To accomplish this, we need to use the IPC shared memory functions (discussed in Chapter 8), and thus must include their corresponding header files. The defined constant BUFFER is the name of a local file that will be used as a common shared location to store generated data.

The listing containing the common logic for the production and consumption of data is shown in Program 11.5.PC.

Program 11.5.PC Common producer—consumer code for mutex examples.
File : p11.5.PC.cxx
  |     /*
  |           Common producer & consumer code
  |     */
  |     #include "local_mutex.h"
  +     struct timespec some_time;
  |     fstream         fptr;                  // common buffer location
  |     extern pthread_mutex_t *m_LOCK;        // shared mutex
  |     extern int             *s_shm,         // setup flag
  |                            *c_shm;         // counter
 10     /*
  |         Generate a random # within specified range
  |     */
  |     int
  |     my_rand(int start, int range){
  +       struct timeval t;
  |       gettimeofday(&t, (struct timezone *)NULL);
  |       return (int)(start+((float)range * rand_r((unsigned *)&t.tv_usec))
  |                    / (RAND_MAX+1.0));
  |     }
 20     /*
  |           Produce a random # and write to a common repository
  |     */
  |     void
  |     produce( ) {
  +       int   err, *n;
  |       cout << pthread_self( ) << "\t P: attempting to produce \t"
  |            << getpid( ) << endl;
  |       cout.flush( );
  |       if (pthread_mutex_trylock(m_LOCK) != 0) {      // LOCK
 30         cout << pthread_self( ) << "\t P: lock busy             \t"
  |              << getpid( ) << endl;
  |         cout.flush( );
  |         return;
  |       }
  +       n  = new int;                                  // allocate
  |       *n = my_rand(1,MAX);
  |       fptr.open(BUFFER, ios::out | ios::app);        // Open for append
  |       fptr.write( (char *) n, sizeof(*n) );
  |       fptr.close( );
 40       delete n;                                      // release
  |       cout << pthread_self() << "\t P: The # [" << *n
  |            << "] deposited    \t" << getpid( )  << endl;
  |         cout.flush( );
  |       some_time.tv_sec = 0; some_time.tv_nsec = 10000;
  +       nanosleep(&some_time, NULL);                   // sleep a bit
  |       if ((err=pthread_mutex_unlock(m_LOCK)) != 0){  // UNLOCK
  |         cerr << "P: unlock failure " << err << endl;
  |         cout.flush( );
  |         exit(102);
 50       }
  |     }
  |     /*
  |           Consume the next random number from the common repository
  |     */
  +     void
  |     consume( ) {
  |       int             err, *n;
  |       cout << pthread_self( ) << "\t C: attempting to consume \t"
  |            << getpid( ) << endl;
 60       cout.flush( );
  |       if (pthread_mutex_trylock(m_LOCK) != 0) {      // LOCK
  |         cout << pthread_self( ) << "\t C: lock busy             \t"
  |              << getpid( ) << endl;
  |         cout.flush( );
  +         return;
  |       }
  |       fptr.open(BUFFER, ios::in);                   // Try to read
  |       if ( fptr )  {                                 // If present
  |         fptr.close( );
 70         fptr.open (BUFFER, ios::in|uis::out);        // Reopen for R/W
  |       }
  |       fptr.seekp( *c_shm * sizeof(int), ios::beg );
  |       n = new int;                                   // allocate
  |       *n = 0;
  +       fptr.read( (char *)n, sizeof(*n));
  |       if ((*n) > 0) {                               // For positive values
  |         cout << pthread_self() << "\t C: The # [" << *n
  |              << "] obtained    \t" << getpid( )   << endl;
  |         cout.flush( );
 80         fptr.seekp( *c_shm * sizeof(int), ios::beg );
  |         *n = -(*n);
  |         fptr.write( (char *) n, sizeof(*n) );
  |         fptr.close( );
  |         ++*c_shm;                                   // increment counter
  +       } else {
  |         cout << pthread_self( ) << "\t C: No new # to consume     \t"
  |              << getpid( ) << endl;
  |         cout.flush( );
  |       }
 90       delete n;                                       // release
  |       fptr.close( );
  |       some_time.tv_sec = 0; some_time.tv_nsec = 10000;
  |       nanosleep(&some_time, NULL);
  |       if ((err=pthread_mutex_unlock(m_LOCK)) != 0){  // UNLOCK
  +         cerr<< "C: unlock failure " << err << endl;
  |         exit(104);
  |       }
  |     }
  |     /*
100       Simulate some work, 10 iterations about half produce, half consume
  |     */
  |     void
  |     do_work( ) {
  |       if (!(*s_shm)) {                               // Clear @ start
  +         pthread_mutex_lock(m_LOCK);                  // LOCK
  |         if (!(*s_shm)++) {
  |           cout << pthread_self( ) << "  \t  : clearing the buffer  \t"
  |                << getpid() << endl;
  |           fptr.open( BUFFER, ios::out | ios::trunc );
110           fptr.close( );
  |         }
  |         pthread_mutex_unlock(m_LOCK);                // UNLOCK
  |       }
  |       for (int i = 0; i < 10; ++i) {
  +         some_time.tv_sec = 0; some_time.tv_nsec = 10000;
  |         nanosleep(&some_time, NULL);                 // sleep a bit
  |         switch ( my_rand(1,2) ) {
  |         case 1:
  |           produce();
120           break;
  |         case 2:
  |           consume();
  |         }
  |       }
  +     }

Overall, the produce and consume code listed in Program 11.5.PC is very much the same as what we saw in the Chapter 7 example. Nonetheless, there have been some important changes and a few additions. At the top of the listing a timespec structure, some_time, is declared. The program uses the nanosleep real-time library function instead of sleep to suspend the current thread from execution.

Next is the declaration of a file stream pointer, fptr, which is used to reference the file where generated values will be stored and retrieved. Following this is a reference to a mutex (m_LOCK) and two references to integers (s_shm and c_shm). The first, s_shm, is used as a flag to indicate whether or not the file, used as a storage place for generated data, has been cleared (reset). The second, c_shm, is used as a counter—index offset for the current item to be retrieved from the file. As the variables m_LOCK, s_shm, and c_shm were initially declared in the source file with the code for the function main, they are qualified as extern (external) here.

As shown, the user-defined produce function attempts, using the pthread_mutex_trylock call, to lock the mutex referenced by m_LOCK. If it is not successful, a message is displayed and the function is exited. If the mutex can be obtained, a random value is generated. The random value is stored at a temporarily allocated memory location referenced by n. Once the value is stored in the file, the file buffer is flushed and the file closed. Next, the temporary location is freed, and a call to nanosleep is made to simulate some additional processing. As a last step, the mutex is released.

Conversely, the user-defined consume function tries to lock the mutex. If it is successful, it tries to consume (read) a number from the file. To accomplish this, the file is opened for reading and writing (ios::in | ios::out). The offset into the file is calculated using the current index referenced by c_shm multiplied by the sizeof of the data value written to the file. The seekp method is used to move the file pointer to the proper offset in the file where the value is to be retrieved. As in the produce function, a temporarily allocated location is used to store the retrieved value. The newly allocated location is initially cleared (set to 0) prior to the call to read. If the value obtained is positive, the value is displayed. The displayed value is written back to its same location in the file as a negative number. The seekp method is used to move the file pointer back to its proper location so the update process will overwrite the correct data item. Rewriting the value as a negative is used as a technique for keeping track of consumed values. Once the value is rewritten, the file buffer is flushed and the current index value is incremented. Whether or not a valid value is retrieved, the file is closed. A short nanosleep (again to simulate additional processing) is made, and the temporary storage location freed. Finally, the mutex is unlocked and made available to other threads.

The user-defined function do_work uses the s_shm reference to determine if the file that stores the output has been cleared. Upon first entry, *s_shm references a 0. In this case the if logic is entered. The mutex referenced by m_LOCK is used to bracket access to the code that opens the file for writing (ios::out), truncating (ios::trunc) its content. The value referenced by s_shm is incremented in this process, allowing the initialization to occur only once. Following this, the user-defined do_work function sleeps for a few seconds to permit random startup times, and then loops 10 times to simulate a series of production and consumption events. If the random number generator is decent, there should be a somewhat even split of the calls to produce and consume within the loop. The loop also contains a call to nanosleep.

Program 11.5.INTRA contains the code for the function main for an intraprocess mutex example. All threads in this example share a common address space and are associated with an underlying LWP.

Program 11.5.INTRA Code for function main for intraprocess mutex example.
File : p11.5.INTRA.cxx
  |     /*
  |        INTRA process main (multiple threads - one process space)
  |        Compile: g++  p11.5.PC.cxx  p11.5.INTRA.cxx  -lpthread -o INTRA
  |     */
  +     #include "local_mutex.h"
  |     pthread_mutex_t LOCK,   *m_LOCK = &LOCK;
  |     int             setup,  *s_shm  = &setup,
  |                     current,*c_shm  = &current;
  |     int
 10     main(int argc, char *argv[]) {
  |       int  i, n;
  |       pthread_t worker[MAX];                 // worker TID's
  |       if ( argc != 2) {
  |         cerr << "Usage: " << *argv << " n_workers" << endl;
  +         return 1;
  |       }
  |       pthread_mutex_init(&LOCK,  NULL);
  |       *s_shm = 0;                            // Start as NOT setup
  |       *c_shm = 0;                            // current index (offset)
 20       n = atoi(argv[1]) < MAX ? atoi(argv[1]) : MAX;
                                                 // # of threads to create
  |       for( i=0; i < n; ++i)                  // create each thread
  |         pthread_create( &worker[i], NULL,
                           (void *(*)(void *))do_work, (void *)NULL );
  |                                              // wait for all to finish
  |       for(i=0; i < n; ++i )
  +         pthread_join(worker[i], (void **) NULL);
  |                                              // show contents of buffer
  |       cout << "Contents of " << BUFFER
  |            << " negative values were 'consumed'." << endl;
  |       fstream  fptr;
 30       bool     done = false;
  |       fptr.open( BUFFER, ios::in );
  |       while ( !done ) {
  |         fptr.read( (char *)&n, sizeof(n) );
  |         if ( fptr.fail() )
  +           done = true;
  |         else
  |           cout << n << endl;
  |       }
  |       fptr.close( );
 40       return 0;
  |     }

In Program 11.5.INTRA several static declarations are made (by their placement prior to the function main). These are a mutex called LOCK and a reference to the mutex called *m_LOCK. The variables setup (is the file setup—cleared) and current (the index into the file) are also statically allocated. In main the mutex is initialized (line 17). Remember that initialization should be done only once—re-initializing an already initialized mutex results in undefined behavior. As the value NULL is passed as the second argument to pthread_mutex_init, the mutex has the default characteristics. The next two statements assign both the setup and current variables the value 0. Next, the program checks the value passed in on the command line. This value represents the number of threads to be produced. As written, the value should be less than MAX (set arbitrarily at 99 in the include file). A for loop is used to create the specified number of threads. The thread IDs are saved, and each thread is passed a reference to the do_work function. As the do_work function reference is not the required data type for this parameter, it must be cast to keep the compiler from complaining. Once all the threads are created, the program waits for all the threads to terminate. When all threads are done, the contents of the file (the common buffer used by the threads) is displayed.

A compilation and partial run of the program is shown in Figure 11.8.

Figure 11.8 A compile and partial run of Program 11.5 with intraprocess mutexes.
linux$ g++  p11.5.PC.cxx  p11.5.INTRA.cxx  -lpthread -o INTRA
linux$ INTRA 3
1026      : clearing the buffer         18353
1026     P: attempting to produce       18353
1026     P: The # [94] deposited        18353        <-- 1
2051     C: attempting to consume       18354
2051     C: lock busy                   18354
3076     C: attempting to consume       18355
3076     C: lock busy                   18355
. . .
1026     C: attempting to consume       18353
3076     P: attempting to produce       18355
3076     P: lock busy                   18355
1026     C: The # [48] obtained         18353
2051     C: attempting to consume       18354
2051     C: No new # to consume         18354
. . .
1026     P: The # [51] deposited        18353
1026     P: attempting to produce       18353
1026     P: The # [30] deposited        18353
Contents of ./buffer negative values were 'consumed'.
-94
-48
-50
-58
-98
-49
51
30

(1) Notice the different thread IDs. Each thread is mapped to a separate LWP (that has its own PID).

While the output of this program tends to be somewhat voluminous, it is very informative. With all the cout statements, it is easy to see what individual threads are doing. In this case there are three threads, with the thread IDs of1026, 2051, and 3076. The mutex forces the competing threads to access the shared data structure in a manner that prevents data loss. If we rerun the program and pass the output to grep and keep only the lines containing the # symbol, we should find the output to be well behaved and furthermore note that the values placed in the file by one thread can be consumed by a different thread. A sample of this type of output is shown in Figure 11.9.

Figure 11.9 Filtering the output of Program 11.3.
linux$ INTRA 3 | grep '#'
1026     C: No new # to consume         18321
3076     P: The # [51] deposited        18323
2051     P: The # [77] deposited        18322
3076     C: The # [51] obtained         18323
2051     P: The # [61] deposited        18322                                                     <-- 1
3076     P: The # [86] deposited        18323
2051     C: The # [77] obtained         18322
1026     C: The # [61] obtained         18321                                                     <-- 1
3076     C: The # [86] obtained         18323
1026     C: No new # to consume         18321
2051     P: The # [33] deposited        18322
3076     P: The # [96] deposited        18323
2051     C: The # [33] obtained         18322
3076     P: The # [91] deposited        18323

(1) Deposited by thread 2051 and consumed by thread 1026

11-6 EXERCISE

In the Chapter 7 example we used two semaphores (MADE and READ) to coordinate activities. How is it that in this example we were able to coordinate activities using just one mutex?

Edit the source file p11.5.PC.cxx and comment out all references to nanosleep. Recompile the program. Run the INTRA executable several times with a varying number of threads. Redirect the output to grep and have it search for the word obtained. Pass this output to wc -l to find the number of actual values obtained. For example,

linux$ INTRA 5 | grep obtained | wc -l
       18
linux$ INTRA 10 | grep obtained | wc -l
       40

Is there any relationship between the number of threads specified and the number of overall values obtained? (It's best to run each sequence a number of times and obtain an average.) If yes, what is the relationship (graphs are acceptable)? If no, why is there no relationship?

The section of code containing main can be rewritten to support multiple heavyweight processes using a mutex mapped to a shared memory location. In this implementation the setup and current variables, which must be accessible across processes, are also mapped to a shared memory location. Program 11.5.INTER displays the code to accomplish this.

Program 11.5.INTER Code for function main for interprocess mutex example.
File : p11.5.INTER.cxx
  |     /*
  |        INTER process main (multiple processes - 1 thread each)
  |        Compile: g++  p11.5.PC.cxx  p11.5.INTER.cxx  -lpthread -o INTER
  |     */
  +     #include "local_mutex.h"
  |     pthread_mutex_t  *m_LOCK;                 // Shared memory pointer
  |     int              m_shmid, i_shmid,        // Shared memory IDs
  |                      *s_shm, *c_shm;          // Shared setup and counter
  |     int
 10     main(int argc, char *argv[]) {
  |       pthread_mutexattr_t  the_attr_obj;             // attribute object
  |       int  i, n;
  |       if ( argc != 2) {
  |         cerr << "Usage: " << *argv << " n_workers" << endl;
  +         return 1;
  |       }
  |       n = atoi(argv[1]) < MAX ? atoi(argv[1]) : MAX;
  |       if((m_shmid=shmget(IPC_PRIVATE,sizeof(pthread_mutex_t),IPC_CREAT| 0666))<0){
  |         perror("shmget fail mutex");
 20         return 2;
  |       }
  |       if ((m_LOCK=(pthread_mutex_t *)shmat(m_shmid,0,0)) == (pthread_ mutex_t *) -1){
  |         perror("shmat fail mutex");
  |         return 3;
  +       }
  |       if ((i_shmid=shmget(IPC_PRIVATE,sizeof(int)*2,IPC_CREAT|0666))<0){
  |         perror("shmget fail ints");
  |         return 4;
  |       }
 30       if ((s_shm=(int *) shmat(i_shmid, 0, 0)) == (int *) -1){
  |         perror("shmat ints");
  |         return 5;
  |       }
  |       c_shm  = s_shm + sizeof(int);            // reference  correct loc
  +       *s_shm = *c_shm = 0;                     // start counter (offset)
  |       pthread_mutexattr_init( &the_attr_obj);  // initialize attrib obj
  |       for( i=0; i < n; ++i)
  |         if ( fork() == 0 ){                    // generate child process
  |           do_work( );                          // child process does work
 40           exit( 2 );
  |         }
  |       while( (n = (int) wait( NULL)) && n != -1 )  // wait for child processes
  |                     ;
  |       shmdt((char *) m_LOCK);                  // cleanup shared memory
  +       shmdt((char *) s_shm);
  |       shmctl(m_shmid, IPC_RMID, (struct shmid_ds *) 0);
  |       shmctl(i_shmid, IPC_RMID, (struct shmid_ds *) 0);
  |       cout << "Contents of " << BUFFER         // show contents of buffer
  |            << " negative values were 'consumed'." << endl;
 50       fstream  fptr;
  |       bool     done = false;
  |       fptr.open( BUFFER, ios::in );
  |       while ( !done ) {
  |       fptr.read( (char *)&n, sizeof(n) );
  +         if ( fptr.fail() )
  |           done = true;
  |         else
  |           cout << n << endl;
  |       }
 60       fptr.close( );
  |       return 0;
  |     }

While some of the code is similar to the preceding intraprocess example, additional steps are taken to create and manipulate the shared mutex as well as the shared data values (setup and current). First, a shared memory segment large enough to hold a mutex is allocated (line 18), and a reference to the segment is to m_LOCK. Second, a shared memory segment large enough to hold two integers is allocated. The first part of the second segment, which will hold the value for setup, is referenced by s_shm. The second half of the segment, used to hold the value of current, is referenced by c_shm. The sizeof operator is used to find the proper offset for the second integer reference (see line 34). The shared memory locations for setup and current are set to 0. The for loop that generated the threads is replaced with a loop to generate child processes (using fork). The child process, which by default has a single thread of control, executes the do_work function. The initial process waits for all the child processes to terminate. Once the child processes are finished, the shared memory is removed and the contents of thecommon buffer (the file) are displayed. This program segment can be compiled with the statement

linux$ g++ p11.5.PC.cxx p11.5.INTER.cxx -lpthread -o INTER

The output of this interprocess mutex example should be similar to the intraprocess example with the thread IDs remaining constant at 1024. A sample output sequence is shown in Figure 11.10.

Figure 11.10 A run of Program 11.3 with inter-process mutexes.
linux$ INTER 3 | grep #
1024     P: The # [76] deposited        18755
1024     P: The # [25] deposited        18754
1024     C: The # [76] obtained         18755
1024     P: The # [68] deposited        18754
1024     C: The # [25] obtained         18755        <-- 1
1024     C: The # [68] obtained         18754
1024     C: No new # to consume         18753
1024     C: No new # to consume         18755
1024     C: No new # to consume         18754
1024     C: No new # to consume         18755
1024     P: The # [17] deposited        18754
1024     C: The # [17] obtained         18755

(1) Notice all the thread IDs are the same. Eachthread is associated with a heavyweight (standard) process—not with a LWP

11-7 EXERCISE

Which example (the INTRA or the INTER) seems to be able to process the greatest number of values for the number of threads and processes involved? Run the two versions with /usr/bin/time to obtain CPU time-usage data. You may want to redirect the output of the programs to /dev/null to reduce the volume of output. Why are the results so different?

11.8.2 Condition Variables

Sometimes we need to coordinate the activities of threads using the current value of mutex-protected data. Say, for example, we want to notify a reader thread once a writer thread has filled its data set. The counter for the number of items in the data set and the access to the data is protected by a mutex. The POSIX implementation of threads provides a construct called a condition variable that can be used for this purpose. A condition variable is associated with a specific mutex and predicate (condition). Similar to a mutex, a condition variable should be global and can be mapped to shared memory if it is to be used by threads in more than one process space. A thread uses the condition variable to either notify other cooperating threads (with access to the same condition variable) that the condition (predicate) has been met or to block and wait for the receipt of notification. When a thread blocks on a condition variable, it atomically releases the associated mutex, allowing other threads access to the mutex. Several threads can be blocked, waiting for the same notification. The thread that generates the notification does so by signaling the associated condition variable.

The majority of what was discussed concerning the creation and initialization techniques of a mutex also applies to the creation and initialization ofacondition variable. The corresponding library functions have the occurrences of the string _mutex_ replaced with _cond_.

As with a mutex, a condition variable attribute object can be created, set, and then referenced when creating a condition variable. However, at present the LinuxThreads implementation of POSIX threads does not support condition variable attributes, and the reference to the cond_attr object (the second parameter to the pthread_cond_init function) is ignored. Like a mutex, a condition variable can be created and initialized in a single statement:

pthread_cond_t my_condition = PTHREAD_COND_INITIALIZER;

When a thread wants to notify others, it uses the library function pthread_cond_signal (signal one thread) or pthread_cond_broadcast (signal all threads). The specifics for the condition variable notification functions can be found in Table 11.18.

Table 11.18. The Condition Variable Notification Library Functions.

Include File(s)

<pthread.h>

Manual Section

3

Summary

int pthread_cond_signal(   pthread_cond_t *cond );
int pthread_cond_broadcast(pthread_cond_t *cond );

Return

Success

Failure

Sets errno

0

Nonzero

 

The argument *cond is a reference to a condition variable of the type pthread_cond_t. When pthread_cond_signal is used, one thread blocked on the same condition variable will be unblocked. If several threads are blocked, the thread receiving notification is not specified. If pthread_cond_broadcast is called, all threads blocked on the condition variable are notified. Once awakened, a thread must still acquire the associated mutex. Either call essentially becomes a no-op if there are no threads waiting for notification. The value EINVAL (22) is returned if *cond references an illegal address.

The library functions pthread_cond_wait and pthread_cond_ timedwait cause the calling thread to wait and block on a condition variable. Under the covers these functions atomically unlock the associated mutex (which must be locked prior to the call), suspend the thread's execution, and relock the mutex (by issuing a pthread_lock_mutex). The waiting thread does not consume CPU time.

Table 11.19. The Condition Wait Library Functions.

Include File(s)

<pthread.h>

Manual Section

3

Summary

int pthread_cond_wait(pthread_cond_t  *cond,
                      pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond,
                     pthread_mutex_t      *mutex,
                     const struct timespec 
graphics/ccc.gif*abstime);

Return

Success

Failure

Sets errno

0

Nonzero

 

The first argument, *cond, is a reference to a condition variable. The second argument, *mutex, is a reference to the associated mutex. The pthread_cond_timedwait function is similar to pthread_cond_ wait except it will time out and return an error, ETIME (62), if the value referenced by *abstime is met or exceeded. The *abstime argument for pthread_cond_timedwait references a timespec structure that can be tracked down to the following structure:

typedef struct  timespec {
             time_t         tv_sec;         /* seconds         */
             long           tv_nsec;        /* and nanoseconds */
    } timespec_t;

If a signal or fork interrupts either of these calls, the value EINTR (4) is returned. If any of the arguments are invalid, the value EFAULT (14) is returned.

Program 11.6, a variation of a bounded buffer producer-consumer problem, demonstrates the use of a condition variable. In this program, a reader thread continually reads data from standard input and fills a small data buffer. When the buffer is full, the reader thread notifies a writer thread to empty (display) the buffer so that it may be filled again. When an end-of-file is encountered, notification is sent to the writer thread to write out any partial data left in the buffer. A finished flag is set, notifying both the reader and writer that processing is complete.

Program 11.6 Using a condition variable.
File : p11.6.cxx
  |     /*
  |         Using a condition variable
  |     */
  |     #define _GNU_SOURCE
  +     #define _REENTRANT
  |     #include <iostream>
  |     #include <cctype>
  |     #include <pthread.h>
  |     using namespace std;
 10     const int MAX=5;
  |                                                   // global
  |     pthread_mutex_t lock_it  = PTHREAD_MUTEX_INITIALIZER;
  |     pthread_cond_t  write_it = PTHREAD_COND_INITIALIZER;
  |     typedef struct {                              // a small data buffer
  +       char            buffer[MAX];                // the buffer
  |       int             how_many;                   // # of chars in buffer
  |     } BUFFER;
  |     BUFFER         share = {"", 0};               // start as empty
  |     void           *read_some (void *),
 20                    *write_some(void *);
  |     bool           finished = false;using namespace std;
  |     int
  |     main( ) {
  |     pthread_t       t_read,
  +                     t_write;                     // TID's
  |                                                  // create the threads
  |       pthread_create(&t_read,  NULL, read_some, (void *) NULL);
  |       pthread_create(&t_write, NULL, write_some,(void *) NULL);
  |                                                   // wait for the writer
 30       pthread_join(t_write, (void **) NULL);
  |       pthread_mutex_destroy( &lock_it  );         // clean up
  |       pthread_cond_destroy(  &write_it );
  |      return 0;
  |    }
  +    //        Code to fill the buffer
  |    void *
  |    read_some(void * junk) {
  |      char  ch;
  |      cout << "R " << pthread_self( ) << "\t: Starting" << endl;
 40      while (!finished) {
  |        pthread_mutex_lock(&lock_it);
  |        if (share.how_many != MAX) {                 // buffer not full
  |          cin.get(ch);
  |          if ( cin.fail( ) ) {                       // end-of-file
  +            share.buffer[share.how_many] = (char)NULL;
  |            share.how_many = MAX;
  |            finished       = true;                // we are all done
  |            cout << "R " << pthread_self( ) << "\t: Signaling done" << endl;
  |            pthread_cond_signal(&write_it);       // signal condition var
 50            pthread_mutex_unlock(&lock_it);
  |            break;
  |          } else {                                // sanitize input chars
  |            share.buffer[share.how_many] =  isalnum(ch) ? ch : '#';
  |            cout << "R " << pthread_self( ) << "\t: Got char ["
  +                         << share.buffer[share.how_many++] << "]" << endl;
  |            if ( share.how_many == MAX ) {           // if full
  |              cout << "R " << pthread_self( ) << "\t: Signaling full" << endl;
  |              pthread_cond_signal(&write_it);
  |            }
 60          }
  |        }
  |        pthread_mutex_unlock(&lock_it);
  |      }
  |      cout << "R " << pthread_self( ) << "\t: Exiting" << endl;
  +      return NULL;
  |    }
  |    //    Code to write (display) buffer
  |    void *
  |    write_some(void *junk) {
 70      int i;
  |      cout << "W " << pthread_self( ) << "\t: Starting" << endl;
  |      while (!finished ) {
  |        pthread_mutex_lock(&lock_it);
  |        cout << "W " << pthread_self( ) << "\t: Waiting" << endl;
  +        while (share.how_many != MAX)                // while not full
  |          pthread_cond_wait(&write_it, &lock_it);    // wait for notify
  |        cout << "W " << pthread_self( ) << "\t: Writing buffer" << endl;
  |        for( i=0; share.buffer[i] && share.how_many; ++i, share.how_many--)
  |          cout.put(share.buffer[i]);
 80        cout.put('\n');
  |        pthread_mutex_unlock(&lock_it);
  |      }
  |       cout << "W " << pthread_self( ) << "\t: Exiting" << endl;
  |       return NULL;
  +     }

In this program a mutex, lock_it, and a condition variable, write_it, are allocated and initialized to their default settings prior to main. Their location, prior to main and to the functions that will reference them, guarantees they will be global in scope and accessible by all threads associated with this process space. A small buffer consisting of five locations and an indexing counter is defined, and a buffer of this type, called share, is allocated and initialized. A Boolean flag called finished is set to false before processing begins. In main, two threads are created: one to be the reader (consuming data from an input source) that executes the read_some function and another to be the writer (producing output) that executes the write_some function. After the threads are created, the program waits for the thread executing the write_some function to terminate. When this occurs, the mutex and condition variables are removed and the program terminates.

The read_some function loops while the finished flag is false. The mutex lock_it is used to serialize access to the code that manipulates theshared buffer. Once the mutex is obtained, the count of the number of characters in the buffer (the predicate) is checked. If the buffer is full, the mutex is released (the assumption being the buffer will be processed by awriter, which will need to gain access via the mutex). If the buffer is not filled, an additional character is obtained from standard input. The new character ischecked; if it is not an end-of-file value, the character is added to the buffer and the character count is incremented. If the character fills the buffer, a call to pthread_cond_signal is made to notify the condition variable write_it. If the input character was an end-of-file value, a NULL value isinserted in the buffer in place of the end-of-file value. Next, the character counter is set to its maximum value to satisfy the predicate check in the writer,the finished flag is set to true, and pthread_cond_signal is used to notify the writer, so the remaining contents of the buffer can be processed.

The thread executing the writer code also loops, while the finished flag is set to false. Like the reader, it uses the lock_it mutex to gain access to the shared code and data. The inner while statement checks the count of the number of characters stored in the buffer. As long as the count is less than the maximum, the thread executing this code continues to block due to the call to pthread_cond_wait. When notified by the reader (when the character count is at the maximum), the while loop is exited, and the writer displays the contents of the common buffer. As the contents of the buffer are displayed, the character counter is decremented accordingly.

A compilation and run of Program 11.6 on a local system is shown in Figure 11.11.

Figure 11.11 A compile and run of Program 11.6.
linux$ $ g++ p11.6.cxx -lpthread -o p11.6
linux$ p11.6                                         <-- 1
R 1026  : Starting
W 2051  : Starting
twinkle toes                                         <-- 2
R 1026  : Got char [t]
W 2051  : Waiting
R 1026  : Got char [w]
R 1026  : Got char [i]
R 1026  : Got char [n]
R 1026  : Got char [k]
R 1026  : Signaling full                             <-- 3
W 2051  : Writing buffer
twink
R 1026  : Got char [l]
W 2051  : Waiting
R 1026  : Got char [e]
R 1026  : Got char [#]
R 1026  : Got char [t]
R 1026  : Got char [o]
R 1026  : Signaling full
W 2051  : Writing buffer
le#to
R 1026  : Got char [e]
W 2051  : Waiting
R 1026  : Got char [s]
R 1026  : Got char [#]
^D                                                   <-- 4
R 1026  : Signaling done
R 1026  : Exiting
W 2051  : Writing buffer
es#
W 2051  : Exiting

(1) Program is run—a reader (TID 1026) and writer (TID 2051) thread are generated.

(2) User enters the phrase "twinkle toes," terminated by a carriage return.

(3) Reader thread signals it is full.

(4) User types CTRL+D to signify end-of-file from the keyboard. The remaining stored input is displayed.

When the program is run, its input is obtained from the keyboard. The user enters the string twinkle toes, and the program responds by displaying each character as it is obtained. Display by individual threads is labeled as either R for reader or W for writer, and the thread's ID is given. After the fifth character is processed, the reader thread signals the condition variable. As there is only one writer thread (in this case thread ID 2051), it "wakes up" and processes (displays) the contents of the buffer. Nonalphanumeric characters are displayed as #. When a CTRL+D is entered to indicate end-of-file, the remaining contents of the buffer are displayed and the program terminates.

A little experimentation with this program produces some interesting output. For example, if we duplicate the writer pthread_create statement in main (line 28) so we have two writers, each with its own thread of control, the program on our system produces the output shown in Figure 11.12.

Figure 11.12 A run of Program 11.6 with two writers, using signal notification.
linux$ p11.6
R 1026  : Starting
W 2051  : Starting                                   <-- 1
W 3076  : Starting
twinkle toes
R 1026  : Got char [t]
W 2051  : Waiting
W 3076  : Waiting
R 1026  : Got char [w]
R 1026  : Got char [i]
R 1026  : Got char [n]
R 1026  : Got char [k]
R 1026  : Signaling full
W 2051  : Writing buffer                             <-- 2
twink
R 1026  : Got char [l]
W 2051  : Waiting
R 1026  : Got char [e]
R 1026  : Got char [#]
R 1026  : Got char [t]
R 1026  : Got char [o]
R 1026  : Signaling full
W 3076  : Writing buffer                             <-- 3
le#to
R 1026  : Got char [e]
W 3076  : Waiting
R 1026  : Got char [s]
R 1026  : Got char [#]
^D
R 1026  : Signaling done
R 1026  : Exiting
W 2051  : Writing buffer                             <-- 4
es#
W 2051  : Exiting
^C

(1) This time there are two writer threads, TID 2051 and 3076.

(2) Writer TID 2051 is notified first that the buffer is full.

(3) Writer TID 3076 is notified second that the buffer is full.

(4) Writer TID 2051 is notified this time. Buffer is written out and this thread exits. Remaining writer thread does not exit until CTRL+C is entered.

The output shows the writer threads alternating the task of displaying the output. At the end of the input sequence, CTRL+D causes the reader thread to signal the condition variable that termination is necessary. The thread to act upon the signal is the writer thread 2051 (2051 and 3076 are alternating). Writer thread 3076 (the thread ID passed to the single call to join) is unaware of the change, continues looping, and must be terminated with a CTRL+C.

If we keep two writer threads and change the two pthread_cond_signal calls to pthread_cond_broadcast (lines 49 and 58), we obtain the output shown in Figure 11.13.

Figure 11.13 A run of Program 11.6 with two writer threads, using broadcast notification.
linux$ p11.6
R 1026  : Starting
W 2051  : Starting
W 3076  : Starting
twinkle toes
R 1026  : Got char [t]
W 2051  : Waiting
W 3076  : Waiting
R 1026  : Got char [w]
R 1026  : Got char [i]
R 1026  : Got char [n]
R 1026  : Got char [k]
R 1026  : Signaling full
W 3076  : Writing buffer
twink
R 1026  : Got char [l]
W 3076  : Waiting
R 1026  : Got char [e]
R 1026  : Got char [#]
R 1026  : Got char [t]
R 1026  : Got char [o]
R 1026  : Signaling full
W 3076  : Writing buffer
le#to
R 1026  : Got char [e]
W 3076  : Waiting
R 1026  : Got char [s]
R 1026  : Got char [#]
^D
R 1026  : Signaling done
R 1026  : Exiting
W 2051  : Writing buffer
es#
W 2051  : Exiting
^C

In this example, when all threads are awakened with the call to pthread_cond_broadcast and placed in contention, the thread with the ID of 3076 is always the first to act on the signal until the last (exiting) broadcast is made. Keep in mind that the actual thread chosen by the operating system is not specified. While our example seems robust, there are still some conditions we did not try (see Exercise 11.9).

11-8 EXERCISE

To some, the second while statement on line 75 in the user-defined writer function write_some appears to be superfluous. Can this while statement be replaced with an if statement? Why, or why not?

11-9 EXERCISE

Three computer science students, Alice, Kumar, and Rocco were experimenting with the original version of Program 11.6 on a Linux system. They wanted to see if the program would process information correctly if there were multiple reader threads with a single writer thread and multiple reader threads with multiple writer threads. Also, they were curious as to whether or not starting the writer thread(s) before the reader thread(s) would cause the program to fail. What did these students find (and why)? Be sure to compile, run, and record the program's output to document what you found.

11.8.3 Read/Write Locks

When writing programs, it is not unusual to run into a situation, such as with adatabase, where the data involved is read more often than it is modified (written). In these situations a locking mechanism that permits simultaneous reading of data if no writing is occurring and the writing of data if no reading or writing of data is needed. Until fairly recently, a POSIX thread implementation of read/write locks was not available and users were left to write their own. Fortunately, newer versions of LinuxThreads contain support for POSIX-based read/write locks.

A read/write lock should always be set before it is used. A read/write lock is initialized with the library function pthread_rwlock_init (Table 11.20).

Table 11.20. The pthread_rwlock_init Library Function.

Include File(s)

<pthread.h>

Manual Section

3

Summary

int
pthread_rwlock_init( pthread_rwlock_t     *rwlock,
               const pthread_rwlockattr_t *attr );

Return

Success

Failure

Sets errno

0

Nonzero

 

The data type of the first argument of this call, pthread_rwlock_t, is defined as

typedef struct _pthread_rwlock_t {
struct _pthread_fastlock __rw_lock; /* Lock to guarantee mutual exclusion */
int __rw_readers;                   /* Number of readers */
_pthread_descr __rw_writer;         /* Identity of writer, or NULL if none */
_pthread_descr __rw_read_waiting;   /* Threads waiting for reading */
_pthread_descr __rw_write_waiting;  /* Threads waiting for writing */
int __rw_kind;                      /* Reader/Writer preference selection */
int __rw_pshared;                   /* Shared between processes or not */
} pthread_rwlock_t;

The argument *rwlock is a reference to the read/write lock to be initialized. The argument attr is used to reference an attribute object (similar to previously presented pthread functions). If this argument is set to NULL, the read/write lock is set to the default.

A typical read/write lock initialization sequence for use with multiple threads in a single process is.[19]

[19] As in previous initialization discussions, the explicit call pthread_rwlock_init can be skipped when we use the single statement approach; that is,


pthread_rwlock_t rw_lock = PTHREAD_RWLOCK_DEFAULT_NP;.

pthread_rwlock_t  rw_lock;
. . .
pthread_rwlock_init( &rw_lock, NULL );

If the pthread_rwlock_init call is successful, it returns a 0. If the call fails, a nonzero value is returned. The return of EINVAL (22) indicates an invalid argument.

As with mutexes, there is a suite of read/write lock manipulation functions. A summary of some of the more commonly used functions is shown in Table 11.21. All the functions take a single reference to an allocated read/write lock. We restrict our discussion to the basics: locking and unlocking a read/write lock.

Table 11.21. Some Common Read/Write Lock Library Functions.

Function Prototype

Description

int pthread_rwlock_rdlock(
    pthread_rwlock_t *rwlock );

Locks the referenced read/write lock for reading. If the lock is currently held for writing, the calling thread blocks. Multiple threads can hold the lock for reading.

int pthread_rwlock_wrlock(
    pthread_rwlock_t *rwlock );

Locks the referenced read/write lock for writing. If the lock is currently held for reading or writing, the calling thread blocks. Only one thread can hold the lock for writing.

int pthread_rwlock_unlock(
    pthread_rwlock_t *rwlock );

Unlock a read/write lock held by the calling thread. If other threads are blocked on the read/write lock, one of them will be unblocked. At present, the implementation favors blocked writers over blocked readers. If the calling thread does not hold a lock for reading or writing but issues this unlock call, the program's behavior is undefined.

int pthread_rwlock_tryrdlock(
    pthread_rwlock_t *rwlock );

Try to lock the referenced read/write lock for reading. If the lock is not held for writing, it returns a read lock. If the lock is currently heldfor writing, it returns the error value EBUSY (16).

int pthread_rwlock_trywrlock(
    pthread_rwlock_t *rwlock );

Try to lock the referenced read/write lock for writing. If the lock is not held for reading or writing, return a write lock. If the lock is currently held for reading or writing, return the error value EBUSY (16).

Program 11.7 uses read/write locks to allow multiple threads read/write access to a stack of characters stored as a singularly linked list. Each thread can push (add) a character to the list, pop (remove) a character from a non-empty list, display the list, sleep, or quit. A random number generator drives the activities of each thread. Threads compete with one another for access to the list. The header file for Program 11.7 is shown below.

Program 11.7 Header file for read/write lock example.
File : local_stack.h
  |     /*
  |          Common local header file: local_stack.h
  |     */
  |     #ifndef  LOCAL_STACK_H
  +     #define  LOCAL_STACK_H
  |     #define  _GNU_SOURCE
  |     #define  _REENTRANT
  |     #include <iostream>
  |     #include <cstdlib>
 10     #include <pthread.h>
  |     #include <unistd.h>
  |     #include <sys/time.h>
  |     using namespace std;
  |     const int MAX=6;
  +     class Stack {
  |        public:
  |                   Stack     ( ) : head( NULL ) {}    <-- 1
  |                   ~Stack    ( );
  |           bool    StackEmpty( void ) const { return (head == NULL); }
 20           void    Display   ( void ) const ;
  |           void    Push      ( const char );
  |           char    Pop       ( void );
  |        private:
  |           struct node {
  +              char         item;
  |              struct node *next;
  |           };
  |           node *head;
  |     };
 30     #endif

(1) User-defined Stack class implemented as a linked list.

As might be expected, the content of this file is similar to that of the header file for the previous example. However, some new items have been added. This example uses a user-defined Stack class. The definition of the class is found at the bottom of the header file. Code for the more complex Stack methods is found in the Program 11.7B. Additionally, Program 11.7B contains the code each thread will execute. This code consists of a driver function called do_stack that randomly chooses an activity for the thread on each pass through the loop.

Program 11.7B Stack class methods and common list manipulation functions for read/write lockexample.
File : p11.7B.cxx
  |     #include "local_stack.h"
  |                                                // previously declared
  |     extern pthread_rwlock_t  *rw_ACCESS;       // RW lock
  |     extern Stack   *S;                         // Stack
  +                                                // remaining Stack methods
  |     Stack::~Stack( ){                          // List destructor
  |       node *curr = head, *next;
  |       while( curr ){
  |         next = curr->next;
 10         delete curr;
  |         curr = next;
  |       }
  |       head = NULL;
  |     }
  +     void                                       // Display the list
  |     Stack::Display( void ) const {
  |       node *temp = head;
  |       cout << "\t" << pthread_self() << " [head]" << endl;
  |       while( temp != NULL ){
 20          cout << "\t" << pthread_self() << " [" << temp->item
  |               << "]" << endl;
  |          cout.flush( );
  |          temp = temp->next;
  |          sleep(1);                             // slow things down
  +       }
  |       cout << "\t" << pthread_self( ) << " [tail]" << endl;
  |     }
  |     void                                       // Add an item
  |     Stack::Push( const char item ){
 30       node *temp = new node;
  |       temp->item = item;
  |       temp->next = head;
  |       head       = temp;
  |     }
  +     char                                       // Remove an item
  |     Stack::Pop( void ){
  |       char item;
  |       node *temp = head;
  |       item = temp->item;
 40       head = temp->next;
  |       delete temp;
  |       return item;
  |     }
  |     int                                        // Random # in range
  +     my_rand(int start, int range){
  |       struct timeval t;
  |       gettimeofday(&t, (struct timezone *)NULL);
  |       return (int)(start+((float)range * rand_r((unsigned *)&t.tv_usec))
  |                    / (RAND_MAX+1.0));
 50     }
  |     void *
  |     do_stack( void *junk ) {                  // Activity for thread
  |       char  item;
  |       sleep( my_rand(1,3) );                  // random start up time
  +       do {
  |         switch ( my_rand(1,10) ) {            // choose value 1-10
  |         case 1: case 2:                       // Display 2/10
  |           pthread_rwlock_rdlock(rw_ACCESS);   // read lock - block on W
  |           cout << pthread_self( ) << " Display:" << endl;
 60           if ( S->StackEmpty( ) )
  |             cout << pthread_self( ) << " Empty list" << endl;
  |           else
  |             S->Display();
  |           pthread_rwlock_unlock(rw_ACCESS);   // unlock
  +           break;
  |         case 3: case 4: case 5:               // Add item 3/10
  |           item = my_rand(1,25) + 64;
  |           pthread_rwlock_wrlock(rw_ACCESS);   // write lock - block on W|R
  |           cout << pthread_self( ) << " Push   : " << item << endl;
 70           S->Push( item );
  |           pthread_rwlock_unlock(rw_ACCESS);   // unlock
  |           break;
  |         case 6: case 7: case 8:               // Remove item 3/10
  |           pthread_rwlock_wrlock(rw_ACCESS);   // write lock - block
                                                     on W|R
  +           if (S->StackEmpty( ))
  |             cout << pthread_self( ) << " Underflow" << endl;
  |           else {
  |             cout << pthread_self( ) << " Pop    : ";
  |             item = S->Pop( );
 80             cout << pthread_self( ) << " " << item << endl;
  |           }
  |           pthread_rwlock_unlock(rw_ACCESS);   // unlock
  |           break;
  |         case 9:                               // Sleep 1/10
  +           cout << pthread_self( ) << " Sleep  :" << endl;
  |           sleep( my_rand(1,3));
  |           break;
  |         case 10:                              // Quit 1/10
  |           cout << pthread_self( ) << " Quit   :" << endl;
 90           return NULL;
  |         }
  |       } while ( 1 );
  |     }

In the do_stack loop, a random value from 1 to 10 is generated. This value determines the thread's activity. Given a good distribution of random values, approximately 20 percent of the time the thread executing this code should display the current list. About 30 percent of the time the thread should generate a new character and push the character onto the list. Roughly 30 percent of the time the thread should pop a character off the top of the list (if it is not empty). The remaining 20 percent of the thread will either sleep a few seconds or quit its activity.

The activities other than sleep or quit are bracketed with the appropriate read/write lock calls. As the display (reading) of the list can be done by multiple threads, a call to pthread_rwlock_rdlock is made before the display to obtain a read lock, and a call to pthread_rwlock_unlock is made once the display is completed to release the read lock. The Push and Pop methods, which cause the list contents to be modified, are bracketed with a call to pthread_rwlock_wrlock and pthread_rwlock_unlock calls. Thus, only one thread at a time is allowed to modify the linked list. It is important to note that all critical code was bracketed with the read/write locks. For example, if we were to move outside the bracketed area and check for an empty stack found in the section of code that calls Pop (line 75), we would on occasion find our program failing due to race conditions. This could occur when we have one item in our list. For example, say a thread calls Stack_Empty, finds the stack is not empty, and attempts to Pop (remove) the item. At the same time, a second thread (also finding the list to be not empty) also attempts to remove an item. While both consider the list to be not empty, one of the threads will draw an error, as the competing thread will have beaten it to the punch.

Each line of output identifies the underlying thread that generated it. The code for main is found in Program 11.7C.

Program 11.7C Code for function main for read/write lock example.
File : p11.7C.cxx
  |     #include "local_stack.h"
  |                                                    // global by placement
  |     pthread_rwlock_t *rw_ACCESS=new pthread_rwlock_t;
  |     Stack            *S=new Stack;
  +     void *do_stack( void * );
  |     int
  |     main( int argc, char *argv[] ){
  |       int  i, n;
  |       pthread_t worker[MAX];
 10       pthread_rwlock_init(rw_ACCESS, NULL);
  |       if ( argc != 2) {
  |         cerr << "Usage: " << *argv << " n_workers" << endl;
  |         return 1;
  |       }
  +       n = atoi(argv[1]) < MAX ? atoi(argv[1]) : MAX;
  |       for( i=0; i < n; ++i )                         // create threads
  |         pthread_create(&worker[i],NULL,do_stack,(void *) NULL);
  |       for( i=0; i < n; ++i )                         // wait
  |         pthread_join(worker[i], (void **) NULL);
 20       return 0;
  |     }

Figure 11.14 shows a portion of the output generated from a run of Program 11.7 when four threads are competing for access to the list. As can be seen, multiple threads can display the list, but only one thread at a time can modify the list.

Figure 11.14 A compilation and run of Program 11.7 with four competing threads.
linux$ g++ p11.7B.cxx p11.7C.cxx -o p11.7 -lpthread
linux$ p11.7  4
2051 Push   : A
2051 Sleep  :
3076 Pop    : 3076 A
3076 Push   : L                                      <-- 1
3076 Push   : L
3076 Push   : L
3076 Push   : K
3076 Push   : K                                      <-- 1
3076 Push   : F
3076 Quit   :
1026 Pop    : 1026 F
1026 Quit   :
4101 Push   : J
4101 Pop    : 4101 J
4101 Display:                                        <-- 2
        4101 [head]
        4101 [K]
2051 Display:
        2051 [head]                                  <-- 2
        2051 [K]
        4101 [K]
        2051 [K]
        4101 [L]
        2051 [L]
        4101 [L]
        2051 [L]
        4101 [L]
        2051 [L]
        4101 [tail]
4101 Display:
        4101 [head]
        4101 [K]
        2051 [tail]                                  <-- 3
        4101 [K]
        4101 [L]
        4101 [L]                                     <-- 3
        4101 [L]
        4101 [tail]
. . .

(1) A series of letters are pushed onto the list by several different threads.

(2) Thread 4101 begins to display the list. Shortly, thereafter, thread 2051 displays the list as well. This is perfectly acceptable, as more than one thread can access the list concurrently for reading. Their output is interspersed on the screen.

(3) Eventually, each thread finishes its display of the list.

11-10 EXERCISE

Adjust the switch statement in the user-defined do_stack function of Program 11.7 so approximately 70 percent of the thread's activity will be the display of the list. Recompile and run the program using the maximum number of threads. Direct the output to a temporary file (say output.txt). Use the grep, sort, and uniq utilities to obtain information on the number of times push, pop, and display were done. For example, the sequence

linux$ p11.7   6   > output.txt
linux$ grep  Push  output.txt | sort | uniq -c

would tally the number of times the word Push (printed each time push was called) was displayed by each thread. The sequence

linux$ grep  :  output.txt | wc -l

can be used to find the total number of activity lines displayed (each tagged with a :1).

Do you consistently find that percentage you specify is what is actually being done? Why, or why not? Does removing the sleep call in the user-defined Display method of the Stack class make a difference in the distribution of activities—the total number of activities before all threads terminate? Generate output to support your answer. You should run each sequence multiple times to be sure you are obtaining an accurate view of the program's activities.

11.8.4 Multithread Semaphores

The activities of threads may also be coordinated with semaphores. A semaphore is similar in function to a mutex. However, a semaphore can act as either a binary entity similar to a mutex or as a counting entity. Counting semaphores can be used to manage multiple resources. As they are somewhat more complex, semaphores are more system-intensive than mutexes. Semaphore concepts and operations were presented in some detail in Chapter 7. However, the Chapter 7 semaphore operations are System V-based and are not multithread safe. POSIX 1003.1b defines semaphores that can be used with threads. As these semaphore operations were written prior to the creation of the POSIX thread library calls, their interface has a slightly different flavor. Most notably, these operations do not begin with the sequence pthread_ and do set errno when they fail. All programs that contain POSIX semaphore operations must include <semaphore.h>.

Conceptually, a POSIX semaphore is a nonnegative integer value that can be used to synchronize thread activities. Increment (post) and decrement (wait) operations are performed on the semaphore. A decrement issued on a 0 valued semaphore will cause the issuing thread to block until another thread increments the semaphore. Unlike a mutex, for which there is a sense of ownership, a semaphore does not need to be acquired (decremented) and released (incremented) by the same thread.

A semaphore must be initialized before it is used. The library call sem_init, shown in Table 11.22, is used to initialize a semaphore.

Table 11.22. The sem_init Library Function.

Include File(s)

<semaphore.h>

Manual Section

3

Summary

int
sem_init( sem_t *sem, int pshared,
          unsigned int value );

Return

Success

Failure

Sets errno

0

-1

Yes

The sem_t data type referenced by sem is declared in <semaphore.h> as

/* System specific semaphore definition. */
typedef struct {
  struct _pthread_fastlock __sem_lock;
  int __sem_value;
  _pthread_descr __sem_waiting;
} sem_t;

The sem argument references the semaphore to be initialized. The pshared argument is used to indicate if the semaphore will be shared between processes. A value of 0 indicates the semaphore is not to be shared between processes, while a nonzero value indicates the semaphore is shareable. If the semaphore is to be shared between processes, the programmer is responsible for mapping the semaphore to a shared memory location or to a memory-mapped file. At present, the LinuxThreads implementation of POSIX threads does not support process-shared semaphores. Given this limitation, this argument should always be set to 0. The argument value is a nonnegative integer that specifies the starting value of the semaphore. A successful sem_init call returns a 0 and sets the referenced semaphore to the indicated initial value. If the call fails, it returns a value of -1 and sets errno to indicate the source of the error (see Table 11.23). In a multithreaded setting a semaphore should be initialized only once.

Table 11.23. sem_init Error Messages.

#

Constant

perror Message

Explanation

22

EINVAL

Invalid argument

The value argument exceeds the value of SEM_VALUE_MAX.

89

ENOSYS

Function not implemented

The pshared argument is not 0.

Once created, a semaphore can be locked using the library call sem_wait or sem_trywait. Keep in mind that underneath locking, a semaphore is an atomic decrement operation against the value of the semaphore.

Both calls require a reference to a semaphore of type sem_t. If the referenced semaphore is nonzero, the call decrements (by one) the referenced semaphore. If the semaphore is 0, the sem_wait call blocks until the semaphore becomes greater than zero. If the semaphore is 0, the sem_trywait call does not block and returns immediately. Both calls return a 0 if they are successful; otherwise, sem_trywait returns a -1 and sets errno to the value shown in Table 11.25. Unsuccessful calls do not change the state of the semaphore.

Table 11.24. The sem_wait and sem_trywait Library Functions.

Include File(s)

<semaphore.h>

Manual Section

3

Summary

int sem_wait( sem_t * sem );
int sem_trywait( sem_t * sem );

Return

Success

Failure

Sets errno

0

-1

sem_trywait only

Table 11.25. sem_wait and sem_trywait Error Message.

#

Constant

perror Message

Explanation

11

EAGAIN

Resource temporarily unavailable

The sem_trywait found the semaphore to be 0.

Semaphores are unlocked (i.e., incremented) using the sem_post library call (Table 11.26).

Table 11.26. The sem_post Library Function.

Include File(s)

<semaphore.h>

Manual Section

3

Summary

int sem_post(sem_t *sem);

Return

Success

Failure

Sets errno

0

-1

Yes

The sem_post call unlocks the referenced semaphore. If the semaphore was previously at 0 and there are other threads or LWPs blocking on the semaphore, they will be notified using the current scheduling policy (most often, the highest priority, longest waiting thread or LWP is scheduled next). If the semaphore was not previously at 0, its value is incremented by one. If successful, sem_post returns a value of 0; otherwise, it returns a value of -1 and sets errno to the value in Table 11.27 to indicate the error condition. The sem_post call is asynchronous-signal-safe (able to be called from within a signal handler).

Table 11.27. sem_post Error Message.

#

Constant

perror Message

Explanation

34

ERANGE

Numerical result out of range

If the semaphore were incremented, its value would exceed SEM_VALUE_MAX.

Chapter 7 provides a number of semaphore examples that can be readily adapted to a multithreaded setting by changing the nonmultithreaded semaphore operations to their POSIX multithread equivalents. Rather than duplicate these previous examples, in Program 11.8 I have used a semaphore to coordinate the activity of cooperating threads to determine when the threads have carried on their activities in a specific sequence.

Program 11.8 Using POSIX Semaphores with Threads
File : p11.8.cxx
  |     /*
  |            Using semaphores with threads
  |     */
  |     #define _GNU_SOURCE
  +     #define _REENTRANT
  |     #include <pthread.h>
  |     #include <iostream>
  |     #include <cstdio>
  |     #include <cstdlib>
 10     #include <cstring>
  |     #include <unistd.h>
  |     #include <sys/time.h>
  |     #include <semaphore.h>                        // for POSIX semaphores
  |     using namespace std;
  +     const int BUF_SIZE= 15;
  |     const int MAX     = 4;
  |     int   world_state = 1;
  |     sem_t check_state;
  |     typedef struct {
 20             char word[BUF_SIZE];
  |             int  my_state;
  |     } Info;
  |     void *speaker( Info * );
  |     //   Generate a random # within given range
  +     int
  |     my_rand(int start, int range){
  |       struct timeval t;
  |       gettimeofday(&t, (struct timezone *)NULL);
  |       return (int)(start+((float)range * rand_r((unsigned *)&t.tv_usec))
 30                    / (RAND_MAX+1.0));
  |     }
  |     int
  |     main( int argc, char *argv[] ){
  |       pthread_t t_ID[MAX];
  +       Info     words[MAX];
  |       if ( argc != MAX+1 ) {
  |          cerr << "Usage " << *argv << " word1 ... word" << MAX << endl;
  |          return 1;
  |       }
 40       sem_init( &check_state, 0, 1 );      // start semaphore at 1
  |       for (int i = 0; i < MAX; ++i){
  |         strcpy( words[i].word, argv[i+1] );
  |         words[i].my_state = i+1;
  |         if ( (pthread_create( &t_ID[i],NULL,
  +            ( void *(*)(void *) )speaker,(void *) &words[i])) != 0 ) {
  |          perror("Thread create speaker");
  |          return i;
  |          }
  |       }
 50       pthread_join( t_ID[MAX-1], (void **) NULL);
  |       cout << "!" << endl;
  |       return 0;
  |     }
  |     /*
  +        Display the passed in word
  |     */
  |     void  *
  |     speaker( Info * s ){
  |       while( true ) {
 60         sleep(my_rand(1,3));
  |         sem_wait( &check_state );        // obtain & decrement else block
  |         cout << s->word << " ";
  |         cout.flush( );
  |         if ( s->my_state == world_state ) {
  +           ++world_state;
  |           if ( world_state > MAX ) break;
  |         } else {
  |           cout << endl;
  |           world_state = 1;
 70         }
  |         sem_post( &check_state );          // release & increment
  |       }
  |       return( (void *) NULL );
  |     }

In Program 11.8 the file <semaphore.h> is included, as POSIX semaphores are used. A global integer, world_state (declared before main), is allocated and set to 1. This variable is used by the cooperating threads to determine when processing should stop (i.e., when this variable exceeds the value MAX). Access to the world_state variable is controlled by the semaphore check_state. A typedef is used to create a user-defined type called Info. Items of this type will have storage for 15 characters and an integer value. The character array will hold a short sequence of characters (a word), and the integer, a value indicating the current state of output. In main two arrays are allocated. The first, t_ID, is used to store the thread IDs. The second array, called words, stores the word and state information that is passed to each thread. The sem_init call is used to set the allocated semaphore to 1. As the second argument of this call is 0, only threads within the same process space can share the semaphore. A loop is used to create additional threads and pass each a value obtained from the command line (stored in the elements of the words array) and its state value. Each thread is directed to execute the user-defined function speaker. The thread in main then waits (by way of a pthread_join) for the last thread generated to exit. When the pthread_join is completed, the program concludes.

The speaker function loops continuously. It sleeps for a random number of seconds (1—3), and then attempts to lock (obtain) the semaphore. Once it is successful, the thread displays the word it was passed. It then checks its state value against the current state value for the process (stored as world_state). If its state value is equivalent to the current value of world_state, the world_state is incremented, as progress is being made toward the printing of the words in the proper (command line) order. If the thread's state value is not equivalent to the world_state value, out-of-order processing has occurred, and the world_state variable is reset to 1 to indicate a restarting of the sequence. Once the evaluation and manipulation of world_state is complete the semaphore is released.

A run of Program 11.8 and its output are shown in Figure 11.15.

Figure 11.15 A run of Program 11.8.
linux$ p11.8 once upon a time
upon
a
once time                                            <-- 1
upon
a
once time
upon
once time
a
time
once upon a once
time
a
a
once upon a time !                                   <-- 2

(1) Each time progress is no longer being made, the output sequence restarts.

(2) All command-line arguments are displayed in order; processing stops.

As can be seen, the threads do not finish their activity until they successfully display the sequence in the same order that it was passed on the command line.

The function sem_getvalue, which retrieves the current value of the semaphore, is also supported (Table 11.28).

Table 11.28. The sem_getvalue Library Function.

Include File(s)

<semaphore.h>

Manual Section

3

Summary

int sem_getvalue(sem_t * sem, int * sval);

Return

Success

Failure

Sets errno

0

-1

 

This function stores, in the location referenced by sval, the current value of the semaphore referenced by sem. The returned value should not be used by the program to make decisions, as it is transient and its use in a decision construct could result in race conditions.

11-11 EXERCISE

Write a program that implements a multithreaded bubble—merge sort. Have the initial process generate 10,000 random numbers—writing the numbers in groups of 1,000 each to 10 separate temporary files. Then create 10 threads and pass each a reference to one of the temporary files and a common bubble-sorting routine. As each thread finishes, its sorted results should be returned to the initial thread that performs a merge of sorted results (i.e., the temporary 1,000 number file) with a final, fully sorted file. Where appropriate, use semaphores and/or mutexes or condition variables to coordinate activities. Run your solution several times to be sure it works correctly. Once all the data is ordered, display every 100th value in the final data set to attempt to establish if the data was truly sorted. All temporary files and other data structures should be removed once processing is complete.

If time permits, keep the total number of values to be sorted constant (let's say 10,000) and attempt to determine empirically if there is a lower bound for the size of the list (e.g., 1,000 values per starting list with 10 files versus 100 values per starting list with 100 files, etc.) that is passed to the bubble—merge sort routines whereby no appreciable decrease in processing time is discernible. To maintain your sanity, keep the granularity of the list sizes you try fairly large. The executable file grays_mbms, found with the program files for this chapter, will allow you to compare your solution to that of the author. This program takes two command-line arguments: the number of values to sort and the number of files.


  Previous section   Next section
Top