Parallel programming in standard C++
Reading: Introduction to Parallel Computing; optional C pthreads Tutorial
We start with a simple program with multiple threads (lecture note Bitbucket repo -- week10): first using C and the pthreads library, then the same functionality using only standard C++.
C version:
- // file: threads.c
- #include<stdio.h>
- #include<pthread.h>
- void* foo (void* msg) {
- printf(" In foo: %s\n", (const char *) msg);
- return NULL; // a pthread start routine must return a void*
- }
- void* bar (void* msg) {
- printf(" In bar: %s\n", (const char *) msg);
- return NULL;
- }
- int main(void)
- {
- pthread_t t1,t2;
- pthread_create(&t1, NULL, foo, "Hello from Task 1");
- pthread_create(&t2, NULL, bar, "Hello from Task 2");
- pthread_join(t1, NULL);
- pthread_join(t2, NULL);
- }
The equivalent standard C++ version:
- // file: threads1.cpp
- #include <string>
- #include <iostream>
- #include <thread>
- using namespace std;
- void foo (string msg) {
- cout << " In foo: " << msg << endl;
- }
- void bar (string msg) {
- cout << " In bar: " << msg << endl;
- }
- int main()
- {
- // Constructs the new thread and runs it. Does not block execution.
- thread t1(foo, "Hello from Task 1");
- thread t2(bar, "Hello from Task 2");
- // Makes the main thread wait for the new thread to finish execution,
- // blocking the main thread until join returns
- t1.join();
- t2.join();
- }
In these simple examples, the threads do not use any shared variables. Next, let's have several threads modify the same variable.
- #include <cstdint> // int64_t
- #include <iostream>
- #include <thread>
- #include <memory>
- #include <vector>
- #include <mutex> // mutex
- class MultiCounter {
- public:
- MultiCounter() : sum(0) {}
- void computeSum(int *a, int n);
- int64_t getSum() { return sum; }
- private:
- int64_t sum; // the shared variable
- };
- void
- MultiCounter::computeSum(int* a, int n) {
- // No synchronization here: every thread updates sum concurrently
- for (int i = 0; i < n; ++i) {
- sum += a[i];
- }
- }
- int main()
- {
- MultiCounter counter;
- const int n = 1000000;
- static int a[n]; // static storage keeps this large array (roughly 4 MB) off the stack
- for (int i = 0; i < n; i++) a[i] = 1;
- // Create a vector of threads, each of which executes 'computeSum'
- std::vector<std::unique_ptr<std::thread> > threads;
- for (int i = 0; i < 10; i++)
- threads.push_back(std::unique_ptr<std::thread>
- (new std::thread(&MultiCounter::computeSum, &counter, a, n)));
- for (int i = 0; i < 10; i++)
- threads[i]->join(); // wait for each thread to finish
- std::cout << "Final result: " << counter.getSum() << std::endl;
- }
The answer should be the number of threads * n, or 10,000,000. When we run this code, however, it will produce a different (usually wrong) answer every time.
0 ix-saucy 22:06> ./threads2
Final result: 7033014
0 ix-saucy 22:06> ./threads2
Final result: 2904314
0 ix-saucy 22:06> ./threads2
Final result: 2759084
0 ix-saucy 22:06> ./threads2
Final result: 3335995
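Why is that? When multiple threads modify the same memory location at the same time without synchronization, we have a so-called race condition. The statement sum += a[i] is not a single indivisible step: each thread reads the current value of sum, adds to it, and writes the result back. Two threads can read the same old value, and whichever writes last overwrites the other's update, so increments are lost and the result is unpredictable. (In standard C++ such unsynchronized concurrent writes are a data race, and the program's behavior is undefined.)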
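To make the lost update concrete, the following single-threaded sketch replays by hand one interleaving that two threads could produce when each executes sum += 1. The names lostUpdateDemo, r1, and r2 are hypothetical and not part of the lecture code; r1 and r2 stand for each thread's private copy of sum. It only illustrates the mechanism and does not start any threads.

```cpp
#include <cstdint>

// Replays, in a single thread, one possible interleaving of two threads
// that each execute `sum += 1` without synchronization.
// (lostUpdateDemo, r1 and r2 are illustrative names, not from the lecture code.)
std::int64_t lostUpdateDemo() {
    std::int64_t sum = 0;

    std::int64_t r1 = sum;  // thread 1 loads sum (0)
    std::int64_t r2 = sum;  // thread 2 loads sum (0) before thread 1 stores
    r1 += 1;                // thread 1 adds in its private copy
    r2 += 1;                // thread 2 adds in its private copy
    sum = r1;               // thread 1 stores 1
    sum = r2;               // thread 2 stores 1, overwriting thread 1's update

    return sum;             // 1, although two increments were performed
}
```

Calling lostUpdateDemo() returns 1 rather than 2: one of the two increments has been lost, which is the same effect that makes the multithreaded sum come out too small.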
How do we fix that? We need to "protect" the memory while a thread is writing to it. There are multiple mechanisms for doing this; here we consider locking with a mutex, a special kind of variable which ensures that the instructions between the calls to its lock() and unlock() methods are executed by only one thread at a time (other approaches include semaphores and memory barriers). As a result, sum can only be modified by a single thread at a time.
- // File: threads2.cpp
- #include <cstdint> // int64_t
- #include <iostream>
- #include <thread>
- #include <memory>
- #include <vector>
- #include <mutex> // mutex
- class MultiCounter {
- public:
- MultiCounter() : sum(0) {}
- void computeSum(int *a, int n);
- int64_t getSum() { return sum; }
- private:
- std::mutex sum_mutex; // a mutex to control access to the shared variable
- int64_t sum; // the shared variable
- };
- void
- MultiCounter::computeSum(int* a, int n) {
- // Make sure only one thread at a time can modify sum
- sum_mutex.lock();
- for (int i = 0; i < n; ++i) {
- sum += a[i];
- }
- sum_mutex.unlock();
- }
- int main()
- {
- MultiCounter counter;
- const int n = 1000000;
- static int a[n]; // static storage keeps this large array (roughly 4 MB) off the stack
- for (int i = 0; i < n; i++) a[i] = 1;
- // Create a vector of threads, each of which executes 'computeSum'
- std::vector<std::unique_ptr<std::thread> > threads;
- for (int i = 0; i < 10; i++)
- threads.push_back(std::unique_ptr<std::thread>
- (new std::thread(&MultiCounter::computeSum, &counter, a, n)));
- for (int i = 0; i < 10; i++)
- threads[i]->join(); // wait for each thread to finish
- std::cout << "Final result: " << counter.getSum() << std::endl;
- }
Output:
0 ix-saucy 22:07> ./threads2
Final result: 10000000
0 ix-saucy 22:07> ./threads2
Final result: 10000000
0 ix-saucy 22:07> ./threads2
Final result: 10000000
0 ix-saucy 22:07> ./threads2
Final result: 10000000
Because almost all of the work happens while the mutex is held, the threads effectively run one after another, which is inefficient. We can improve this by changing the computeSum function slightly, so that each thread accumulates into a private variable and takes the lock only once, to add its result to sum (you likely won't see much of a difference for this toy example, but it can make a difference for more complicated computations).
- void
- MultiCounter::computeSum(int* a, int n) {
- // do work privately without worrying about other threads
- int64_t mysum = 0;
- for (int i = 0; i < n; ++i)
- mysum += a[i];
- // Make sure only one thread at a time can update sum
- sum_mutex.lock();
- sum += mysum;
- sum_mutex.unlock();
- }
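The explicit lock() and unlock() calls can also be replaced by std::lock_guard, which locks the mutex when it is constructed and unlocks it when it goes out of scope, even if an exception is thrown in between. The following is a minimal sketch of that variant, not the version from the lecture repo; the names GuardedCounter and runGuardedCounter are hypothetical, and the array lives in a std::vector instead of a built-in array.

```cpp
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical variant of MultiCounter that uses std::lock_guard.
class GuardedCounter {
public:
    void computeSum(const int* a, int n) {
        // Do the work privately, without holding the lock.
        std::int64_t mysum = 0;
        for (int i = 0; i < n; ++i)
            mysum += a[i];
        // The guard locks sum_mutex here and unlocks it at the closing brace.
        std::lock_guard<std::mutex> lock(sum_mutex);
        sum += mysum;
    }
    std::int64_t getSum() const { return sum; }  // call only after all threads are joined
private:
    std::mutex sum_mutex;   // controls access to sum
    std::int64_t sum = 0;   // the shared variable
};

// Hypothetical driver: numThreads threads each add up the same n ones.
std::int64_t runGuardedCounter(int numThreads, int n) {
    GuardedCounter counter;
    std::vector<int> a(n, 1);
    std::vector<std::thread> threads;
    for (int t = 0; t < numThreads; ++t)
        threads.emplace_back(&GuardedCounter::computeSum, &counter, a.data(), n);
    for (auto& th : threads)
        th.join();  // wait for each thread to finish
    return counter.getSum();
}
```

Calling runGuardedCounter(10, 1000000) from main should give the same total as the mutex version above, 10 * 1000000.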
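The examples above create std::thread objects explicitly and call join on each one in order. The worker threads still run concurrently; join only blocks the main thread, which waits on the first thread even if later ones have already finished. As an alternative, you can launch the work as asynchronous tasks with std::async, which returns a std::future for each task; calling get() on a future waits for that task to complete. Modify the main program as follows (complete example in async.cpp; the fragment also needs the <future> and <chrono> headers):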
- int main()
- {
- MultiCounter counter;
- const int n = 1000000;
- static int a[n]; // static storage keeps this large array (roughly 4 MB) off the stack
- for (int i = 0; i < n; i++) a[i] = 1;
- auto start = std::chrono::steady_clock::now(); // timer
- // Create a vector of futures, one per asynchronous 'computeSum' task
- std::vector<std::future<void> > futures; // for use with async
- for (int i = 0; i < 10; i++)
- futures.push_back(std::async (std::launch::async,
- &MultiCounter::computeSum, &counter, a, n));
- for (auto &sumFuture : futures) sumFuture.get(); // wait for every task to finish
- auto elapsed = std::chrono::steady_clock::now() - start; // timer
- std::cout << "Final result: " << counter.getSum() << std::endl;
- std::cout << "Elapsed time: " << std::chrono::duration <double, std::milli> (elapsed).count() << " ms" << std::endl;
- }
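Because a future can carry a return value, the shared variable and its mutex can be avoided altogether: each task returns its own sum and the main thread adds the results as it collects them with get(). The sketch below is one way to do this, assuming the same workload as above (every task sums the whole array); the function names sumArray and runAsyncSum are hypothetical and do not come from async.cpp.

```cpp
#include <cstdint>
#include <future>
#include <vector>

// Hypothetical helper: sums n ints and returns the result instead of
// writing to a shared variable.
std::int64_t sumArray(const int* a, int n) {
    std::int64_t s = 0;
    for (int i = 0; i < n; ++i)
        s += a[i];
    return s;
}

// Hypothetical driver: launches numTasks asynchronous tasks and combines
// their return values in the calling thread, so no mutex is needed.
std::int64_t runAsyncSum(int numTasks, int n) {
    std::vector<int> a(n, 1);
    std::vector<std::future<std::int64_t>> futures;
    for (int t = 0; t < numTasks; ++t)
        futures.push_back(std::async(std::launch::async, sumArray, a.data(), n));

    std::int64_t total = 0;
    for (auto& f : futures)
        total += f.get();  // blocks until that task is done, then yields its sum
    return total;
}
```

Only the calling thread ever writes total, so there is nothing to protect; the trade-off is that results are combined in the order the futures are stored, not in the order the tasks finish.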