Multithreading in C++


Multithreading is just one damn thing after, before, or simultaneous
with another.
--Andrei Alexandrescu



//
// Problems with the current C++ memory model
//
//
//


int X = 0;
int Y = 0;

// thread 1                          // thread 2
int r1 = X;                          int r2 = Y;
if ( 1 == r1 )                       if ( 1 == r2 )
    Y = 1;                               X = 1;

// can r1 == r2 == 1 hold at the end of the execution?

//
// branch prediction: the stores may be executed speculatively,
// so r1 == r2 == 1 is possible
//


struct s { char a; char b; } x;

// thread 1                          // thread 2
x.a = 1;                             x.b = 1;

// thread 1 may be compiled as:      // thread 2 may be compiled as:
struct s tmp = x;                    struct s tmp = x;
tmp.a = 1;                           tmp.b = 1;
x = tmp;                             x = tmp;




Beyond the errors which can occur in single-threaded programs,
multithreaded environments are subject to additional errors:

- Race conditions
- Deadlock
- Priority failures (priority inversion, starvation, etc...)

Moreover, testing and debugging of multithreaded programs is harder.
Multithreaded programs are non-deterministic. Failures are often
non-repeatable. Debugged code can produce very different results than
non-debugged code. Testing on single-processor hardware may produce
different results than testing on multiprocessor hardware.

In this section we show how easy it is to make mistakes when we are not
careful enough. The textbook example is the double-checked locking
pattern, invented by Douglas C. Schmidt at Washington University in
St. Louis; the idea appeared in a chapter of the book
Pattern Languages of Program Design 3, published by Addison-Wesley in 1997.


Singleton Pattern

Singleton is a well-known Design Pattern, which ensures that a class
has only one instance and provides a global point of access to that
instance. For many reasons, dynamically allocated Singletons are very
popular in C++ programs. (One reason: the evaluation order of static
data declarations is not defined across multiple compilation units;
another: lazy instantiation: if there is no request for the Singleton
object, we can spare the initialization cost.)

The canonical implementation of the Singleton pattern is the following:

    // in singleton.h:

    class Singleton
    {
    public:
        static Singleton *instance();

        void other_method();
        // other methods ...
    private:
        static Singleton *pinstance;
    };


    // in singleton.cpp:

    Singleton *Singleton::pinstance = 0;

    Singleton *Singleton::instance()
    {
        if ( 0 == pinstance )
        {
            pinstance = new Singleton;  // lazy initialization
        }

        return pinstance;
    }


And the client simply calls the other methods via the pointer returned by the instance() member function:

    // in client.cpp:

    Singleton::instance()->other_method();


Thread Safety

It is clear that the canonical implementation of Singleton is not thread
safe. Consider the following race condition: threads A and B try to use
the Singleton simultaneously in a preemptive parallel environment before
its first initialization.

First thread A evaluates the if condition and finds that pinstance is a
null pointer. Then this thread is suspended.

Now thread B evaluates the condition, reads the null pointer and goes on
to create the instance. The Singleton has been created.

Thread A gets control again. It continues to create the Singleton.
Now a second instance has been created.

A critical section is a sequence of instructions that obeys the following
invariant: only one thread/process may execute it at a time. The
creation/initialization of the Singleton is such a critical section.

The common way to implement a critical section is to add a lock, like a
Mutex. With a static Mutex, making the Singleton pattern thread safe is easy.

    // in singleton.h:

    class Singleton
    {
    public:
        static Singleton *instance();

        void other_method();
        // other methods ...
    private:
        static Singleton *pinstance;
        static Mutex      lock_;
    };


    // in singleton.cpp:

    Singleton *Singleton::pinstance = 0;

    Singleton *Singleton::instance()
    {
        Guard<Mutex> guard(lock_);  // constructor acquires lock_

        // this is now the critical section        

        if ( 0 == pinstance )
        {
            pinstance = new Singleton;  // lazy initialization
        }

        return pinstance;
    }                               // destructor releases lock_


The ACE::Guard class can use different kinds of lock strategies via its
template parameter. Guard uses the RAII (Resource Acquisition Is
Initialization) principle: the constructor acquires the lock, and the
destructor releases it.
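
A minimal sketch of such a guard (the acquire()/release() member names are
assumptions for illustration; the real ACE::Guard differs in details):

    template <typename LOCK>
    class Guard
    {
    public:
        Guard( LOCK& lock ) : lock_(lock) { lock_.acquire(); } // ctor locks
        ~Guard()                          { lock_.release(); } // dtor unlocks
    private:
        LOCK& lock_;

        Guard( const Guard& );             // non-copyable (pre-C++11 style)
        Guard& operator=( const Guard& );
    };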


Double-Checked Locking Pattern

Although the previous solution works correctly, it is far from ideal.
The problem is that although the problematic race condition is connected
only to the initialization of the Singleton instance, the critical section
is executed on every call of the instance() method. Every access to the
instance acquires and releases the lock. Such excessive use of the locking
mechanism may cause serious overhead, which may not be acceptable.

One obviously wrong optimization attempt would be to move the critical
section inside the conditional check of pinstance:

    Singleton *Singleton::instance()
    {
        if ( 0 == pinstance )
        {
            Guard<Mutex> guard(lock_);  // constructor acquires lock_

            // this is now the critical section        

            pinstance = new Singleton;  // lazy initialization
        }                               // destructor releases lock_

        return pinstance;
    }


There is still a race condition when two threads both read pinstance as a
null pointer. Then one thread acquires the lock while the other is blocked.
The first thread initializes the instance and then releases the lock. Now
the second thread continues execution after the condition check and
re-initializes the Singleton.

The solution proposed by Schmidt uses a second check to ensure thread safety.

    Singleton *Singleton::instance()
    {
        if ( 0 == pinstance )
        {
            Guard<Mutex> guard(lock_);  // constructor acquires lock_

            // this is now the critical section        

            if ( 0 == pinstance )   // re-check pinstance
            {
                pinstance = new Singleton;  // lazy initialization
            }
        }                               // destructor releases lock_

        return pinstance;
    }


Unnecessary locking is avoided by wrapping the call to new in another
conditional test. When, in the previous (bad) scenario, the blocked thread
wakes up, it re-checks the state of the pointer and finds that it is not
null anymore. The solution minimizes locking and still ensures
correctness.

But...


The Problem with DCLP

At the end of his article Schmidt mentions a certain disadvantage of the
pattern: there is a subtle portability issue that can lead to pernicious
bugs if the pattern is used in software that has non-atomic pointer or
integral assignment semantics. ... A related problem can occur if an
overly aggressive compiler optimizes pinstance by caching it in some
way (e.g. storing it in a register) or by removing the second check of
0 == pinstance.

Consider this part of the DCLP implementation:

    pinstance = new Singleton;


This does three things:

(1) Allocate memory (via operator new) to hold the Singleton object.

(2) Construct the Singleton object in the memory.

(3) Assign to pinstance the address of the memory.

But they need not be done in this order! For example, the compiler is
free to generate the following code:

    pinstance =                             // (3)
        operator new( sizeof(Singleton) );  // (1)
    new (pinstance) Singleton;              // (2)


If this code is generated, the order of execution is 1, 3, 2.

In the context:

    Singleton *Singleton::instance()
    {
        if ( 0 == pinstance )
        {
            Guard<Mutex> guard(lock_);

            // this is now the critical section        

            if ( 0 == pinstance )   // re-check pinstance
            {
                // pinstance = new Singleton;
                pinstance =                             // (3)
                    operator new( sizeof(Singleton) );  // (1)
                new (pinstance) Singleton;              // (2)
            }
        }

        return pinstance;
    }


Consider: thread A executes (1) and (3) and is suspended. Now pinstance
is not null, but the instance has not been constructed yet. Thread B
checks the outermost condition, sees that pinstance is not null, and goes
on to return a pointer to the not-yet-constructed Singleton object.

The fundamental problem: we need a way to specify instruction ordering
in C++. The sad news is that there is no way to do that in C++ (or in C)
before C++11!

Sequence Points and Observable Behavior

The C++ standard defines so-called sequence points. There is a sequence
point at the end of each statement, i.e., "at the semicolon". There are
also a number of other cases of sequence points: the short-circuit logical
operators, the conditional operator, the comma operator, and function
calls.

When a sequence point is reached, all side effects of previous evaluations
shall be complete and no side effects of subsequent evaluations shall have
taken place.

This suggests that perhaps careful statement ordering could help to control
the order of the generated instructions. It does not. The compiler may
reorder instructions as long as it preserves the observable behavior of the
C++ abstract machine.


Observable behavior consists only of

- Reads and writes of volatile data.
- Calls to library I/O functions.

    void foo()
    {
        int x = 0, y = 0;       // (1)
        x = 5;                  // (2) 
        y = 10;                 // (3)
        printf( "%d,%d", x, y); // (4)
    }


In the previous example (1)-(3) may be optimized away, because they have
no observable behavior. If they are executed, (1) must precede (2)-(4) and
(4) must follow (1)-(3). We know nothing about the relative execution order
of (2) and (3). Either may come first, or they might run simultaneously.
Sequence points offer no control over the execution of statements.


            if ( 0 == pinstance )   // re-check pinstance
            {
                // pinstance = new Singleton;
                Singleton *temp =
                    operator new( sizeof(Singleton) );
                new (temp) Singleton;
                pinstance = temp;
            }


Therefore the attempt above is not a solution to the problem. The compiler
will likely optimize out the temp variable. Other "tricks" will likely
fail too: declaring temp as extern and defining it in another compilation
unit; declaring the Singleton constructor in a different compilation unit
to avoid inlining; assuming that it may throw an exception, etc.


The compiler has a huge motivation for such optimizations:

- To execute operations in parallel where the hardware allows it.
- To avoid spilling data from registers.
- To perform common subexpression elimination.
- To keep the instruction pipeline full.
- To reduce the size of the generated executable.


Volatile

Douglas Schmidt proposed volatile to prevent "aggressive optimizations"
in his article. Volatile prevents some compiler optimizations:

The order of reads/writes of volatile data must be preserved.

Volatile memory may change outside of program control, e.g. by some hardware
action, therefore "redundant" reads/writes in the source code must not be
eliminated.


This is the C++ meaning of volatile. In Java and in C# volatile is different:
those languages add acquire/release semantics to volatile.

Volatile would seem to make the temporary variable strategy viable:

    // in singleton.h:

    class Singleton
    {
    public:
        static Singleton *instance();

        void other_method();
        // other methods ...
    private:
        static volatile Singleton *pinstance;
        static          Mutex      lock_;

        // for example:
        int x;
        Singleton() : x(1)  {}      // test constructor
    };


    // in singleton.cpp:

    volatile Singleton *Singleton::pinstance = 0;  // definition matches the declaration

    Singleton *Singleton::instance()
    {
        if ( 0 == pinstance )
        {
            Guard<Mutex> guard(lock_);

            // this is now the critical section        

            if ( 0 == pinstance )   // re-check pinstance
            {
                // these lines can't be reordered
                volatile Singleton *temp = new Singleton;   // (1)
                pinstance = temp;                           // (2)
            }
        }

        return const_cast<Singleton *>(pinstance);  // cast away volatile for the caller
    }


Indeed, the lines (1) and (2) must not be reordered. This is the code
generated after inlining the constructor:

    Singleton *Singleton::instance()
    {
        if ( 0 == pinstance )
        {
            Guard<Mutex> guard(lock_);

            // this is now the critical section        

            if ( 0 == pinstance )   // re-check pinstance
            {
                // these lines can't be reordered
                volatile Singleton *temp = static_cast<Singleton *>
                            (operator new(sizeof(Singleton)));       //  (1)
                temp -> x = 1;                                      //  (2)
                pinstance = temp;                                   //  (3)
            }
        }

        return const_cast<Singleton *>(pinstance);  // cast away volatile for the caller
    }


where (2) is the inlined constructor. Although temp points to volatile,
the object is not treated as volatile while it is being constructed (the
constructor operates on a non-volatile this), so the instructions can
still be reordered:

            if ( 0 == pinstance )   // re-check pinstance
            {
                // these lines can't be reordered ???
                volatile Singleton *temp = static_cast<Singleton *>
                            (operator new(sizeof(Singleton)));      //  (1)
                pinstance = temp;                                   //  (3)
                temp -> x = 1;                                      //  (2)
            }


And here, pinstance again points to uninitialized memory.

The main problem is that the constraints on observable behavior apply only
to the C++ abstract machine, and the abstract machine was defined as
implicitly single-threaded.


Memory Model

A modern multiprocessor hardware architecture looks like the following:

             ______________________________________
            |                                      |
            |                                      |
            |                memory                |
            |                                      |
            |______________________________________|
                |       |       |       |       |
                |       |       |       |       |
             cache1  cache2  cache3  cache4  cache5
                |       |       |       |       |
              proc1   proc2   proc3   proc4   proc5



The value of a shared memory location may appear in more than one cache,
and it might have different values in the different caches. The hardware
normally takes care of these situations. However, the contents of a cache
may be flushed in a different order than they were written. Example:
processor A modifies variable x, then modifies variable y. When the cache
is flushed, y is flushed before x. Therefore another processor may see the
change of y before the change of x.

Out-of-order write visibility can cause DCLP to fail. Remember that
pinstance = new Singleton; does the following:

(1) Allocate memory (via operator new) to hold the Singleton object.
(2) Construct the Singleton object in the memory.
(3) Assign to pinstance the address of the memory.

Now suppose processor A does these steps in this order. Processor B sees
the result of (3) before the result of (2), and therefore goes on to use
the Singleton object as if it were initialized. (It is, but processor A's
cache has not been written out yet.)

C++ offers no guarantees about how different threads see the phenomenon
above. Some libraries, like POSIX threads, offer some guarantees.

The Java memory model was revised to offer such guarantees. Download the
latest documentation from http://www.cs.umd.edu/~pugh/java/memoryModel/jsr133.pdf.

The visibility problems described above can be prevented via the use of
memory barriers. Barriers are instructions that constrain reads and
writes, allowing readers and writers to agree on a consistent strategy
for memory usage.

Readers use an acquire barrier to prevent memory accesses that come later
in the source code from moving before the barrier.

Writers use a release barrier to prevent memory accesses that come earlier
in the source code from moving after the barrier.

Then one can use a release/acquire protocol: the release publishes the new
state of memory, and the acquire guarantees that the reader sees that new
state. Compilers may not reorder reads and writes in a way that violates
memory barrier semantics; neither may runtime systems or hardware.

    Singleton *Singleton::instance()
    {
        Singleton *temp = pinstance;    // read pinstance

        Perform acquire;
        // prevent visibility of later memory operations
        // from moving up from this point

        if ( 0 == temp )
        {
            Guard<Mutex> guard(lock_);

            // this is now the critical section        

            if ( 0 == pinstance )   // re-check pinstance
            {
                temp = new Singleton;

                Perform release;
                // prevent visibility of earlier memory operations
                // from moving down from this point

                pinstance = temp;   // write pinstance
            }
        }

        return pinstance;
    }


Problem: memory barrier instructions differ across hardware platforms and
(before C++11) cannot be expressed portably, so the code above is not portable.


Rule for Avoiding Memory Visibility Problems

Shared data should be accessed only if one of the following is true:
- The access is inside a critical section.
- The access follows a consistent acquire/release (message passing)
  protocol, as described below.

For communication via message passing, these protocols are followed:

(1) Read ready flag; perform acquire; read message.
(2) Write message; perform release; write ready flag.

DCLP effectively uses message passing:

pinstance is the ready flag: if it is not null, then "ready".
pinstance itself is the message.


    Singleton *Singleton::instance()
    {
        Singleton *temp = pinstance;    // read flag

        Perform acquire;                // acquire

        if ( 0 == temp )
        {
            Guard<Mutex> guard(lock_);

            // this is now the critical section        

            if ( 0 == pinstance )
            {
                temp = new Singleton;   // write message

                Perform release;        // release

                pinstance = temp;       // write flag
            }
        }

        return pinstance;
    }


This is the correct messaging protocol.

Locks as Barriers

It is critical that guard is initialized before the second if statement.
Why must the following never happen?

        if ( 0 == temp )
        {
            if ( 0 == pinstance )
            {
                // move lock inside the second conditional statement
                Guard<Mutex> guard(lock_);

                temp = new Singleton;   // write message



If the lock were acquired only inside the second conditional, the second
read of pinstance would happen without an acquire between it and the write
made by the other thread, so the double check would no longer be safe.

Threading libraries ensure that lock acquisition includes the moral
equivalent of an acquire, and lock release includes the equivalent
of a release.

    Singleton *Singleton::instance()
    {
        Singleton *temp = pinstance;    // read flag
        Perform acquire;                // acquire

        if ( 0 == temp )
        {
            Guard<Mutex> guard(lock_);
            Perform acquire;            // constructor of lock

            // this is now the critical section        

            if ( 0 == pinstance )
            {
                temp = new Singleton;   // write message

                Perform release;        // release
                pinstance = temp;       // write flag
            }
            Perform release;            // destructor of lock
        }

        return pinstance;
    }


Thus locks provide us with a portable way to insert memory barriers.

//==================================================================
//
//   C++11
//
//==================================================================


Atomics & Sequential consistency in C++11



int x, y;

// thread 1      |       // thread 2                  
x = 1;           |       cout << y << ", ";
y = 2;           |       cout << x << endl;

// In C++03 this is not even undefined behavior: the standard says nothing about threads

// In C++11 this is a data race: undefined behavior




int x, y;
mutex x_mutex, y_mutex;

// thread 1      |       // thread 2                  
x_mutex.lock();  |       y_mutex.lock();
x = 1;           |       cout << y << ", ";
x_mutex.unlock();|       y_mutex.unlock();
y_mutex.lock();  |       x_mutex.lock();
y = 2;           |       cout << x << endl;
y_mutex.unlock();|       x_mutex.unlock();





std::atomic<int> x, y;

// thread 1      |       // thread 2
x.store(1);      |       cout << y.load() << ", ";
y.store(2);      |       cout << x.load() << endl;

// In C++11 this is defined behavior, and the result can be:
0, 0
2, 1
0, 1
// never prints: 2, 0

store()/load()  ----> sequential consistency

atomics --> atomic load()/store() + ordering
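
For reference, a complete, compilable version of the fragment above
(the function names writer and reader are made up for illustration):

#include <atomic>
#include <iostream>
#include <thread>

std::atomic<int> x(0), y(0);

void writer()
{
    x.store(1);    // memory_order_seq_cst by default
    y.store(2);
}

void reader()
{
    std::cout << y.load() << ", " << x.load() << std::endl;
}

int main()
{
    std::thread t1(writer);
    std::thread t2(reader);
    t1.join();
    t2.join();
}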



Full ordering can be expensive on modern hardware,
so we might want to relax these rules.



Memory ordering options:




memory_order_seq_cst  ____ sequentially consistent (default)

memory_order_consume  _
memory_order_acquire   |__ acquire-release
memory_order_release   |
memory_order_acq_rel  _|

memory_order_relaxed  ____ relaxed ordering



For example, the x86/x86_64 architectures do not require additional instructions
to implement acquire-release ordering.




Sequential consistent ordering
------------------------------


Each individual thread is sequential: its operations can't be reordered.
A seq_cst store synchronizes-with a seq_cst load of the same variable.








std::atomic<int> x, y;

// thread 1      |       // thread 2
x.store(1, memory_order_relaxed); |cout << y.load(memory_order_relaxed) << ", ";
y.store(2, memory_order_relaxed); |cout << x.load(memory_order_relaxed) << endl;

// Defined behavior: atomic, but not ordered; the result may be:
0, 0
2, 1
0, 1
2, 0

atomics with relaxed mode --> atomic load()/store()




std::atomic<int> x, y;

// thread 1      |       // thread 2
x.store(1, memory_order_release); |cout << y.load(memory_order_acquire) << ", ";
y.store(2, memory_order_release); |cout << x.load(memory_order_acquire) << endl;

// In C++11 this is defined behavior, and the result can be:
0, 0
2, 1
0, 1
// never prints: 2, 0, but can be faster than the strict ordering
// results may be different in more complex programs
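
This release/acquire pair is exactly the message-passing protocol used
earlier for DCLP: the atomic variable is the "ready flag", ordinary data is
the "message". A compilable sketch (the names producer, consumer, ready and
message are made up):

#include <atomic>
#include <cassert>
#include <string>
#include <thread>

std::string message;             // the "message": ordinary, non-atomic data
std::atomic<bool> ready(false);  // the "ready flag"

void producer()
{
    message = "hello";                             // write message
    ready.store(true, std::memory_order_release);  // write flag
}

void consumer()
{
    while ( !ready.load(std::memory_order_acquire) ) // read flag
        ;                                            // spin until ready
    assert( message == "hello" );  // read message: the release store
}                                  // synchronizes-with the acquire load

int main()
{
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
}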




//
// C++11 memory model
// from http://meetingcpp.com/tl_files/mcpp/slides/12/mutz-cxx11-mm.pdf
//


Only minimal progress guarantees are given:

1. unblocked threads will make progress
2. the implementation should ensure that writes in a thread
   become visible in other threads "in a finite amount of time".



The "A happens before B" relationship holds if:

1. A is sequenced before B,  or
2. A inter-thread happens before B
             == there is a synchronization point between A and B



Synchronization points:

1. thread creation synchronizes-with the start of the thread's execution
2. thread completion synchronizes-with the return of join()
3. unlocking a mutex synchronizes-with the next locking of that mutex
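
A minimal illustration of rules 1 and 2: the child thread's write is visible
after join() returns, without any explicit locking:

#include <cassert>
#include <thread>

int data = 0;   // ordinary, non-atomic variable

int main()
{
    std::thread t( []{ data = 42; } ); // creation syncs with the thread start
    t.join();                          // completion syncs with join()'s return
    assert( data == 42 );              // no data race: the write happens
}                                      // before this read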



Data race:
   A program contains a "data race"
   if it contains two conflicting actions in different threads, at least one
   of which is not atomic, and neither happens before the other.



Memory location:
1. an object of scalar type  or
2. a maximal sequence of adjacent bit-fields all having non-zero width




Two threads of execution can update and access separate memory locations
without interfering with each other:


struct { char a; char b; } x;

// thread 1          // thread 2
x.a = 1;             x.b = 1;


// problems under POSIX threads
// no data race in C++11






std::thread
===========




//
// In C++11, threads are part of the language.
//

#include <iostream>
#include <thread>



namespace std
{


class thread
{
public:
  typedef ... native_handle_type;
  typedef ... id;

  thread() noexcept;                // does not represent a thread
  thread( thread&& other) noexcept; // move constructor
  ~thread();                        // if joinable() calls std::terminate()

  template <typename Function, typename... Args> // copies args to thread-local
  explicit thread( Function&& f, Args&&... args); // storage, then executes f with args

  thread(const thread&) = delete;  // no copy
  thread& operator=(thread&& other) noexcept;  // move
  void swap( thread& other);  // swap

  bool joinable() const; // thread object owns a physical thread
  void join();           // blocks the current thread until *this finishes
  void detach();         // separates the physical thread from the thread object

  std::thread::id get_id() const; // see also std::this_thread::get_id()
  static unsigned int hardware_concurrency();  // supported concurrent threads
  native_handle_type native_handle();          // e.g. the OS thread handle
};

}





//
// Typesafe parameter passing to the thread
//

void f( int i, const std::string& s);

//
// Creates a new thread of execution t, which calls
// f(3,"hello"); the arguments are copied (as is) into
// internal storage (even if the function takes them
// by reference)

// f is the initial function

// if an exception occurs while copying the arguments, it is thrown in the
// hosting thread, not in the newly created thread


std::thread t(f,3,"Hello");

// f can be any callable function


class f  // f will be copied to the new thread
{
public:
    f( int i, std::string s) : _i(i), _s(s) {}
    void operator()() const
    {
       // background activity
    }
    int _i;
    std::string _s;
};


std::thread t( f() );   // MOST VEXING PARSE: declares a function, not a thread!



std::thread t(( f(3,"Hello") )); // ok

std::thread t{  f(3,"Hello") };  // ok



std::thread t( [] { /* background activity */ } );


// f is not a member function:                          f(t1, t2, ...)
// f is a pointer to member function of T, t1 is a T:   (t1.*f)(t2, ...)
// f is a pointer to member function, t1 is not a T:    ((*t1).*f)(t2, ...)
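
For example, starting a thread on a member function (a minimal sketch; the
Greeter class is made up for illustration):

#include <iostream>
#include <string>
#include <thread>

struct Greeter
{
    void greet( const std::string& name )
    {
        std::cout << "hello, " << name << std::endl;
    }
};

int main()
{
    Greeter g;
    std::thread t( &Greeter::greet, &g, "world" );  // calls (g.*f)("world")
    t.join();
}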




void f( int i, const std::string& s)
{
    std::cout << "Hello concurrent world" << std::endl;
}

int main()
{
    int i = 3;
    std::string s("Hello");

    // Will copy both i and s
    // We can prevent the copy by using reference wrapper
    // std::thread t( f, std::ref(i), std::ref(s));

    std::thread t( f, i, s);

    // if the thread destructor runs while the thread is still joinable,
    // std::terminate will be called.
    // Use join() or detach() to avoid that.

    t.join();

    return 0;
}



Possible alternative destructor strategies:
(Scott Meyers: Effective Modern C++)


 - Implicit join: the destructor waits until the thread execution is completed.
   Hard to detect performance issues.

 - Implicit detach: the destructor runs, but the underlying thread keeps running.
   The destructor may free memory that the thread then tries to access.



Still, it is possible to write wrong code (of course, this is C++).
Better to avoid passing pointers and references, or to use join().



struct func
{
    int& i;
    func(int& i_) : i (i_) { }

    void operator()()
    {
        for(unsigned int j=0; j < 1000000; ++j)
        {
            do_something(i);  // i refers to a destroyed variable <-
        }                                                          |
    }                                                              |
};

void oops()
{
    int some_local_state=0;

    func my_func(some_local_state);

    std::thread my_thread(my_func);

    my_thread.detach();  // don't wait for the thread to end   ____|
}  // i is destroyed, but the thread is likely still running.. 




void f( int i, const std::string& s);

std::thread t( f, 3, "Hello");



"Hello" is passed to f as const cast *
and converted to std::string in the new thread!



This can be a problem: the function may return (destroying the buffer)
before the new thread converts the char* to std::string:


void f( int i, std::string const& s);
void oops( int some_param)
{
    char buffer[1024];
    sprintf(buffer, "%i", some_param);
    std::thread t(f,3,buffer);
    t.detach();
}


void f( int i, std::string const& s);
void not_oops( int some_param)
{
    char buffer[1024];
    sprintf(buffer,"%i",some_param);
    std::thread t(f,3,std::string(buffer));
    t.detach();
}




It is possible to create a RAII thread.




class scoped_thread 	// Anthony Williams
{
    std::thread t;
public:
    explicit scoped_thread(std::thread t_): t(std::move(t_))
    {
        if(!t.joinable())
            throw std::logic_error("No thread");
    }
    ~scoped_thread()
    {
        t.join();
    }
    scoped_thread(scoped_thread const&)=delete;
    scoped_thread& operator=(scoped_thread const&)=delete;
};

struct func;

void f()
{
    int some_local_state;
    scoped_thread t(std::thread(func(some_local_state)));
    do_something_in_current_thread();
}



//
// std::thread is ok to use with containers
//

void do_work(unsigned id);

void f()
{
    std::vector<std::thread> threads;
    for(unsigned i=0;i<20;++i)
    {
        threads.push_back(std::thread(do_work,i));
    }
    std::for_each(threads.begin(),threads.end(),
                 std::mem_fn(&std::thread::join));
}



std::thread::hardware_concurrency() gives a hint about the number of available cores.
(Be aware of "oversubscription", i.e. using more threads than cores we have.)
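
A small sketch of how the hint is typically used (the helper name
pick_thread_count is made up):

#include <algorithm>
#include <thread>

unsigned pick_thread_count( unsigned requested )
{
    unsigned hw = std::thread::hardware_concurrency();
    if ( 0 == hw )                      // the hint may be unavailable:
        hw = 2;                         // 0 is returned, fall back to a guess
    return std::min( requested, hw );   // avoid oversubscription
}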



std::thread::id identifiers are returned by std::this_thread::get_id() and by
the get_id() member function; the latter returns std::thread::id() if there
is no associated thread.

Operators on std::thread::id:

== != <
std::hash<std::thread::id>
std::ostream& <<



std::thread::id master_thread;

void some_core_part_of_algorithm()
{
    if(std::this_thread::get_id()==master_thread)
    {
         do_master_thread_work();
    }
    do_common_work();
}





Synchronization objects
=======================


//
// Mutex
//

#include <mutex>

std::mutex m;
int sh;     // shared data

void f()
{
    // ...
    m.lock();
    // manipulate shared data:
    sh += 1;
    m.unlock();
}


//
// Recursive mutex
//

std::recursive_mutex m;
int sh;	// shared data
// ...
void f(int i)
{
// ...
    m.lock();
    // manipulate shared data:
    sh+=1;
    if (--i>0) f(i);
    m.unlock();
    // ...
}


//
// Timed mutex
//

std::timed_mutex m;
int sh;     // shared data

void f()
{
    // ...
    if (m.try_lock_for(std::chrono::seconds(10)))
    {
        // manipulate shared data:
        sh += 1;
        m.unlock();
    }
    else
    {
        // we didn't get the mutex; do something else
    }
}

void g()
{
    // ...
    if (m.try_lock_until(midnight))   // midnight: a time_point computed elsewhere
    {
        // manipulate shared data:
        sh += 1;
        m.unlock();
    }
    else
    {
        // we didn't get the mutex; do something else
    }
}


//
// Locks -- support RAII
//

#include <list>
#include <mutex>
#include <algorithm>

std::list<int> l;
std::mutex     m;

void add_to_list(int value)
{
    // lock acquired - with RAII style lock management
    std::lock_guard< std::mutex > guard(m);
    l.push_back(value);
}   // lock released
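
A companion query under the same mutex could look like this (a sketch; note
that every access to the list must go through the same lock):

bool list_contains(int value)
{
    std::lock_guard< std::mutex > guard(m);
    return std::find(l.begin(), l.end(), value) != l.end();
}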





"Pointers or references pointing out from the guarded area may be an issue!"




//
// Deadlock danger
//

friend bool operator<( X const& lhs, X const& rhs)
{
    if ( &lhs == &rhs )
        return false;

    std::lock_guard< std::mutex > lock_lhs( lhs.m );  // nested locking,
    std::lock_guard< std::mutex > lock_rhs( rhs.m );  // in argument order

    return lhs.data < rhs.data;
}



// thread1          |           thread2

    a < b           |            b < a




Avoid deadlocks:

  1. Avoid nested locks
  2. Avoid calling user-defined code while holding a lock
  3. Acquire locks in a fixed order





//
// The correct solution is:
//

friend bool operator<( X const& lhs, X const& rhs)
{
    if ( &lhs == &rhs )
        return false;

    // std::lock - lock two or more mutexes
    std::lock( lhs.m, rhs.m);
    std::lock_guard< std::mutex >  lock_lhs( lhs.m, std::adopt_lock);
    std::lock_guard< std::mutex >  lock_rhs( rhs.m, std::adopt_lock);

    return lhs.data < rhs.data;
}


//
// The unique_lock with defer_locks:
//

friend bool operator<( X const& lhs, X const& rhs)
{
    if ( &lhs == &rhs )
        return false;

    // std::unique_locks constructed with defer_lock can be locked
    // manually, by using lock() on the lock object ...
    std::unique_lock< std::mutex >  lock_lhs( lhs.m, std::defer_lock);
    std::unique_lock< std::mutex >  lock_rhs( rhs.m, std::defer_lock);
    // lock_lhs.owns_lock() now false

    // ... or passing to std::lock
    std::lock( lock_lhs, lock_rhs);  // designed to avoid dead-lock
    // also there is an unlock() member function

    // lock_lhs.owns_lock() now true
    return lhs.data < rhs.data;
}



// unique_lock is movable but not copyable.



std::unique_lock<std::mutex> get_lock()
{
    extern std::mutex some_mutex;
    std::unique_lock<std::mutex> lk(some_mutex);
    prepare_data();
    return lk; // same as std::move(lk), return does not require std::move
}

void process_data()
{
    std::unique_lock<std::mutex> lk(get_lock());
    do_something();
}








//
// Singleton
// naive implementation - bad
//

template <typename T>
class MySingleton
{
public:
    std::shared_ptr<T> instance()
    {
	std::unique_lock<std::mutex> lock(resource_mutex);
        if ( ! resource_ptr )
	    resource_ptr.reset( new T(...) );
	lock.unlock();
	return resource_ptr;
    }
private:
    std::shared_ptr<T> resource_ptr;
    mutable std::mutex resource_mutex;
};


//
// Singleton
// DCLP implementation - still bad
//

template <typename T>
class MySingleton
{
public:
    std::shared_ptr<T> instance()
    {
        if ( ! resource_ptr )  // 1
	{
	    std::unique_lock<std::mutex> lock(resource_mutex);
            if ( ! resource_ptr )
	        resource_ptr.reset( new T(...) );  // 2
	    lock.unlock();
	}
	return resource_ptr;
    }
private:
    std::shared_ptr<T> resource_ptr;
    mutable std::mutex resource_mutex;
};

// load in 1 and store in 2 is not synchronized.
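
With std::atomic the load at 1 and the store at 2 can be ordered explicitly,
which repairs DCLP in C++11. A sketch (it uses a raw T* instead of shared_ptr,
because C++11 has no atomic shared_ptr; T is default-constructed in place of
the elided T(...) arguments):

#include <atomic>
#include <mutex>

template <typename T>
class MySingleton
{
public:
    T *instance()
    {
        T *tmp = resource_ptr.load(std::memory_order_acquire); // read flag
        if ( ! tmp )
        {
            std::lock_guard<std::mutex> lock(resource_mutex);
            tmp = resource_ptr.load(std::memory_order_relaxed); // re-check;
            if ( ! tmp )                 // relaxed is enough: the mutex already
            {                            // orders accesses inside the section
                tmp = new T;
                resource_ptr.store(tmp, std::memory_order_release); // publish
            }
        }
        return tmp;
    }
private:
    std::atomic<T*> resource_ptr{nullptr};
    std::mutex      resource_mutex;
};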



//
// Singleton
// C++11 implementation with call_once
//

template <typename T>
class MySingleton
{
public:
    std::shared_ptr<T> instance()
    {
        std::call_once( resource_init_flag, &MySingleton::init_resource, this);
	return resource_ptr;
    }
private:
    void init_resource()
    {
	resource_ptr.reset( new T(...) );
    }
    std::shared_ptr<T> resource_ptr;
    std::once_flag     resource_init_flag; // can't be moved or copied
};



//
// Meyers singleton:
// C++11 guarantees that this is thread safe:
//
class MySingleton;
MySingleton& MySingletonInstance()
{
    static MySingleton _instance;
    return _instance;
}




// there is no reader-writer mutex in C++11
// use std::lock_guard<boost::shared_mutex>    for writes
//     boost::shared_lock<boost::shared_mutex> for reads






Synchronization
===============




//
// Spin lock
//
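
A minimal sketch of a spin lock built on std::atomic_flag with
acquire/release ordering:

#include <atomic>

class spin_lock
{
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
    void lock()
    {
        while ( flag.test_and_set(std::memory_order_acquire) )
            ;   // busy-wait until the owner clears the flag
    }
    void unlock()
    {
        flag.clear(std::memory_order_release);
    }
};

// it satisfies BasicLockable (lock/unlock), so the standard wrappers work:
// std::lock_guard<spin_lock> guard(my_spin_lock);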




//
// Waiting for a flag:
//


bool flag;
std::mutex m;

void wait_for_flag()
{
    std::unique_lock<std::mutex> lk(m);
    while(!flag)
    {
        lk.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        lk.lock();
    }
}









//
// Condition variable
//

std::mutex                my_mutex;
std::queue< data_t >      my_queue;
std::condition_variable  data_cond;   // condition variable

void producer()
{
    while ( more_data_to_produce() )
    {
        const data_t data = produce_data();
	std::lock_guard< std::mutex > prod_lock(my_mutex);  // guard the push
	my_queue.push(data);
	data_cond.notify_one(); // notify the waiting thread to evaluate cond.
    }
}

void consumer()
{
    while ( true )
    {
	std::unique_lock< std::mutex > cons_lock(my_mutex); // not lock_guard
	data_cond.wait(cons_lock,            // returns when the lambda returns true
           []{return !my_queue.empty();});  // else unlocks and waits
	data_t data = my_queue.front();  // lock is held here to protect pop...
	my_queue.pop();
	cons_lock.unlock();   // ... until here
	consume_data(data);
    }
}




During the wait, the condition variable may check the condition at any time,
but always under the protection of the mutex; it returns immediately if the
condition is true.

Spurious wake: waking up without a notification from another thread.
The times and frequency are undefined -> better to avoid condition functions
with side effects.
(E.g. using a counter in the lambda to check how many notifications occurred
is bad.)









Futures
=======



1976 Daniel P. Friedman and David Wise: promise
1977 Henry Baker and Carl Hewitt: future (a similar concept)



A future is a read-only placeholder view of a variable.
A promise is a writable, single-assignment container (it sets the value of the future).

Promise --> Future communication channel


Futures are results of asynchronous function calls. When I execute such a
function I don't get the result, but get a "future" which will hold the
result when the function completes.

Later, I can check the future, whether the result is already available.
If it is, I can continue my work.
If not, I can either do other things and re-check the future later,
or I can block and wait for the result.

A future is also capable of storing exceptions.



   std::unique_future has been renamed to std::future

      - The only instance referring to the asynchronous event
      - move only

   std::shared_future

      - Multiple instances may refer to the same event
      - copyable
      - all instances become ready at the same time


   is_ready() (not in standard C++11; use wait_for() with zero timeout)
   get() - returns an rvalue
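
A small sketch of the difference (share() converts a std::future into a
std::shared_future):

#include <future>

int main()
{
    std::promise<int> p;
    std::shared_future<int> sf = p.get_future().share(); // copyable view

    std::shared_future<int> sf2 = sf;  // several copies may refer to the
                                       // same shared state
    p.set_value(7);

    return sf.get() + sf2.get();       // both see 7; get() may be called
}                                      // on every copy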







#include <future>
#include <iostream>

int f(int);
void do_other_stuff();

int main()
{
    std::future<int> the_answer = std::async(f,1);
    do_other_stuff();
    std::cout<< "The answer is " << the_answer.get() << std::endl;
}




std::async() executes the task either in a new thread or deferred, at the
point of the wait()/get() call


// starts in a new thread
auto fut1 = std::async(std::launch::async, f, 1);

// run in the same thread on wait() or get()
auto fut2 = std::async(std::launch::deferred, f, 2);

// default: implementation chooses
auto fut3 = std::async(std::launch::deferred | std::launch::async, f, 3);

// default: implementation chooses
auto fut4 = std::async(f, 4);


If no wait() or get() is called, then the task may not be executed at all.
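
A small sketch showing the deferred policy (the function name work is made up):

#include <future>
#include <iostream>

int work()
{
    std::cout << "running" << std::endl;
    return 1;
}

int main()
{
    auto fut = std::async(std::launch::deferred, work);
    // nothing has been printed yet: the deferred task has not started
    std::cout << fut.get() << std::endl; // runs work() here: prints "running", then 1
}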



Exceptions:



double square_root(double x)
{
    if ( x < 0 )
    {
        throw std::out_of_range("x<0");
    }
    return sqrt(x);
}

int main()
{
    std::future<double> fut = std::async( square_root, -1);
    double res = fut.get(); // fut becomes ready on the exception; get() rethrows
}                           // the exception object may be a copy of the original



Also there are:

    fut.valid()      -- future has a shared state
    fut.wait()       -- wait until result is available
    fut.wait_for()   -- timeout duration
    fut.wait_until() -- wait until specific time point
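
For example, polling with wait_for (a sketch; it assumes the future was
created with std::launch::async, since a deferred task reports
std::future_status::deferred instead of ever becoming ready by itself):

#include <chrono>
#include <future>

int poll( std::future<int>& fut )
{
    while ( fut.wait_for(std::chrono::milliseconds(50))
                != std::future_status::ready )
    {
        // do other work meanwhile
    }
    return fut.get();
}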





Packaged task
=============



double x = 4.0;

std::packaged_task<double(double)>   tsk(square_root);

std::future<double> fut = tsk.get_future(); // future will ready 
                                            // when task completes

std::thread t(std::move(tsk), x);   // make sure the task starts immediately
                                    // on a different thread
                                    // the thread can be joined, detached, scoped

double res = fut.get();    // using the future







Promise
=======


How to implement a packaged_task?
http://stackoverflow.com/questions/11004273/what-is-stdpromise

 Promises: a lower-level tool than packaged_task.
 A promise is a tool for passing the return value (or an exception) from the
 thread executing a function to the thread that consumes the result via a future.



template <typename> class my_task;

template <typename R, typename ...Args>
class my_task<R(Args...)>
{
    std::function<R(Args...)> fn;
    std::promise<R> pr;
public:
    template <typename ...Ts>
    explicit my_task(Ts&&... ts) : fn(std::forward<Ts>(ts)...) { }

    template <typename ...Ts>
    void operator()(Ts&&... ts)
    {
        pr.set_value(fn(std::forward<Ts>(ts)...));
    }

    std::future<R> get_future() { return pr.get_future(); }

    // disable copy, default move
};


try
{
    my_promise.set_value( square_root(x) );
}
catch(...)
{
    my_promise.set_exception( std::current_exception() );
}



Use of a promise:




void asyncFun( std::promise<int> myPromise)
{
    int result;
    try
    {
        // calculate the result
	myPromise.set_value(result);
    }
    catch ( const MyException& e )
    {
        myPromise.set_exception(std::make_exception_ptr(e));
    }
}

or

void asyncFun( std::promise<int> myPromise)
{
    int result;
    try
    {
        // calculate the result
	myPromise.set_value(result);
    }
    catch ( ... )
    {
	myPromise.set_exception(std::current_exception());
    }
}



//
// In the calling thread:
//

int main()
{
    std::promise<int> intPromise;
    std::future<int> intFuture = intPromise.get_future();
    std::thread t(asyncFun, std::move(intPromise));

    // do other stuff here, while asyncFun is working

    int result = intFuture.get();  // may throw MyException
    return 0;
}









 Example from Stroustrup:


double comp(vector<double>& v)
{
// package the tasks:
// (the task here is the standard accumulate() for an array of doubles):
    packaged_task<double(double*,double*,double)>
	                     pt0{std::accumulate<double*,double>};
    packaged_task<double(double*,double*,double)>
	                     pt1{std::accumulate<double*,double>};

    auto f0 = pt0.get_future();	// get hold of the futures
    auto f1 = pt1.get_future();

    pt0(&v[0],&v[v.size()/2],0);	// run the tasks (here on the calling thread)
    pt1(&v[v.size()/2],&v[v.size()],0);

    return f0.get()+f1.get();	// get the results
}


//
// Stroustrup: async()
//

template<class T, class V> struct Accum
{	// simple accumulator function object
    T* b;
    T* e;
    V val;
    Accum(T* bb, T* ee, const V& v) : b{bb}, e{ee}, val{v} {}
    V operator() () { return std::accumulate(b,e,val); }
};

double comp(vector<double>& v)
// spawn many tasks if v is large enough
{
    if (v.size()<10000) return std::accumulate(v.begin(),v.end(),0.0);

    auto f0 = async(Accum<double,double>{&v[0],&v[v.size()/4],0.0});
    auto f1 = async(Accum<double,double>{&v[v.size()/4],&v[v.size()/2],0.0});
    auto f2 = async(Accum<double,double>{&v[v.size()/2],&v[v.size()*3/4],0.0});
    auto f3 = async(Accum<double,double>{&v[v.size()*3/4],&v[v.size()],0.0});

    return f0.get()+f1.get()+f2.get()+f3.get();
}



Coming:


C++17  resumable functions
         async ... wait
       transactional memory
       continuation
         then()
         when_any()
         when_all()
       parallel STL (Intel TBB)?





// resumable functions

std::future<std::ptrdiff_t> tcp_reader(int total)
{
    char buffer[64*1024];
    std::ptrdiff_t result = 0;

    auto conn = await Tcp::Connect("127.0.0.1", 1337); // returns a future

    do
    {
        auto bytesRead = await conn.Read(buffer, sizeof(buffer));
        total -= bytesRead;
        result += std::count(buffer, buffer + bytesRead, 'c');
    }
    while ( total > 0 );

    return result;
}





// chaining continuations

future<string> make_string()
{
    future<int>    f1 = async( []() -> int { return 42; });

    future<string> f2 = f1.then(
                        [](future<int> f) -> string
                        {
                            return to_string(f.get()); // .get() won't block
                        });

    return f2;
}



void test_when_all()
{
    shared_future<int> sf1 = async([]()->int { return 42; });
    future<string>      f2 = async([]()->string { return string( "hello"); });

    future<tuple<shared_future<int>, future<string>>> all_f
        = when_all(sf1, f2);             // also when_any, etc.
}






Critics:


Bartosz Milewski's blog: Broken promises - C++0x futures
http://bartoszmilewski.com/2009/03/03/broken-promises-c0x-futures/


MeetingC++ - Hartmut Kaiser: Plain Threads are the GOTO of today's computing
https://www.youtube.com/watch?v=4OCUEgSNIAY