I recently started looking at concurrent programming in C++ and decided to design my own future class as an exercise. Throughout several iterations of the design, I learned a lot about why C++ futures are designed the way they are.
A future represents a later-known value.
Values are usually computed eagerly.
In the code below, the value f
is computed right away before proceeding to the print statement.
auto f = compute_something();
std::cout << "This prints after something is computed.\n";
We could wrap compute_something()
in a future that is lazily started or started eagerly on another thread. This allows execution to continue even though the value of f
is not known yet.
auto f = Future<int>(compute_something);
std::cout << "This prints after the future f is constructed, but possibly before compute_something() runs and the future is ready.\n";
// Some time later we want the value of the future
std::cout << "The future has value: " << f.get() << ".\n";
With that very short introduction, my first attempt at making a future is below.
#include <thread>
#include <functional>
#include <exception>
template <typename R>
class Future {
public:
using result_type = R;
Future(std::function<R()> f)
: f_(std::move(f)) {
start_on_new_thread_();
}
~Future() {
wait_();
}
Future(Future&&) = delete;
Future& operator=(Future&&) = delete;
Future(const Future&) = delete;
Future& operator=(const Future&) = delete;
result_type get() {
wait_();
if (eptr_) {
std::rethrow_exception(eptr_);
}
return result_;
}
private:
std::function<R()> f_;
std::exception_ptr eptr_;
std::thread thread_;
result_type result_;
void start_on_new_thread_() {
thread_ = std::thread([this]{
try {
result_ = f_();
} catch(...) {
eptr_ = std::current_exception();
}
});
}
void wait_() {
if (thread_.joinable()) {
thread_.join();
}
}
};
The future is constructed from a callable f
, which is immediately started on a new thread. Actually the callable itself is not passed to the new thread. Instead f
is wrapped in a try-catch block that is in turn wrapped in a temporary callable. This temporary is what is passed to the new thread.
[this]{
try {
result_ = f_();
} catch(...) {
eptr_ = std::current_exception();
}
}
The reason for this is twofold. The temporary callable never throws, even if the user-supplied callable f
does. This is good news because an uncaught exception on another thread causes std::terminate()
to execute, which we do not want. The other reason is we want to catch and store any exception thrown by f
so that we can forward it if get()
is later called on the future.
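As a minimal standalone demonstration of the first point, the snippet below lets an exception escape a thread's callable; the standard requires std::terminate() to be called, so the whole program aborts.
#include <thread>
#include <stdexcept>

int main() {
    // The exception escapes the thread's callable, so std::terminate() runs
    // and the entire program aborts.
    std::thread t([]{ throw std::runtime_error("boom"); });
    t.join();
}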
(I said the temporary never throws, but this isn’t quite true. We run into issues storing the result/exception if this
is destroyed before the callable finishes. This doesn’t happen, though, because the future’s destructor waits for the thread to join.)
The write to result_
from the callable executing thread is synchronized with any read from result_
(via get()
) in the future owning thread. Synchronization is accomplished with thread_.join()
inside the wait_()
function.
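A stripped-down sketch of that argument, independent of the future class: whatever the worker thread writes before finishing is visible after join() returns.
#include <thread>
#include <iostream>

int main() {
    int result = 0;
    // The write happens on the worker thread...
    std::thread t([&result]{ result = 314; });
    // ...and the thread's completion synchronizes-with the return from join(),
    // so the read below is guaranteed to see 314.
    t.join();
    std::cout << result << "\n";
}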
Now on to the problems with the design.
This future is not default constructible, copyable, or moveable, which means we can forget about storing these futures in a container.
There is no way to check if the future is ready (i.e., if result_
is set).
The design does not work with void futures. In C++ we cannot have the class member void result_
, much less assign to it result_ = f_()
.
The future is not thread safe. If get()
is called from two threads around the same time, thread_.join()
can be called from both threads, which throws an exception.
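The last point is easy to reproduce even without the race: joining a std::thread that has already been joined throws std::system_error, which is what one of the two callers ends up doing after both pass the joinable() check.
#include <thread>
#include <system_error>
#include <iostream>

int main() {
    std::thread t([]{});
    t.join();
    try {
        // The thread is no longer joinable, so this second join() throws.
        t.join();
    } catch (const std::system_error& e) {
        std::cout << "Second join threw: " << e.what() << "\n";
    }
}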
Adding ready() to the future
A naive idea to check if the future is ready is to check if the thread executing it has joined.
bool ready() const {
return !thread_.joinable();
}
But this is silly. The future would only be ready after we wait for it (which is where thread_.join()
is called). But of course the future is ready after we wait for it! And it could be ready a lot earlier, too.
We could introduce a new boolean member variable ready_
that is returned by ready()
. We initialize ready_
to false in the constructor, and set it after execution finishes (but before the executing thread is joined).
[this]{
try {
result_ = f_();
} catch(...) {
eptr_ = std::current_exception();
}
ready_ = true;
}
Unfortunately this leads to a race condition between the thread setting ready_
and the thread calling ready()
. To deal with this, we could make ready_
atomic or use a mutex. Either idea throws cold water on making the future copyable or moveable, since atomics and mutexes are neither.
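That limitation is easy to see in isolation. In this small hypothetical snippet, a struct holding an atomic or a mutex loses its copy and move operations.
#include <atomic>
#include <mutex>

struct Holder {
    std::atomic<bool> ready{false};
    std::mutex m;
};

int main() {
    Holder a;
    // Holder b = a;            // does not compile: atomics and mutexes have deleted copy constructors
    // Holder c = std::move(a); // does not compile: they are not moveable either
    (void)a;
}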
You might think “Wait a minute! I understand why the future isn’t copyable. std::thread
isn’t copyable. But putting aside our wish to add synchronization primitives, why isn’t the current future moveable? All of its member variables are!”
Adding default move operations to the future leads to incorrect and undefined behavior.
Future(Future&&) = default;
Future& operator=(Future&&) = default;
Consider what happens in the following code that moves a future.
auto bright = Future<int>([](){
std::this_thread::sleep_for(std::chrono::seconds(1));
return 314;
});
Future<int> bleak(std::move(bright));
std::cout << "The bleak future has value: " << bleak.get() << ".\n";
The code defines a bright future that sleeps for 1 second and then returns 314.
The future is moved to bleak before getting its value.
On my machine bleak.get()
returns 0. Not 314.
What happened?
The issue is the capture of this
in the lambda that gets executed on the future’s thread_
.
[this]{
try {
result_ = f_();
} catch(...) {
eptr_ = std::current_exception();
}
}
When the bright future is constructed, it starts executing the lambda on bright.thread_
. The lambda captures this
, the memory location of bright (&bright
). Inside the lambda result_
means bright.result_
. This is true from the moment the lambda is created.
During construction of bleak, bright.thread_
is moved to bleak.thread_
and bright.result_
is moved to bleak.result_
. Since bright.result_
is not initialized until the future is ready, bleak.result_
is likely initialized with garbage. When the future is ready, its result (as instructed by the lambda) is written to bright.result_
, where we cannot access it from bleak.get()
. Instead bleak.get()
returns the initialized garbage (0 on my machine).
If we add a 2 second pause before constructing bleak, then bleak.get()
returns 314, but not for the correct reason. In the pause, the bright future becomes ready and stores 314 in bright.result_
. This value is moved into bleak.result_
when it's constructed.
auto bright = Future<int>([](){
std::this_thread::sleep_for(std::chrono::seconds(1));
return 314;
});
std::this_thread::sleep_for(std::chrono::seconds(2)); // Enjoy the bright future for a bit
Future<int> bleak(std::move(bright));
std::cout << "The bleak future has value: " << bleak.get() << ".\n";
We need a persistent location to store the result of the future outside of the future itself.
We’d like to move and copy our futures, and guarantee safe access to the future from multiple threads. The idea is to separate the future from its data.
// FutureData is not copyable or moveable and can therefore have
// synchronization primitives (atomics, mutexes, condition variables), which
// are usually not copyable or moveable
template <typename R>
struct FutureData {
std::exception_ptr eptr;
R result;
std::atomic<bool> ready;
};
// Future contains a pointer to its data and can be moved
template <typename R>
class Future {
// Stuff here
private:
std::unique_ptr<FutureData<R>> data_ptr_;
};
This works great.
Moving the future is done by moving data_ptr_
.
But the data is always fixed in one location, even if the future is moved.
So at the time the lambda is created, it knows where to store the result.
[this]{
try {
data_ptr_->result = f_();
} catch(...) {
data_ptr_->eptr = std::current_exception();
}
}
This code is still no good.
Suppose we start a bright future and move it to a bleak one as before.
Recall that data_ptr_
in the above lambda really means bright.data_ptr_
because this
captures bright at construction.
After bright is moved to bleak, bleak.data_ptr_
points to the (unmoved) future data and bright.data_ptr_
is set to nullptr
.
The line data_ptr_->result = f_()
in the lambda attempts to dereference a nullptr
.
Not good.
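(That bright.data_ptr_ is left null is not an accident; a moved-from std::unique_ptr is guaranteed to be empty, as this tiny check shows.)
#include <memory>
#include <cassert>

int main() {
    auto a = std::make_unique<int>(314);
    auto b = std::move(a); // b takes ownership
    assert(a == nullptr);  // a is guaranteed to be empty after the move
    assert(*b == 314);
}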
We need two pointers to the future data.
One to sit in the future itself as we have, and the other to sit in the lambda.
I’ll make both pointers owning, meaning that each will keep the future data alive.
This requires that we swap out std::unique_ptr
for std::shared_ptr
in the future.
template <typename R>
class Future {
// Stuff here
private:
std::shared_ptr<FutureData<R>> data_ptr_;
};
We can then copy data_ptr_
into the lambda at construction.
[data_ptr=data_ptr_, f=f_]{
try {
data_ptr->result = f();
} catch(...) {
data_ptr->eptr = std::current_exception();
}
}
Here I use initializer capture.
The variable data_ptr
is copy constructed from data_ptr_
and f
is copy constructed from f_
when the lambda is created.
Simply writing [data_ptr_, f_]
to capture these variables by value directly does not compile. Clang complains that data_ptr_
and f_
in the capture list do not name variables. Writing [=]
would implicitly capture this
because data_ptr_
is interpreted as this->data_ptr_
within the future class. Initializer capture is the way to get a new copy of the shared pointer.
There is an entirely different route we could take to address the problem with the lambda capturing a particular future’s this
.
A particular future’s this
is captured because the lambda is created in the context of Future. If instead the lambda were created in the context of FutureData, then this
would capture the one and only FutureData. And that would be just fine.
If we move lambda creation to the FutureData struct, then start_()
must belong to FutureData. So the future data struct is more like a future control class.
Below is the code for the thread-safe future control block.
#include <thread>
#include <functional>
#include <exception>
#include <condition_variable>
#include <mutex>
#include <chrono>
template <typename R>
class FutureControlBlock {
private:
std::function<R()> f_; // Callable that future executes
std::thread thread_; // Thread executing the callable f_
std::exception_ptr eptr_; // Stores exception (if any) from callable f_
R result_; // Stores result of callable f_
bool started_; // Initialized to false. True after execution starts
// Once true, started_ is never set to false again.
std::atomic<bool> ready_;
mutable std::mutex ready_mutex_;
std::condition_variable ready_cond_var_;
mutable std::mutex started_mutex_; // mutex to protect started_
std::mutex thread_mutex_; // mutex to protect thread_
void join_thread_() {
// Mutex is locked so multiple threads don't call thread_.join()
std::scoped_lock<std::mutex> lock(thread_mutex_);
if (thread_.joinable()) {
thread_.join();
}
}
// We guarantee that start_() is invoked at most one time
// across multiple thread calls to start() and start_on_new_thread()
// Its invocation status is captured by started_
// This means result_ and eptr_ are written by at most one thread
// This happens before ready_ is set to true
// which happens before any reads of those variables
void start_() {
try {
result_ = f_();
} catch(...) {
eptr_ = std::current_exception();
}
{
std::scoped_lock<std::mutex> lock(ready_mutex_);
ready_ = true;
}
ready_cond_var_.notify_all();
}
public:
FutureControlBlock(std::function<R()> f, bool start)
: f_(f), started_(false), ready_(false)
{
if (start) {
start_on_new_thread();
}
}
~FutureControlBlock() { join_thread_(); }
// Not default constructible
FutureControlBlock() = delete;
// Not copyable
FutureControlBlock(const FutureControlBlock&) = delete;
FutureControlBlock& operator=(const FutureControlBlock&) = delete;
// Not moveable
FutureControlBlock(FutureControlBlock&&) = delete;
FutureControlBlock& operator=(FutureControlBlock&&) = delete;
bool started() const
{
// Correct because the return value is constructed before the destruction of
// local variables (i.e., the lock)
std::scoped_lock<std::mutex> lock(started_mutex_);
return started_;
}
void start() {
{
std::scoped_lock<std::mutex> lock(started_mutex_);
if (started_) {
return;
}
// Update started_ right away because this runs on the current thread
// Compare to start_on_new_thread()
started_ = true;
}
start_();
}
void start_on_new_thread()
{
std::scoped_lock<std::mutex> started_lock(started_mutex_);
if (started_) {
return;
}
// We allow early exit if started_ == true *before* trying to acquire thread_mutex_,
// which is unavailable during its join attempt.
// This is the only function that acquires a mutex when it already holds one:
// * started_mutex_ is acquired first
// * thread_mutex_ is acquired second
// * ready_mutex_ is acquired third (in the call to start_() on the other thread)
// There is no cycle in the acquisition order, so no deadlock.
std::scoped_lock<std::mutex> thread_lock(thread_mutex_);
thread_ = std::thread([this]{ start_(); });
started_ = true; // Update started_ only after successful creation of thread
}
bool ready() const noexcept
{
std::scoped_lock<std::mutex> lock(ready_mutex_);
return ready_;
}
void wait()
{
std::unique_lock<std::mutex> lock(ready_mutex_);
ready_cond_var_.wait(lock, [this]{ return ready_ == true; } );
}
template <typename Rep, typename Period>
bool wait_for(const std::chrono::duration<Rep, Period>& rel_time)
{
std::unique_lock<std::mutex> lock(ready_mutex_);
return ready_cond_var_.wait_for(lock, rel_time, [this]{ return ready_ == true; } );
}
template <typename Clock, typename Duration>
bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time)
{
std::unique_lock<std::mutex> lock(ready_mutex_);
return ready_cond_var_.wait_until(lock, timeout_time, [this]{ return ready_ == true; } );
}
R get()
{
wait();
if (eptr_) {
std::rethrow_exception(eptr_);
}
return result_;
}
};
The class uses mutexes and is thread safe.
The flag ready_
is protected with a mutex and synchronization in wait()
is done with a condition variable.
These changes are not needed to support wait()
, but to support its variants wait_for()
and wait_until()
as well as the new method start()
(in the first attempt we only had something like start_on_new_thread()
). Without these changes, synchronization could still be done with join as before (and ready_ could be atomic).
The future class is just a view into the FutureControlBlock.
template <typename R>
class Future {
public:
using result_type = R;
Future()
: control_block_ptr_(nullptr),
valid_(false) {}
Future(std::function<R()> f, bool start=true)
: control_block_ptr_(std::make_shared<FutureControlBlock<R>>(std::move(f), start)),
valid_(true) {}
~Future() = default;
Future(Future&&) = default;
Future& operator=(Future&&) = default;
Future(const Future&) = default;
Future& operator=(const Future&) = default;
inline R get() const { return control_block_ptr_->get(); }
inline void wait() const { control_block_ptr_->wait(); }
template <typename Rep, typename Period>
inline bool wait_for(const std::chrono::duration<Rep, Period>& rel_time) {
return control_block_ptr_->wait_for(rel_time);
}
template <typename Clock, typename Duration>
inline bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
return control_block_ptr_->wait_until(timeout_time);
}
inline bool valid() const { return valid_; }
inline bool ready() const { return control_block_ptr_->ready(); }
inline void start_on_new_thread() const { control_block_ptr_->start_on_new_thread(); }
inline void start() const { control_block_ptr_->start(); }
inline bool started() const { return control_block_ptr_->started(); }
private:
std::shared_ptr<FutureControlBlock<R>> control_block_ptr_;
bool valid_;
};
We also provide a full specialization for void that requires almost no code change.
It simply wraps the void-returning callable into an int-returning callable that is used to construct a FutureControlBlock<int>
. The int is then discarded in the void-returning get()
function. The code is below.
template <>
class Future<void> {
public:
using result_type = void;
Future()
: control_block_ptr_(nullptr),
valid_(false) {}
Future(std::function<void()> f, bool start=true)
: control_block_ptr_(
std::make_shared<FutureControlBlock<int>>(
[f = std::move(f)] { f(); return 0; },
start)),
valid_(true) {}
~Future() = default;
Future(Future&&) = default;
Future& operator=(Future&&) = default;
Future(const Future&) = default;
Future& operator=(const Future&) = default;
inline void get() const { control_block_ptr_->get(); }
inline void wait() const { control_block_ptr_->wait(); }
template <typename Rep, typename Period>
inline bool wait_for(const std::chrono::duration<Rep, Period>& rel_time) {
return control_block_ptr_->wait_for(rel_time);
}
template <typename Clock, typename Duration>
inline bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
return control_block_ptr_->wait_until(timeout_time);
}
inline bool valid() const { return valid_; }
inline bool ready() const { return control_block_ptr_->ready(); }
inline void start_on_new_thread() const { control_block_ptr_->start_on_new_thread(); }
inline void start() const { control_block_ptr_->start(); }
inline bool started() const { return control_block_ptr_->started(); }
private:
std::shared_ptr<FutureControlBlock<int>> control_block_ptr_;
bool valid_;
};
Here are some examples of using the future class.
From the examples, you can see that this future is like a mixture of std::async
, std::packaged_task
, and std::future
.
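For comparison, the closest standard-library analogue of the eagerly started case is a one-liner with std::async:
#include <future>
#include <iostream>

int main() {
    // std::async with the async launch policy starts the callable on another
    // thread right away, much like constructing Future<int> with start=true.
    auto f = std::async(std::launch::async, []{ return 314; });
    std::cout << "The future has value: " << f.get() << ".\n";
}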
// Run 10 futures that are eagerly started on new threads
std::vector<Future<void>> futs;
for (int i = 0; i < 10; ++i) {
futs.emplace_back([i]{
std::this_thread::sleep_for(std::chrono::milliseconds(10*i));
std::cout << "Done executing future " << i
<< " on thread " << std::this_thread::get_id() << ".\n";
}, true);
}
// Define a future, but do not start it on construction
// Explicitly start it on main thread after construction
std::this_thread::sleep_for(std::chrono::seconds(1));
auto f = Future<void>([]{
std::cout << "This future runs on the main thread "
<< std::this_thread::get_id() << ".\n";
}, false);
std::cout << "On the main thread " << std::this_thread::get_id() << ".\n";
f.start();
// Define a future, but do not start it on construction
// Explicitly start it on another thread after construction
std::this_thread::sleep_for(std::chrono::seconds(1));
auto g = Future<int>([]{
std::cout << "This future runs on thread "
<< std::this_thread::get_id() << ".\n";
return 314;
}, false);
g.start_on_new_thread();
std::cout << "The value of the future is " << g.get() << ".\n";
// Define a future, but do not start it on construction
// Manually start it on another thread after construction
std::this_thread::sleep_for(std::chrono::seconds(1));
auto h = Future<int>([]{
std::cout << "This future runs on thread "
<< std::this_thread::get_id() << ".\n";
return 42;
}, false);
std::thread thread([&h]{ h.start(); });
std::cout << "The value of the future is " << h.get() << ".\n";
thread.join();
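None of the snippets above exercise the timed waits, so here is a quick sketch of wait_for() with the same Future class (the durations are arbitrary):
// Poll with a timeout instead of blocking in get() immediately
auto slow = Future<int>([]{
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 314;
});
if (slow.wait_for(std::chrono::milliseconds(100))) {
    std::cout << "Ready within 100 ms: " << slow.get() << ".\n";
} else {
    std::cout << "Not ready yet; do other work, then block in get().\n";
    std::cout << "The future has value: " << slow.get() << ".\n";
}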
To design a future-promise pair, very little needs to change. We keep the non-copyable and non-moveable FutureControlBlock (called the shared state in the standard) and create separate classes to access the block. Rather than have one accessor that both reads (via get()
) and writes (via start()
and start_on_new_thread()
) to the control block, as in the future design above, we create separate readers and writers.
The reader accessor is traditionally called a Future (with methods get()
, ready()
, and the variants of wait()
). The writer accessor is usually called a Promise (with methods set_value()
and set_exception()
). We can have a separate TaskPromise writer (similar to std::packaged_task
) with methods like set_task()
, start_task()
and start_task_on_new_thread()
. The control block only needs result_
, eptr_
, and ready_
. We can move thread_
, f_
, and the various start methods to the TaskPromise writer.
The write to the control block is a one-time operation. It therefore makes sense to make the writers (Promise and TaskPromise) moveable, but not copyable. On the other hand, the readers (Future) can be moveable and copyable. (Note that std::future
is only moveable, but std::shared_future
is also copyable. std::promise
is only moveable.)
One final question: who creates the control block when we have separate readers and writers? One design is to create the control block directly and give it get_future()
and get_promise()
methods. A more common design is to have the writer (Promise, TaskPromise) create the control block and give the writer a get_future()
method that returns a control block reader.
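That second design is exactly what the standard library does; a minimal example with std::promise and std::future:
#include <future>
#include <thread>
#include <iostream>

int main() {
    std::promise<int> writer;                      // the writer creates the shared state
    std::future<int> reader = writer.get_future(); // the reader is handed out by the writer
    std::thread t([&writer]{ writer.set_value(314); });
    std::cout << "The future has value: " << reader.get() << ".\n";
    t.join();
}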
The code is here. It includes code for tasks that can be chained as in the following snippet.
auto t = MakeTask([]{
std::cout << "Task 1 on thread " << std::this_thread::get_id() << ".\n";
return "Secret message";
}).then([](Future<const char*> input){
std::cout << "Task 2 on thread " << std::this_thread::get_id() << ". "
<< "First task says: " << input.get() << ".\n";
return 10;
}).then([](Future<int> input){
std::cout << "Task 3 on thread " << std::this_thread::get_id() << ".\n";
return 10 + input.get();
});
t.start();
std::cout << "Task final value is: " << t.get_future().get() << ".\n";
The chaining is dynamic and can be used in a for-loop as long as the return type stays the same from one link to the next.
auto t = MakeTask([]{
std::cout << "Task 1 on thread " << std::this_thread::get_id() << ".\n";
return "Secret message";
}).then([](Future<const char*> input){
std::cout << "Task 2 on thread " << std::this_thread::get_id() << ". "
<< "First task says: " << input.get() << ".\n";
return 10;
});
for (int i = 3; i < 25; ++i) {
t = t.then([i](Future<int> input){
std::cout << "Task " << i << " on thread " << std::this_thread::get_id() << ".\n";
return 10 + input.get();
});
}
t.start();
std::cout << "Task final value is: " << t.get_future().get() << ".\n";
Tasks are moveable, but not copyable.
The then()
method moves from *this
and returns a new task.
I lastly want to go over the code for then()
because I struggled getting it to work at first.
template <typename Callable>
auto then(Callable next) {
using R2 = typename std::result_of_t<Callable(Future<R>)>;
auto task_ptr = std::make_shared<Task<R>>(std::move(*this));
return Task<R2>([task_ptr=task_ptr, next=std::move(next)] {
task_ptr->start_on_new_thread();
auto res = task_ptr->get_future();
res.wait();
return next(std::move(res));
});
}
This works by moving *this
into a heap-allocated location and giving the returned task a way to invoke *this
through task_ptr
.
A more explicit representation of what is going on is a singly linked list of tasks arranged in reverse order; the head pointing to the last task to complete and the tail pointing to the first.
Each node invokes previous work, waits for it to finish, and then does its own work.
So the head of the list (last task) invokes the next node (penultimate task), waits for it to finish, and then does its own work.
It seems far more natural for the list to be in order, with each node doing its own work, waiting, and then invoking the next node.
One issue with that approach has to do with types.
The final result of the task chaining has a type parametrized by the return type of the last task.
It is convenient to have this be the head of the list so that each link in the chain knows its type.
Why use task_ptr
and not directly capture *this
in the lambda like this [task=std::move(*this)]
?
This would nest tasks within tasks and make larger and larger objects with each then()
call.
Beyond that, two compilation errors occur.
The first happens because capturing *this
makes the lambda itself moveable, but not copyable.
This matters because the Task<R>
constructor creates a std::function<R()>
from the passed callable and std::function
requires the passed callable to be copy constructible.
The second error occurs because items captured by the lambda are effectively const
.
The lambda’s operator()
is defined const
so writing task.start_on_new_thread()
in the lambda fails because the captured task
is const
, but start_on_new_thread()
is not a const
-method.
(Notice we did not have this problem when capturing this
earlier because the capture simply makes this
a const
-pointer, but not a pointer to const
-object. Capturing *this
has a different effect.)
Both these problems can be overcome.
To overcome the first, we can remove std::function
from the task template and parametrize the template on the callable itself.
The second problem can be overcome by marking the lambda mutable
.
The template <Callable> Task2
in Task.h takes this approach.
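A rough, self-contained illustration of both compilation issues and the two fixes (the MoveOnly type and store_without_type_erasure() are hypothetical stand-ins, not the post's Task code):
#include <functional>
#include <utility>

struct MoveOnly {
    MoveOnly() = default;
    MoveOnly(MoveOnly&&) = default;
    MoveOnly(const MoveOnly&) = delete;
    void run() {} // non-const member function
};

// Parametrizing on the callable type avoids std::function entirely
template <typename Callable>
void store_without_type_erasure(Callable c) { c(); }

int main() {
    MoveOnly m;
    // mutable makes operator() non-const so the captured copy can call run()
    auto lam = [m = std::move(m)]() mutable { m.run(); };
    // std::function<void()> f = std::move(lam); // error: the closure is move-only,
    //                                           // but std::function needs a copyable callable
    store_without_type_erasure(std::move(lam));
}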
The downside of not dynamically allocating new space with each then()
call is that the size (and type) of the task object grows with each then()
call.
Since the type changes, it cannot be used in a for-loop like the first version.