Multithreading in C++
Multithreading is just one damn thing after, before, or simultaneous
with another.
--Andrei Alexandrescu
Consider two threads (one per column) running on shared variables:
int X = 0;
int Y = 0;
// Thread 1       | // Thread 2
int r1 = X;       | int r2 = Y;
if ( 1 == r1 )    | if ( 1 == r2 )
Y = 1;            | X = 1;
May this program end with r1 == 1 and r2 == 1? Each write is conditional
on the other write already having happened, so such values would appear
"out of thin air".
Similarly, consider two threads updating adjacent fields of a struct:
struct s { char a; char b; } x;
// Thread 1       | // Thread 2
x.a = 1;          | x.b = 1;
On hardware without byte-wide writes, a compiler may implement both
assignments as a read-modify-write of the whole struct:
struct s tmp = x; | struct s tmp = x;
tmp.a = 1;        | tmp.b = 1;
x = tmp;          | x = tmp;
and one thread's update may be lost.
Beyond the errors which can occur in single-threaded programs,
multithreaded environments are subject to additional kinds of errors:
- Race conditions
- Deadlock
- Priority failures (priority inversion, starvation, etc.)
Moreover, testing and debugging multithreaded programs is harder.
Multithreaded programs are non-deterministic, and failures are often
not repeatable. Code run under a debugger can produce very different
results than code running normally. Testing on single-processor hardware
may produce different results than testing on multiprocessor hardware.
In this section we show how easy it is to make mistakes when we are not
careful enough. The textbook example is the double-checked locking
pattern, described by Douglas C. Schmidt of Washington University in
St. Louis; the idea appeared in a chapter of the book
Pattern Languages of Program Design 3, published by Addison-Wesley in 1997.
Singleton Pattern
Singleton is a well-known Design Pattern, which ensures that a class
has only one instance and provides a global point of access to that
instance. For many reasons, dynamically allocated Singletons are very
popular in C++ programs. (One reason: the initialization order of static
objects is not defined across multiple compilation units; another:
lazy instantiation: if the Singleton object is never requested,
we can spare the initialization cost.)
The canonical implementation of the Singleton pattern is the following:
class Singleton
{
public:
static Singleton *instance();
void other_method();
private:
static Singleton *pinstance;
};
Singleton *Singleton::pinstance = 0;
Singleton *Singleton::instance()
{
if ( 0 == pinstance )
{
pinstance = new Singleton;
}
return pinstance;
}
And the user simply calls other methods through the instance() member function:
Singleton::instance()->other_method();
Thread Safety
It is clear that the canonical implementation of Singleton is not thread
safe. Consider the following race condition: threads A and B try to use
the Singleton simultaneously in a preemptive parallel environment, before
its first initialization.
First, thread A evaluates the if condition and finds pinstance to be a
null pointer. Then the thread is suspended.
Now thread B evaluates the condition, also reads the null pointer, and goes
on to create the instance. The Singleton has been created.
Thread A gets control again. It continues to create the Singleton.
Now a second instance has been created.
A critical section is a sequence of instructions that obeys the following
invariant: only one thread/process may execute it at a time. The
creation/initialization of the Singleton is a critical section.
The common way to implement a critical section is to add a lock, like a
mutex. With a static mutex, making the Singleton thread safe is easy.
class Singleton
{
public:
static Singleton *instance();
void other_method();
private:
static Singleton *pinstance;
static Mutex lock_;
};
Singleton *Singleton::pinstance = 0;
Singleton *Singleton::instance()
{
Guard<Mutex> guard(lock_);
if ( 0 == pinstance )
{
pinstance = new Singleton;
}
return pinstance;
}
The ACE Guard class can apply different kinds of locking strategies via its
template parameter. Guard uses the RAII (Resource Acquisition Is
Initialization) principle: the constructor acquires the lock, and the
destructor releases it.
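A minimal sketch of such a guard class, assuming a Mutex type with
acquire()/release() member functions (the real ACE interface may differ):

template <typename LOCK>
class Guard
{
public:
    explicit Guard(LOCK& lock) : lock_(lock)
    {
        lock_.acquire();  // acquire the lock on construction
    }
    ~Guard()
    {
        lock_.release();  // release on scope exit, even if an exception is thrown
    }
private:
    Guard(const Guard&);             // non-copyable
    Guard& operator=(const Guard&);
    LOCK& lock_;
};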
Double-Checked Locking Pattern
Although the previous solution works correctly, it is far from ideal.
The problem is that although the problematic race condition is connected
only to the initialization of the Singleton instance, the critical section
is executed on every call of the instance() method. Every access to the
instance acquires and releases the lock. Such excessive use of the locking
mechanism may cause serious overhead, which may not be acceptable.
One obviously wrong optimization attempt would be to move the critical
section inside the conditional check of pinstance:
Singleton *Singleton::instance()
{
if ( 0 == pinstance )
{
Guard<Mutex> guard(lock_);
pinstance = new Singleton;
}
return pinstance;
}
There is still a race condition when both threads read pinstance as a null
pointer. Then one thread acquires the lock while the other is blocked.
The first thread initializes the instance and then releases the lock.
Now the second thread continues execution after the condition check and
re-initializes the Singleton.
The solution proposed by Schmidt uses a second check to ensure thread safety.
Singleton *Singleton::instance()
{
if ( 0 == pinstance )
{
Guard<Mutex> guard(lock_);
if ( 0 == pinstance )
{
pinstance = new Singleton;
}
}
return pinstance;
}
Unnecessary locking is avoided by wrapping the call to new with another
conditional test. When, in the previous (bad) scenario, the blocked thread
wakes up, it re-checks the state of the pointer and finds that it is not
null anymore. The solution minimizes locking attempts and still ensures
correctness.
But...
The Problem with DCLP
At the end of his article Schmidt mentions a certain disadvantage of the
pattern: There is a subtle portability issue that can lead to pernicious
bugs if the pattern is used in software that has non-atomic pointer or
integral assignment semantics. ... A related problem can occur if an
overly-aggressive compiler optimizes (pinstance) by caching it in some
way (e.g. storing it in a register) or by removing the second check of
0 == pinstance.
Consider this part of the DCLP implementation:
pinstance = new Singleton;
This does three things:
(1) Allocate memory (via operator new) to hold the Singleton object.
(2) Construct the Singleton object in the memory.
(3) Assign to pinstance the address of the memory.
But they need not be done in this order! For example, the compiler is
free to generate the following code:
pinstance =
operator new( sizeof(Singleton) );
new (pinstance) Singleton;
If this code is generated, the order is 1, 3, 2 .
In the context:
Singleton *Singleton::instance()
{
if ( 0 == pinstance )
{
Guard<Mutex> guard(lock_);
if ( 0 == pinstance )
{
pinstance =
operator new( sizeof(Singleton) );
new (pinstance) Singleton;
}
}
return pinstance;
}
Consider: thread A executes (1) and (3) and is then suspended. Now pinstance
is not null, but the instance has not been constructed yet. Thread B
checks the outermost condition, sees pinstance as not null, and goes on to
return the pointer to the uninitialized Singleton object.
The fundamental problem: we need a way to specify instruction ordering
in C++. The sad news is that there is no way to do that in (pre-C++11)
C++ or C!
Sequence Points and Observable Behavior
The C++ standard defines so-called sequence points. There is a sequence
point at the end of each full expression, i.e., "at the semicolon". There
are also a number of other cases of sequence points: the short-circuit
logical operators, the conditional operator, the comma operator, and
function calls.
When a sequence point is reached, all side effects of previous evaluations
shall be complete and no side effects of subsequent evaluations shall have
taken place.
This suggests that perhaps careful statement ordering could help control
the order of the generated instructions. It does not. The compiler may
reorder instructions as long as it preserves the observable behavior of
the C++ abstract machine.
Observable behavior consists only of:
- Reads and writes of volatile data.
- Calls to library I/O functions.
void foo()
{
int x = 0, y = 0;       // (1)
x = 5;                  // (2)
y = 10;                 // (3)
printf( "%d,%d", x, y); // (4)
}
In the previous example (1)-(3) may be optimized away, because they have
no observable behavior. If they are executed, (1) must precede (2)-(4),
and (4) must follow (1)-(3). We know nothing about the relative execution
order of (2) and (3): either may come first, or they might run simultaneously.
Sequence points offer no control over the execution of statements.
if ( 0 == pinstance )
{
Singleton *temp =
operator new( sizeof(Singleton) );
new (temp) Singleton;
pinstance = temp;
}
Therefore the attempt above is not a solution to the problem. The compiler
will likely optimize out the temp variable. Other "tricks" will likely
fail too: declaring temp as extern and defining it in another compilation
unit; defining the Singleton constructor in a different unit to avoid
inlining; assuming that it may throw an exception; etc.
The compiler has a huge motivation for such optimizations:
- Execute operations in parallel where the hardware allows it.
- To avoid spilling data from registers.
- To perform common subexpression elimination.
- To keep the instruction pipeline full.
- To reduce the size of the generated executable.
Volatile
Douglas Schmidt proposed volatile to prevent "aggressive optimizations"
in his article. Volatile prevents some compiler optimizations.
The order of reads/writes of volatile data must be preserved.
Volatile memory may change outside of the program's control, e.g. by some
hardware action, therefore "redundant" reads/writes in the source code
must not be eliminated.
This is the C++ meaning of volatile. In Java and in C#, volatile is
different: those languages do add acquire/release semantics to volatile.
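For illustration, the classic use case of C++ volatile, with a made-up
register address:

// Poll a memory-mapped hardware status register until the device signals ready.
// Without volatile, the compiler could hoist the load out of the loop
// and spin forever on a stale value.
volatile unsigned* const status_reg =
    reinterpret_cast<volatile unsigned*>(0xFFFF0004); // hypothetical address

void wait_for_device()
{
    while ( 0 == *status_reg ) // must be re-read on every iteration
    {
        // busy wait
    }
}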
Volatile would seem to make the temporary variable strategy viable:
class Singleton
{
public:
static Singleton *instance();
void other_method();
private:
static Singleton * volatile pinstance;
static Mutex lock_;
int x;
Singleton() : x(1) {}
};
Singleton * volatile Singleton::pinstance = 0;
Singleton *Singleton::instance()
{
if ( 0 == pinstance )
{
Guard<Mutex> guard(lock_);
if ( 0 == pinstance )
{
Singleton * volatile temp = new Singleton; // (1)
pinstance = temp;                          // (2)
}
}
return pinstance;
}
Indeed, lines (1) and (2) must not be reordered, since both pinstance and
temp are volatile pointers. This is the code generated after inlining the
constructor:
Singleton *Singleton::instance()
{
if ( 0 == pinstance )
{
Guard<Mutex> guard(lock_);
if ( 0 == pinstance )
{
Singleton * volatile temp = static_cast<Singleton *>
(operator new(sizeof(Singleton)));
temp -> x = 1;
pinstance = temp;
}
}
return pinstance;
}
where the line temp->x = 1 is the inlined constructor call. Though temp
(the pointer) is volatile, *temp (the object) is not, so the instructions
can be reordered:
if ( 0 == pinstance )
{
Singleton * volatile temp = static_cast<Singleton *>
(operator new(sizeof(Singleton)));
pinstance = temp;
temp -> x = 1;
}
And here, pinstance again points to uninitialized memory.
The main problem is that the constraints on observable behavior apply only
to the C++ abstract machine, and the abstract machine was defined as
implicitly single-threaded.
Memory Model
A modern multiprocessor hardware architecture looks like the following:
______________________________________
| |
| |
| memory |
| |
|______________________________________|
| | | | |
| | | | |
cache1 cache2 cache3 cache4 cache5
| | | | |
proc1 proc2 proc3 proc4 proc5
The value of a shared memory location may appear in more than one cache,
and the copies might hold different values. The hardware regularly takes
care of such situations. However, the contents of a cache may be flushed
in a different order than they were written. Example: processor A
modifies variable x, then modifies variable y. When the cache is flushed,
y is flushed before x. Therefore another processor may see y's value
change before x's.
Out-of-order write visibility can cause DCLP to fail. Remember that
pinstance = new Singleton; does the following:
(1) Allocate memory (via operator new) to hold the Singleton object.
(2) Construct the Singleton object in the memory.
(3) Assign to pinstance the address of the memory.
Now suppose processor A does these steps in this order. Processor B may see
the result of (3) before the result of (2), and therefore goes on to use the
Singleton object as if it were initialized. (It is, but its cache was not
written out yet.)
C++98 offers no guarantees about how different threads see the phenomenon
above. Some libraries, like POSIX threads, offer some guarantees.
The Java memory model was revised to offer such guarantees.
The visibility problems described above can be prevented via memory
barriers. Barriers are instructions that constrain reads and writes,
allowing readers and writers to agree on a consistent strategy of memory
usage.
Readers use an acquire barrier to prevent subsequent (in source code)
memory accesses from moving before the barrier.
Writers use a release barrier to prevent prior (in source code) memory
accesses from moving after the barrier.
One can then use a release/acquire protocol: the release publishes the new
state of memory, and the acquire guarantees that the reader sees the new
state. Compilers may not reorder reads and writes in ways that violate
memory barrier semantics; runtime systems and hardware must respect them
as well.
Singleton *Singleton::instance()
{
Singleton *temp = pinstance;
// perform acquire barrier
if ( 0 == temp )
{
Guard<Mutex> guard(lock_);
if ( 0 == pinstance )
{
temp = new Singleton;
// perform release barrier
pinstance = temp;
}
}
return pinstance;
}
Problem: memory barriers are not available on every hardware platform, so
the code above is not portable.
Rule for Avoiding Memory Visibility Problems
Shared data should be accessed only if one of the following is true:
- The access is inside a critical section, or
- the access follows a message-passing protocol using memory barriers
(described below).
For communication via message passing, these protocols are followed:
(1) Read ready flag; perform acquire; read message.
(2) Write message; perform release; write ready flag.
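Anticipating the C++11 atomics introduced later in this section, a sketch
of this protocol with one writer and one reader thread could look like this:

#include <atomic>

int message = 0;                 // the message
std::atomic<bool> ready(false);  // the ready flag

void writer()
{
    message = 42;                                 // write message
    ready.store(true, std::memory_order_release); // perform release; write ready flag
}

void reader()
{
    while ( !ready.load(std::memory_order_acquire) ) // read ready flag; perform acquire
    {
        // spin until the flag is set
    }
    int m = message; // the release/acquire pair guarantees this read sees 42
    (void)m;
}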
DCLP effectively uses message passing:
pinstance is the ready flag: if it is not null, then "ready".
pinstance itself is also the message.
Singleton *Singleton::instance()
{
Singleton *temp = pinstance;
// perform acquire barrier
if ( 0 == temp )
{
Guard<Mutex> guard(lock_);
if ( 0 == pinstance )
{
temp = new Singleton;
// perform release barrier
pinstance = temp;
}
}
return pinstance;
}
This is the correct messaging protocol.
Locks as Barriers
It is critical that guard is initialized before the second if statement.
Why can the following reordering never happen?
if ( 0 == temp )
{
if ( 0 == pinstance ) // the check would precede the lock acquisition
{
Guard<Mutex> guard(lock_);
temp = new Singleton;
...
Threading libraries ensure that lock acquisition includes the moral
equivalent of an acquire, and lock release includes the equivalent
of a release.
Singleton *Singleton::instance()
{
Singleton *temp = pinstance;
// perform acquire barrier
if ( 0 == temp )
{
Guard<Mutex> guard(lock_);
// perform acquire (implied by lock acquisition)
if ( 0 == pinstance )
{
temp = new Singleton;
// perform release barrier
pinstance = temp;
}
// perform release (implied by lock release)
}
return pinstance;
}
Thus locks provide a portable way to insert memory barriers.
Atomics & Sequential consistency in C++11
With plain int variables, the two threads below (one per column) form a data race:
int x, y;
x = 1; | cout << y << ", ";
y = 2; | cout << x << endl;
The race can be eliminated with mutexes:
int x, y;
mutex x_mutex, y_mutex;
x_mutex.lock();   | y_mutex.lock();
x = 1;            | cout << y << ", ";
x_mutex.unlock(); | y_mutex.unlock();
y_mutex.lock();   | x_mutex.lock();
y = 2;            | cout << x << endl;
y_mutex.unlock(); | x_mutex.unlock();
Or with atomics:
std::atomic<int> x, y;
x.store(1); | cout << y.load() << ", ";
y.store(2); | cout << x.load() << endl;
The possible outputs:
0 0
2 1
0 1
store()/load() ----> sequential consistency
atomics --> atomic load()/store() + ordering
Full ordering can be expensive on modern hardware,
so we might want to relax these rules.
Memory ordering options:
memory_order_seq_cst ____ sequentially consistent (default)
memory_order_consume _
memory_order_acquire |__ acquire-release
memory_order_release |
memory_order_acq_rel _|
memory_order_relaxed ____ relaxed ordering
For example, the x86/x86_64 architectures do not require additional
instructions to implement acquire-release ordering.
Sequentially consistent ordering
--------------------------------
Each individual thread is sequential; its operations cannot be reordered.
A seq_cst store synchronizes-with a seq_cst load of the same variable.
std::atomic<int> x, y;
x.store(1, memory_order_relaxed); |cout << y.load(memory_order_relaxed) << ", ";
y.store(2, memory_order_relaxed); |cout << x.load(memory_order_relaxed) << endl;
Now all four outputs are possible:
0 0
2 1
0 1
2 0
atomics with relaxed mode --> atomic load()/store(), but no ordering guarantees
std::atomic<int> x, y;
x.store(1, memory_order_release); |cout << y.load(memory_order_acquire) << ", ";
y.store(2, memory_order_release); |cout << x.load(memory_order_acquire) << endl;
The possible outputs:
0 0
2 1
0 1
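As a practical application of acquire-release ordering, DCLP itself can be
implemented correctly with C++11 atomics. A sketch, re-using the shape of
the earlier Singleton examples:

#include <atomic>
#include <mutex>

class Singleton
{
public:
    static Singleton* instance();
private:
    static std::atomic<Singleton*> pinstance;
    static std::mutex lock_;
};

std::atomic<Singleton*> Singleton::pinstance(nullptr);
std::mutex Singleton::lock_;

Singleton* Singleton::instance()
{
    Singleton* temp = pinstance.load(std::memory_order_acquire); // read flag/message
    if ( nullptr == temp )
    {
        std::lock_guard<std::mutex> guard(lock_);
        temp = pinstance.load(std::memory_order_relaxed); // the lock orders this read
        if ( nullptr == temp )
        {
            temp = new Singleton;
            pinstance.store(temp, std::memory_order_release); // publish the object
        }
    }
    return temp;
}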
Only minimal progress guarantees are given:
1. unblocked threads will make progress;
2. the implementation should ensure that writes in one thread
become visible in other threads "in a finite amount of time".
The "A happens before B" relationship holds if:
1. A is sequenced before B, or
2. A inter-thread happens before B,
i.e. there is a synchronization point between A and B.
Synchronization points:
1. thread creation synchronizes-with the start of the thread's execution;
2. thread completion synchronizes-with the return of join();
3. unlocking a mutex synchronizes-with the next locking of that mutex.
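A small sketch of rule 2: join() makes the thread's writes visible to the
joining thread without any further locking:

#include <thread>
#include <iostream>

int result = 0; // plain, non-atomic shared data

int main()
{
    std::thread t( []{ result = 42; } ); // write in the new thread
    t.join(); // thread completion synchronizes-with the return of join()
    std::cout << result << std::endl; // the write happens before this read: no data race
    return 0;
}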
Data race:
A program contains a "data race" if it contains two conflicting actions
in different threads, at least one of which is not "atomic",
and neither happens before the other.
Memory location:
1. an object of scalar type, or
2. a maximal sequence of adjacent bit-fields all having non-zero width.
Two threads of execution can update and access separate memory locations
without interfering with each other:
struct { char a; char b; } x;
x.a = 1; | x.b = 1;
std::thread
===========
#include <iostream>
#include <thread>
namespace std
{
class thread
{
public:
typedef ... native_handle_type; // implementation-defined
class id;
thread() noexcept;
thread( thread&& other) noexcept;
~thread();
template <typename Function, typename... Args>
explicit thread( Function&& f, Args&&... args);
thread(const thread&) = delete;
thread& operator=(thread&& other) noexcept;
void swap( thread& other);
bool joinable() const;
void join();
void detach();
id get_id() const;
static unsigned int hardware_concurrency();
native_handle_type native_handle();
};
}
void f( int i, const std::string& s);
std::thread t(f,3,"Hello");
class f
{
public:
f( int i, std::string s) : _i(i), _s(s) {}
void operator()() const
{
}
int _i;
std::string _s;
};
std::thread t( f() );            // beware: the "most vexing parse", this declares a function
std::thread t(( f(3,"Hello") )); // extra parentheses avoid it
std::thread t{ f(3,"Hello") };   // uniform initialization avoids it too
std::thread t( [] { } );         // a lambda also works
void f( int i, const std::string& s)
{
std::cout << "Hello concurrent world" << std::endl;
}
int main()
{
int i = 3;
std::string s("Hello");
std::thread t( f, i, s);
t.join();
return 0;
}
Possible alternative destructor strategies
(Scott Meyers: Effective Modern C++):
- Implicit join: the destructor waits until the thread's execution completes.
This can hide hard-to-detect performance problems.
- Implicit detach: the destructor returns, but the underlying thread keeps
running. The destructor may free memory that the thread still tries to access.
(std::thread chose neither: its destructor calls std::terminate() if the
thread is still joinable.)
It is still possible to write wrong code (of course, this is C++).
It is better to avoid passing pointers and references to local data, or to join().
struct func
{
int& i;
func(int& i_) : i(i_) { }
void operator()()
{
for(unsigned int j=0; j < 1000000; ++j)
{
do_something(i); // i may be a dangling reference by now
}
}
};
void oops()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread my_thread(my_func);
my_thread.detach(); // the detached thread may outlive some_local_state!
}
void f( int i, const std::string& s);
std::thread t( f, 3, "Hello");
"Hello" is passed to f as const cast *
and converted to std::string in the new thread!
This can be a problem:
void f( int i, std::string const& s);
void oops( int some_param)
{
char buffer[1024];
sprintf(buffer, "%i", some_param);
std::thread t(f,3,buffer); // buffer may be destroyed before it is converted to std::string
t.detach();
}
void f( int i, std::string const& s);
void not_oops( int some_param)
{
char buffer[1024];
sprintf(buffer,"%i",some_param);
std::thread t(f,3,std::string(buffer)); // conversion happens before not_oops returns
t.detach();
}
It is possible to create a RAII thread.
class scoped_thread
{
std::thread t;
public:
explicit scoped_thread(std::thread t_): t(std::move(t_))
{
if(!t.joinable())
throw std::logic_error("No thread");
}
~scoped_thread()
{
t.join();
}
scoped_thread(scoped_thread const&)=delete;
scoped_thread& operator=(scoped_thread const&)=delete;
};
struct func;
void f()
{
int some_local_state;
scoped_thread t(std::thread(func(some_local_state)));
do_something_in_current_thread();
}
void do_work(unsigned id);
void f()
{
std::vector<std::thread> threads;
for(unsigned i=0;i<20;++i)
{
threads.push_back(std::thread(do_work,i));
}
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join));
}
std::thread::hardware_concurrency() gives a hint about the number of available cores.
(Be aware of "oversubscription", i.e. using more threads than cores we have.)
std::thread::id identifiers are returned by the get_id() member function,
or by std::this_thread::get_id() for the current thread.
A default-constructed std::thread::id() means "no associated thread".
Operations on std::thread::id:
== != <
std::hash<std::thread::id>
std::ostream& operator<<
std::thread::id master_thread;
void some_core_part_of_algorithm()
{
if(std::this_thread::get_id()==master_thread)
{
do_master_thread_work();
}
do_common_work();
}
Synchronization objects
=======================
#include <mutex>
std::mutex m;
int sh; // shared data
void f()
{
m.lock();
sh += 1;
m.unlock();
}
std::recursive_mutex m;
int sh;
void f(int i)
{
m.lock(); // a recursive_mutex may be re-locked by the owning thread
sh+=1;
if (--i>0) f(i);
m.unlock();
}
std::timed_mutex m;
int sh; // shared data
void f()
{
if (m.try_lock_for(std::chrono::seconds(10)))
{
sh += 1;
m.unlock();
}
else
{
// could not acquire the lock within 10 seconds
}
}
void f()
{
if (m.try_lock_until(midnight)) // midnight: some std::chrono::time_point
{
sh += 1;
m.unlock();
}
else
{
// could not acquire the lock before the time point
}
}
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> l;
std::mutex m;
void add_to_list(int value)
{
std::lock_guard< std::mutex > guard(m);
l.push_back(value);
}
"Pointers or references pointing out from the guarded area may be an issue!"
friend bool operator<( X const& lhs, X const& rhs)
{
if ( &lhs == &rhs )
return false;
std::lock_guard<std::mutex> lock_lhs(lhs.m); // locks taken in argument order!
std::lock_guard<std::mutex> lock_rhs(rhs.m);
return lhs.data < rhs.data;
}
// Thread 1 | // Thread 2
a < b;      | b < a;      // opposite locking order: possible deadlock
Avoiding deadlocks:
1. Avoid nested locks.
2. Avoid calling user-defined code while holding a lock.
3. Acquire locks in a fixed order.
friend bool operator<( X const& lhs, X const& rhs)
{
if ( &lhs == &rhs )
return false;
std::lock( lhs.m, rhs.m);
std::lock_guard< std::mutex > lock_lhs( lhs.m, std::adopt_lock);
std::lock_guard< std::mutex > lock_rhs( rhs.m, std::adopt_lock);
return lhs.data < rhs.data;
}
friend bool operator<( X const& lhs, X const& rhs)
{
if ( &lhs == &rhs )
return false;
std::unique_lock< std::mutex > lock_lhs( lhs.m, std::defer_lock);
std::unique_lock< std::mutex > lock_rhs( rhs.m, std::defer_lock);
std::lock( lock_lhs, lock_rhs);
return lhs.data < rhs.data;
}
std::unique_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk;
}
void process_data()
{
std::unique_lock<std::mutex> lk(get_lock());
do_something();
}
template <typename T>
class MySingleton
{
public:
std::shared_ptr<T> instance()
{
std::unique_lock<std::mutex> lock(resource_mutex);
if ( ! resource_ptr )
resource_ptr.reset( new T(...) );
lock.unlock();
return resource_ptr;
}
private:
std::shared_ptr<T> resource_ptr;
mutable std::mutex resource_mutex;
};
template <typename T>
class MySingleton
{
public:
std::shared_ptr<T> instance()
{
if ( ! resource_ptr ) // unsynchronized read: data race with the reset() below
{
std::unique_lock<std::mutex> lock(resource_mutex);
if ( ! resource_ptr )
resource_ptr.reset( new T(...) );
lock.unlock();
}
return resource_ptr;
}
private:
std::shared_ptr<T> resource_ptr;
mutable std::mutex resource_mutex;
};
template <typename T>
class MySingleton
{
public:
std::shared_ptr<T> instance()
{
std::call_once( resource_init_flag, &MySingleton::init_resource, this);
return resource_ptr;
}
private:
void init_resource()
{
resource_ptr.reset( new T(...) );
}
std::shared_ptr<T> resource_ptr;
std::once_flag resource_init_flag;
};
class MySingleton;
MySingleton& MySingletonInstance()
{
static MySingleton _instance;
return _instance;
}
Synchronization
===============
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
lk.lock();
}
}
std::mutex my_mutex;
std::queue< data_t > my_queue;
std::condition_variable data_cond;
void producer()
{
while ( more_data_to_produce() )
{
const data_t data = produce_data();
std::lock_guard< std::mutex > prod_lock(my_mutex);
my_queue.push(data);
data_cond.notify_one();
}
}
void consumer()
{
while ( true )
{
std::unique_lock< std::mutex > cons_lock(my_mutex);
data_cond.wait(cons_lock,
[]{ return !my_queue.empty(); });
data_t data = my_queue.front();
my_queue.pop();
cons_lock.unlock();
consume_data(data);
}
}
During the wait, the condition variable may check the condition at any time,
but always under the protection of the mutex; wait returns immediately if
the condition is true.
Spurious wake: waking up without a notification from another thread.
The timing and frequency are undefined, so it is better to avoid condition
functions with side effects. (E.g. using a counter in the lambda to check
how many notifications arrived is bad.)
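The predicate overload of wait() is what protects against spurious wakes;
it behaves like the following loop:

// data_cond.wait(cons_lock, pred); is equivalent to:
while ( !pred() )
{
    data_cond.wait(cons_lock); // may return spuriously; pred() is then re-checked
}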
Futures
=======
1976, Daniel P. Friedman and David Wise: promise
1977, Henry Baker and Carl Hewitt: future (a similar concept)
A future is a read-only placeholder view of a value.
A promise is a writable, single-assignment container (it sets the value of the future).
Promise --> future: a communication channel.
Futures are the results of asynchronous function calls. When I start such a
function I do not get the result, but a "future" which will hold the
result when the function completes.
Later, I can check the future to see whether the result is already available.
If it is, I can continue my work.
If not, I can either do other things and re-check the future later,
or I can block and wait for the result.
A future is also capable of storing exceptions.
std::unique_future has been renamed to std::future:
- the only instance referring to the asynchronous event
- move-only
std::shared_future:
- multiple instances may refer to the same event
- copyable
- all instances become ready at the same time
is_ready()
get() - returns an rvalue for std::future (a const reference for std::shared_future)
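A sketch of turning a future into a shared_future via share():

#include <future>

int main()
{
    std::future<int> f = std::async( []{ return 42; } );
    std::shared_future<int> sf = f.share(); // f gives up the shared state
    std::shared_future<int> sf2 = sf;       // copies are allowed
    return sf.get() + sf2.get() == 84 ? 0 : 1; // get() may be called on each copy
}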
#include <future>
#include <iostream>
int f(int);
void do_other_stuff();
int main()
{
std::future<int> the_answer = std::async(f,1);
do_other_stuff();
std::cout<< "The answer is " << the_answer.get() << std::endl;
}
std::async() executes the task either in a new thread or deferred until get()/wait():
auto fut1 = std::async(std::launch::async, f, 1);
auto fut2 = std::async(std::launch::deferred, f, 2);
auto fut3 = std::async(std::launch::deferred | std::launch::async, f, 3);
auto fut4 = std::async(f, 4);
If no wait() or get() is called, then a deferred task may not be executed at all.
Exceptions:
double square_root(double x)
{
if ( x < 0 )
{
throw std::out_of_range("x<0");
}
return sqrt(x);
}
int main()
{
std::future<double> fut = std::async( square_root, -1);
double res = fut.get(); // get() re-throws the stored exception
}
Also there are:
fut.valid() -- whether the future has a shared state
fut.wait() -- wait until the result is available
fut.wait_for() -- wait with a timeout duration
fut.wait_until() -- wait until a specific time point
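A sketch of polling with wait_for(), which returns a std::future_status:

#include <future>
#include <thread>
#include <chrono>
#include <iostream>

int slow_answer() // stand-in for a long computation
{
    std::this_thread::sleep_for(std::chrono::milliseconds(350));
    return 42;
}

int main()
{
    std::future<int> fut = std::async(std::launch::async, slow_answer);
    while ( fut.wait_for(std::chrono::milliseconds(100))
            != std::future_status::ready )
    {
        std::cout << "still working..." << std::endl; // do other work meanwhile
    }
    std::cout << fut.get() << std::endl;
    return 0;
}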
Packaged task
=============
double x = 4.0;
std::packaged_task<double(double)> tsk(square_root);
std::future<double> fut = tsk.get_future();
std::thread t(std::move(tsk), x);
double res = fut.get();
t.join(); // the thread must be joined (or detached) before destruction
Promise
=======
How to implement a packaged_task?
Promises are a lower-level tool than packaged_task.
A promise is a tool for passing the return value (or exception) from the
thread executing a function to the thread that consumes the result via a
future.
template <typename> class my_task;
template <typename R, typename ...Args>
class my_task<R(Args...)>
{
std::function<R(Args...)> fn;
std::promise<R> pr;
public:
template <typename ...Ts>
explicit my_task(Ts&&... ts) : fn(std::forward<Ts>(ts)...) { }
template <typename ...Ts>
void operator()(Ts&&... ts)
{
pr.set_value(fn(std::forward<Ts>(ts)...));
}
std::future<R> get_future() { return pr.get_future(); }
};
try
{
my_promise.set_value( square_root(x) );
}
catch(...)
{
my_promise.set_exception( std::current_exception() );
}
Use of a promise:
void asyncFun( std::promise<int> myPromise)
{
int result;
try
{
result = compute(); // compute() is a hypothetical computation that may throw
myPromise.set_value(result);
}
catch ( const MyException& e )
{
myPromise.set_exception(std::make_exception_ptr(e));
}
}
or
void asyncFun( std::promise<int> myPromise)
{
int result;
try
{
result = compute(); // hypothetical computation
myPromise.set_value(result);
}
catch ( ... )
{
myPromise.set_exception(std::current_exception());
}
}
int main()
{
std::promise<int> intPromise;
std::future<int> intFuture = intPromise.get_future();
std::thread t(asyncFun, std::move(intPromise));
int result = intFuture.get();
t.join();
return 0;
}
Example from Stroustrup:
double comp(vector<double>& v)
{
packaged_task<double(double*,double*,double)>
pt0{std::accumulate<double*,double*,double>};
packaged_task<double(double*,double*,double)>
pt1{std::accumulate<double*,double*,double>};
auto f0 = pt0.get_future();
auto f1 = pt1.get_future();
pt0(&v[0],&v[v.size()/2],0);
pt1(&v[v.size()/2],&v[v.size()],0);
return f0.get()+f1.get();
}
template<class T, class V> struct Accum
{
T* b;
T* e;
V val;
Accum(T* bb, T* ee, const V& vv) : b{bb}, e{ee}, val{vv} {}
V operator() () { return std::accumulate(b,e,val); }
};
double comp(vector<double>& v)
{
if (v.size()<10000) return std::accumulate(v.begin(),v.end(),0.0);
auto f0 = async(Accum<double,double>{&v[0],&v[v.size()/4],0.0});
auto f1 = async(Accum<double,double>{&v[v.size()/4],&v[v.size()/2],0.0});
auto f2 = async(Accum<double,double>{&v[v.size()/2],&v[v.size()*3/4],0.0});
auto f3 = async(Accum<double,double>{&v[v.size()*3/4],&v[v.size()],0.0});
return f0.get()+f1.get()+f2.get()+f3.get();
}
Coming in future standards:
- C++17 resumable functions: async ... await
- transactional memory
- continuations: then(), when_any(), when_all()
- parallel STL (Intel TBB)?
std::future<std::ptrdiff_t> tcp_reader(int total)
{
char buf[64*1024];
std::ptrdiff_t result = 0;
auto conn = await Tcp::Connect("127.0.0.1", 1337);
do
{
auto bytesRead = await conn.Read(buf, sizeof(buf));
total -= bytesRead;
result += std::count(buf, buf+bytesRead, 'c');
}
while ( total > 0 );
return result;
}
future<string> make_string()
{
future<int> f1 = async( []() -> int { return 42; });
future<string> f2 = f1.then(
[](future<int> f) -> string
{
return to_string(f.get());
});
return f2;
}
void test_when_all()
{
shared_future<int> sf1 = async([]()->int { return 42; });
future<string> f2 = async([]()->string { return string( "hello"); });
future<tuple<shared_future<int>, future<string>>> all_f = when_all(sf1, f2);
// all_f becomes ready when all of its input futures are ready
}
Critics:
Bartosz Milewski's blog: "Broken promises - C++0x futures"
MeetingC++, Hartmut Kaiser: "Plain Threads are the GOTO of today's computing"