Parallel programming in standard C++


Reading: Introduction to Parallel Computing; optional C pthreads Tutorial

We start with a simple program that runs multiple threads (code in the lecture note Bitbucket repo, week10): first in C with the pthreads library, and then the same functionality using only standard C++.

C version:

    // file: threads.c
    #include <stdio.h>
    #include <pthread.h>

    /* A pthreads entry point takes a void* argument and returns void*. */
    void* foo (void* msg) {
        printf(" In foo: %s\n", (const char *) msg);
        return NULL;
    }

    void* bar (void* msg) {
        printf(" In bar: %s\n", (const char *) msg);
        return NULL;
    }

    int main(void)
    {
        pthread_t t1, t2;

        /* Start two threads; each runs its function with the given argument. */
        pthread_create(&t1, NULL, foo, "Hello from Task 1");
        pthread_create(&t2, NULL, bar, "Hello from Task 2");

        /* Block the main thread until both threads have finished. */
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        return 0;
    }

The equivalent standard C++ version:

    // file: threads1.cpp
    #include <string>
    #include <iostream>
    #include <thread>

    using namespace std;

    void foo (string msg) {
        cout << " In foo: " << msg << "\n";
    }

    void bar (string msg) {
        cout << " In bar: " << msg << "\n";
    }

    int main()
    {
        // Constructs the new thread and runs it. Does not block execution.
        thread t1(foo, "Hello from Task 1");
        thread t2(bar, "Hello from Task 2");

        // Makes the main thread wait for the new thread to finish execution,
        // blocking the main thread until join returns
        t1.join();
        t2.join();
    }

In these simple examples, the threads do not share any variables. Next, let's have several threads modify the same variable.

    #include <cstdint>   // int64_t
    #include <iostream>
    #include <thread>
    #include <vector>

    class MultiCounter {
    public:
         MultiCounter() : sum(0) {}
         void computeSum(int *a, int n);
         int64_t getSum() { return sum; }
    private:
         int64_t sum;   // the shared variable
    };

    void
    MultiCounter::computeSum(int* a, int n) {

         // No synchronization here: every thread updates sum directly
         // (this is the bug the example is meant to demonstrate)
         for (int i = 0; i < n; ++i) {
            sum += a[i];
         }
    }

    int main()
    {
        MultiCounter counter;
        const int n = 1000000;

        // Array of ones, allocated on the heap (a 4 MB array could overflow the stack)
        std::vector<int> a(n, 1);

        // Create a vector of threads, each of which executes 'computeSum'
        std::vector<std::thread> threads;
        for (int i = 0; i < 10; i++)
            threads.emplace_back(&MultiCounter::computeSum, &counter, a.data(), n);

        for (int i = 0; i < 10; i++)
            threads[i].join();    // wait for each thread to finish

        std::cout << "Final result: " << counter.getSum() << std::endl;
    }

The answer should be the number of threads times n, that is, 10 x 1,000,000 = 10,000,000. When we run this code, however, it produces a different (and usually wrong) answer every time.

0 ix-saucy 22:06> ./threads2
Final result: 7033014
0 ix-saucy 22:06> ./threads2
Final result: 2904314
0 ix-saucy 22:06> ./threads2
Final result: 2759084
0 ix-saucy 22:06> ./threads2
Final result: 3335995

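Why is that? When multiple threads simultaneously modify the same memory location without synchronization, we have a so-called race condition. The statement sum += a[i] is not a single step: the thread reads sum, adds a[i], and writes the result back. If two threads read the same old value, each adds its element and writes back, so one of the two updates is lost. How often this happens depends on timing, which is why the result is unpredictable (formally, C++ treats such a data race as undefined behavior).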
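To make the lost update concrete, here is a minimal single-threaded sketch that replays one unlucky interleaving by hand. It assumes that an update such as sum += 1 is carried out as a load, an add, and a store. The function name simulateLostUpdate and the variables regA and regB are made up for illustration; they stand in for the private registers of two threads and are not part of the lecture code.

```cpp
#include <cstdint>

// Hypothetical helper: replays, step by step, one possible interleaving of
// two threads A and B that each execute `sum += 1` on a shared variable.
int64_t simulateLostUpdate() {
    int64_t sum = 0;       // the shared variable

    int64_t regA = sum;    // thread A loads sum (reads 0)
    int64_t regB = sum;    // thread B loads sum (also reads 0, A has not stored yet)

    regA += 1;             // thread A adds its element
    regB += 1;             // thread B adds its element

    sum = regA;            // thread A stores 1
    sum = regB;            // thread B stores 1, overwriting A's update

    return sum;            // 1, although two increments were performed
}
```

With real threads the interleaving changes from run to run, so a different number of updates is lost each time.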

How do we fix that? We need to "protect" the memory while a thread is writing to it. There are multiple mechanisms for doing this; here we consider locking with a mutex (other approaches include semaphores, atomic operations, and memory barriers). A mutex is a special kind of variable which ensures that the instructions between the calls to its lock() and unlock() methods are executed by only one thread at a time. As a result, sum can only be modified by a single thread at a time.

    // File: threads2.cpp
    #include <cstdint>   // int64_t
    #include <iostream>
    #include <thread>
    #include <vector>
    #include <mutex>    // mutex

    class MultiCounter {
    public:
         MultiCounter() : sum(0) {}
         void computeSum(int *a, int n);
         int64_t getSum() { return sum; }
    private:
         std::mutex sum_mutex;   // a mutex to control access to the shared variable
         int64_t sum;   // the shared variable
    };

    void
    MultiCounter::computeSum(int* a, int n) {

         // Make sure only one thread at a time can modify sum
         sum_mutex.lock();
         for (int i = 0; i < n; ++i) {
            sum += a[i];
         }
         // Debugging aid (uncomment to print the running total while the lock is held):
         //std::cout << "Thread " << std::this_thread::get_id() << ": sum=" << sum << std::endl;
         sum_mutex.unlock();
    }

    int main()
    {
        MultiCounter counter;
        const int n = 1000000;

        // Array of ones, allocated on the heap (a 4 MB array could overflow the stack)
        std::vector<int> a(n, 1);

        // Create a vector of threads, each of which executes 'computeSum'
        std::vector<std::thread> threads;
        for (int i = 0; i < 10; i++)
            threads.emplace_back(&MultiCounter::computeSum, &counter, a.data(), n);

        for (int i = 0; i < 10; i++)
            threads[i].join();    // wait for each thread to finish

        std::cout << "Final result: " << counter.getSum() << std::endl;
    }

Output:

0 ix-saucy 22:07> ./threads2
Final result: 10000000
0 ix-saucy 22:07> ./threads2
Final result: 10000000
0 ix-saucy 22:07> ./threads2
Final result: 10000000
0 ix-saucy 22:07> ./threads2
Final result: 10000000
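The explicit lock()/unlock() pair can also be expressed with std::lock_guard from <mutex>, which locks the mutex in its constructor and unlocks it in its destructor, so the mutex is released even if the protected code returns early or throws. The following is a minimal sketch of that variant, not the version from the lecture repo; the driver function runCounter is a hypothetical helper added only so the class can be exercised.

```cpp
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

class MultiCounter {
public:
    void computeSum(const int* a, int n) {
        // The guard locks sum_mutex here and unlocks it automatically
        // when it goes out of scope at the end of the function.
        std::lock_guard<std::mutex> guard(sum_mutex);
        for (int i = 0; i < n; ++i)
            sum += a[i];
    }
    int64_t getSum() const { return sum; }
private:
    std::mutex sum_mutex;   // controls access to sum
    int64_t sum = 0;        // the shared variable
};

// Hypothetical driver: numThreads threads each add the same n ones to the counter.
int64_t runCounter(int numThreads, int n) {
    std::vector<int> a(n, 1);
    MultiCounter counter;
    std::vector<std::thread> threads;
    for (int i = 0; i < numThreads; ++i)
        threads.emplace_back(&MultiCounter::computeSum, &counter, a.data(), n);
    for (auto& t : threads)
        t.join();               // wait for each thread to finish
    return counter.getSum();    // safe to read: all writers have been joined
}
```

Calling runCounter(10, 1000000) corresponds to the program above; the behavior is the same as with explicit lock() and unlock(), only the unlocking can no longer be forgotten.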

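Because almost all the work happens while the mutex is held, the threads effectively run one after another, so this version gains little from having multiple threads. We can improve it by changing the computeSum function slightly: each thread accumulates into a private variable and takes the lock only once, to add its result to sum (you likely won't see much of a difference for this toy example, but it can make a difference for more complicated computations).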

    void
    MultiCounter::computeSum(int* a, int n) {
         // do work privately without worrying about other threads
         int64_t mysum = 0;
         for (int i = 0; i < n; ++i)
            mysum += a[i];

         // Make sure only one thread at a time can update sum
         sum_mutex.lock();
         sum += mysum;
         sum_mutex.unlock();
    }
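When the protected region shrinks to a single addition, an atomic variable is another option. The sketch below is an assumption-laden alternative, not part of the lecture code: it replaces the mutex with std::atomic<int64_t> and uses fetch_add for the final update. The names AtomicCounter and runAtomicCounter are hypothetical.

```cpp
#include <atomic>
#include <cstdint>
#include <thread>
#include <vector>

class AtomicCounter {   // hypothetical variant of MultiCounter
public:
    void computeSum(const int* a, int n) {
        // do work privately without worrying about other threads
        int64_t mysum = 0;
        for (int i = 0; i < n; ++i)
            mysum += a[i];

        // One indivisible read-modify-write; no mutex required
        sum.fetch_add(mysum);
    }
    int64_t getSum() const { return sum.load(); }
private:
    std::atomic<int64_t> sum{0};   // the shared variable
};

// Hypothetical driver: numThreads threads each add the same n ones.
int64_t runAtomicCounter(int numThreads, int n) {
    std::vector<int> a(n, 1);
    AtomicCounter counter;
    std::vector<std::thread> threads;
    for (int i = 0; i < numThreads; ++i)
        threads.emplace_back(&AtomicCounter::computeSum, &counter, a.data(), n);
    for (auto& t : threads)
        t.join();
    return counter.getSum();
}
```

This only works because the shared state is a single integer; protecting several variables that must change together still calls for a mutex.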

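The above examples all create std::thread objects explicitly and call join on each thread in order. Each join blocks the main thread until that particular thread has finished, so if the first thread happens to take longer, the main thread keeps waiting on it even if all the others are already done. An alternative is to launch the work as asynchronous tasks with std::async, which returns a std::future for each task; calling get() on a future waits for that task to complete. Modify the main program as follows (complete example in async.cpp):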

    // In addition to the headers used so far, this needs <chrono> and <future>
    int main()
    {
        MultiCounter counter;
        const int n = 1000000;

        // Array of ones, allocated on the heap
        std::vector<int> a(n, 1);

        auto start = std::chrono::steady_clock::now(); // timer

        // Launch 10 asynchronous tasks, each of which executes 'computeSum'
        std::vector<std::future<void> > futures;  // for use with async
        for (int i = 0; i < 10; i++)
            futures.push_back(std::async (std::launch::async,
                  &MultiCounter::computeSum, &counter, a.data(), n));

        for (auto &sumFuture : futures) sumFuture.get();   // wait for every task to finish

        auto elapsed = std::chrono::steady_clock::now() - start; // timer
        std::cout << "Final result: " << counter.getSum() << std::endl;
        std::cout << "Elapsed time: " << std::chrono::duration <double, std::milli> (elapsed).count() << " ms" << std::endl;
    }
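A std::future can also carry a return value back to the caller, which removes the need for a shared variable altogether. The following is a minimal sketch under that assumption, not the contents of async.cpp: each task returns its own sum as an int64_t and the main thread adds the results. The function names partialSum and sumWithFutures are hypothetical.

```cpp
#include <cstdint>
#include <future>
#include <vector>

// Hypothetical task: sums a[0..n) and returns the result instead of
// writing to a shared variable.
int64_t partialSum(const int* a, int n) {
    int64_t s = 0;
    for (int i = 0; i < n; ++i)
        s += a[i];
    return s;
}

// Hypothetical driver: launches numTasks tasks over the same array of ones
// and combines their results in the calling thread.
int64_t sumWithFutures(int numTasks, int n) {
    std::vector<int> a(n, 1);

    std::vector<std::future<int64_t>> futures;
    for (int i = 0; i < numTasks; ++i)
        futures.push_back(std::async(std::launch::async,
                                     partialSum, a.data(), n));

    int64_t total = 0;
    for (auto& f : futures)
        total += f.get();   // blocks until that task is done, then yields its result
    return total;
}
```

Because only the calling thread touches total, no mutex is needed; sumWithFutures(10, 1000000) corresponds to the configuration used above.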
