
Making a future in C++

I recently started looking at concurrent programming in C++ and decided to design my own future class as an exercise. Throughout several iterations of the design, I learned a lot about why C++ futures are designed the way they are.

A future represents a later-known value. Values are usually computed eagerly. In the code below, the value f is computed right away before proceeding to the print statement.

auto f = compute_something();
std::cout << "This prints after something is computed.\n";

We could wrap compute_something() in a future that is lazily started or started eagerly on another thread. This allows execution to continue even though the value of f is not known yet.

auto f = Future<int>(compute_something);
std::cout << "This prints after the future f is constructed, but possibly before compute_something() runs and the future is ready.\n";

// Some time later we want the value of the future
std::cout << "The future has value: " << f.get() << ".\n";

With that very short introduction, my first attempt at making a future is below.

#include <thread>
#include <functional>
#include <exception>

template <typename R>
class Future {
public:
    using result_type = R;

    Future(std::function<R()> f)
        :   f_(std::move(f)) {
        start_on_new_thread_();
    }

    ~Future() {
        wait_();
    }

    Future(Future&&) = delete;
    Future& operator=(Future&&) = delete;
    Future(const Future&) = delete;
    Future& operator=(const Future&) = delete;

    result_type get() {
        wait_();

        if (eptr_) {
            std::rethrow_exception(eptr_);
        }
        
        return result_;
    }
        
private:
    std::function<R()> f_;
    std::exception_ptr eptr_;
    std::thread thread_;
    result_type result_;

    void start_on_new_thread_() {
        thread_ = std::thread([this]{
            try {
                result_ = f_();
            } catch(...) {
                eptr_ = std::current_exception();
            }
        });
    }

    void wait_() {
        if (thread_.joinable()) {
            thread_.join();
        }
    }
};

The future is constructed from a callable f, which is immediately started on a new thread. Actually the callable itself is not passed to the new thread. Instead f is wrapped in a try-catch block that is in turn wrapped in a temporary callable. This temporary is what is passed to the new thread.

[this]{
    try {
        result_ = f_();
    } catch(...) {
        eptr_ = std::current_exception();
    }
}

The reason for this is twofold. First, the temporary callable never throws, even if the user-supplied callable f does. This is good news because an exception escaping the thread’s top-level function causes std::terminate() to be called, which we do not want. Second, we want to catch and store any exception thrown by f so that we can rethrow it if get() is later called on the future.

(I said the temporary never throws, but this isn’t quite true. We run into issues storing the result/exception if this is destroyed before the callable finishes. This doesn’t happen, though, because the future’s destructor waits for the thread to join.)

The write to result_ on the thread executing the callable is synchronized with any later read of result_ (via get()) on the thread that owns the future. Synchronization is accomplished with thread_.join() inside the wait_() function.
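
For example, an exception thrown inside the callable resurfaces on the calling thread when get() is invoked. Here is a small usage sketch (the error message is just illustrative):

auto f = Future<int>([]() -> int {
    throw std::runtime_error("computation failed");
});

try {
    std::cout << f.get() << "\n";
} catch (const std::runtime_error& e) {
    // The exception stored on the worker thread is rethrown here
    std::cout << "Future threw: " << e.what() << ".\n";
}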

Now on to the problems with the design.

Adding ready() to the future

A naive idea to check if the future is ready is to check if the thread executing it has joined.

bool ready() const {
    return !thread_.joinable();
}

But this is silly. The future would only be ready after we wait for it (which is where thread_.join() is called). But of course the future is ready after we wait for it! And it could be ready a lot earlier, too.

We could introduce a new boolean member variable ready_ that is returned by ready(). We initialize ready_ to false in the constructor, and set it after execution finishes (but before the executing thread is joined).

[this]{
    try {
        result_ = f_();
    } catch(...) {
        eptr_ = std::current_exception();
    }
    ready_ = true;
}

Unfortunately this creates a data race between the thread setting ready_ and the thread calling ready(). To deal with this, we could make ready_ atomic or protect it with a mutex. Either idea throws cold water on making the future copyable or moveable, since atomics and mutexes are neither.
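
A minimal sketch of the atomic variant (the final design later in this post carries an atomic ready_ flag as well):

std::atomic<bool> ready_{false}; // written once by the executing thread, read by ready()

bool ready() const {
    return ready_.load();
}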

Why isn’t the future moveable?

You might think “Wait a minute! I understand why the future isn’t copyable. std::thread isn’t copyable. But putting aside our wish to add synchronization primitives, why isn’t the current future moveable? All of its member variables are!”

Adding default move operations to the future leads to incorrect and undefined behavior.

Future(Future&&) = default;
Future& operator=(Future&&) = default;

Consider what happens in the following code that moves a future.

auto bright = Future<int>([](){
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 314;
});

Future<int> bleak(std::move(bright));
std::cout << "The bleak future has value: " << bleak.get() << ".\n";

The code defines a bright future that sleeps for 1 second and then returns 314. The future is moved to bleak before getting its value. On my machine bleak.get() returns 0. Not 314. What happened? The issue is the capture of this in the lambda that gets executed on the future’s thread_.

[this]{
    try {
        result_ = f_();
    } catch(...) {
        eptr_ = std::current_exception();
    }
}

When the bright future is constructed, it starts executing the lambda on bright.thread_. The lambda captures this, the address of bright (&bright). Inside the lambda, result_ means bright.result_. This is true from the moment the lambda is created.

During construction of bleak, bright.thread_ is moved to bleak.thread_ and bright.result_ is moved to bleak.result_. Since bright.result_ is not initialized until the future is ready, bleak.result_ is likely initialized with garbage. When the future is ready, its result (as instructed by the lambda) is written to bright.result_, where we cannot access it from bleak.get(). Instead bleak.get() returns the garbage it was initialized with (0 on my machine).

If we add a 2 second pause before constructing bleak, then bleak.get() returns 314, but not for the correct reason. In the pause, the bright future becomes ready and stores 314 in bright.result_. This value is moved into bleak.result_ when it is constructed.

auto bright = Future<int>([](){
    std::this_thread::sleep_for(std::chrono::seconds(1));
    return 314;
});

std::this_thread::sleep_for(std::chrono::seconds(2)); // Enjoy the bright future for a bit

Future<int> bleak(std::move(bright));
std::cout << "The bleak future has value: " << bleak.get() << ".\n";

We need a persistent location to store the result of the future outside of the future itself.

Making the future flexible

We’d like to move and copy our futures, and guarantee safe access to the future from multiple threads. The idea is to separate the future from its data.

// FutureData is not copyable or moveable and can therefore have
// synchronization primitives (atomics, mutexes, condition variables), which
// are usually not copyable or moveable
template <typename R>
struct FutureData {
    std::exception_ptr eptr;
    R result;
    std::atomic<bool> ready;
};

// Future contains a pointer to its data and can be moved
template <typename R>
class Future {
    // Stuff here
private:
    std::unique_ptr<FutureData<R>> data_ptr_;
};

This works great. Moving the future is done by moving data_ptr_. But the data is always fixed in one location, even if the future is moved. So at the time the lambda is created, it knows where to store the result.

[this]{
    try {
        data_ptr_->result = f_();
    } catch(...) {
        data_ptr_->eptr = std::current_exception();
    }
}

This code is still no good. Suppose we start a bright future and move it to a bleak one as before. Recall that data_ptr_ in the above lambda really means bright.data_ptr_ because this captures bright at construction. After bright is moved to bleak, bleak.data_ptr_ points to the (unmoved) future data and bright.data_ptr_ is set to nullptr. The line data_ptr_->result = f_() in the lambda attempts to dereference a nullptr. Not good.

We need two pointers to the future data. One to sit in the future itself as we have, and the other to sit in the lambda. I’ll make both pointers owning, meaning that each will keep the future data alive. This requires that we swap out std::unique_ptr for std::shared_ptr in the future.

class Future {
    // Stuff here
private:
    std::shared_ptr<FutureData<R>> data_ptr_;
};

We can then copy data_ptr_ into the lambda at construction.

[data_ptr=data_ptr_, f=f_]{
    try {
        data_ptr->result = f();
    } catch(...) {
        data_ptr->eptr = std::current_exception();
    }
}

Here I use initializer capture. The variable data_ptr is copy constructed from data_ptr_ and f is copy constructed from f_ when the lambda is created. Simply writing [data_ptr_, f_] to capture these variables by value directly does not compile. Clang complains that data_ptr_ and f_ in the capture list do not name variables. Writing [=] would implicitly capture this because data_ptr_ is interpreted as this->data_ptr_ within the future class. Initializer capture is the way to get a new copy of the shared pointer.
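
For comparison, here is how the three capture styles behave inside a member function of Future (a sketch; the first two are the rejected alternatives):

// Does not compile: data members are not variables that can appear in a capture list
[data_ptr_, f_] { /* ... */ };

// Compiles, but [=] implicitly captures this, so data_ptr_ still means this->data_ptr_
[=] { data_ptr_->result = f_(); };

// Init capture: data_ptr and f are fresh copies, independent of this
[data_ptr = data_ptr_, f = f_] { data_ptr->result = f(); };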

There is an entirely different route we could take to address the problem with the lambda capturing a particular future’s this. A particular future’s this is captured because the lambda is created in the context of Future. If instead the lambda were created in the context of FutureData, then this would capture the one and only FutureData. And that would be just fine.

If we move lambda creation to the FutureData struct, then start_() must belong to FutureData. So the future data struct is more like a future control class.

Final attempt

Below is the code for the thread-safe future control block.

#include <thread>
#include <functional>
#include <exception>
#include <condition_variable>
#include <mutex>

template <typename R>
class FutureControlBlock {
private:
    std::function<R()> f_; // Callable that future executes
    std::thread thread_; // Thread executing the callable f_
    std::exception_ptr eptr_; // Stores exception (if any) from callable f_
    R result_; // Stores result of callable f_
    bool started_; // Initialized to false.  True after execution starts
                   // Once true, started_ is never set to false again.
    
    std::atomic<bool> ready_;
    mutable std::mutex ready_mutex_;
    std::condition_variable ready_cond_var_;


    mutable std::mutex started_mutex_; // mutex to protect started_
    std::mutex thread_mutex_; // mutex to protect thread_


    void join_thread_() {
        // Mutex is locked so multiple threads don't call thread_.join()
        std::scoped_lock<std::mutex> lock(thread_mutex_);
        if (thread_.joinable()) {
            thread_.join();
        }
    }

    // We guarantee that start_() is invoked at most one time
    // across multiple thread calls to start() and start_on_new_thread()
    // Its invocation status is captured by started_
    // This means result_ and eptr_ are written by at most one thread
    // This happens before ready_ is set to true
    // which happens before any reads of those variables
    void start_() {
        try {
            result_ = f_();
        } catch(...) {
            eptr_ = std::current_exception();
        }

        {
            std::scoped_lock<std::mutex> lock(ready_mutex_);
            ready_ = true;
        }

        ready_cond_var_.notify_all();
    }

public:
    FutureControlBlock(std::function<R()> f, bool start)
        : f_(f), started_(false), ready_(false)
    {
        if (start) {
            start_on_new_thread();
        }
    }
    
    ~FutureControlBlock() { join_thread_(); }

    // Not default constructible
    FutureControlBlock() = delete;

    // Not copyable
    FutureControlBlock(const FutureControlBlock&) = delete;
    FutureControlBlock& operator=(const FutureControlBlock&) = delete;

    // Not moveable
    FutureControlBlock(FutureControlBlock&&) = delete;
    FutureControlBlock& operator=(FutureControlBlock&&) = delete;
    

    bool started() const
    {
        // Correct because the return value is constructed before the destruction of
        // local variables (i.e., the lock)
        std::scoped_lock<std::mutex> lock(started_mutex_);
        return started_;
    }

    void start() {
        {
            std::scoped_lock<std::mutex> lock(started_mutex_);
            if (started_) {
                return;
            }

            // Update started_ right away because this runs on the current thread
            // Compare to start_on_new_thread()
            started_ = true; 
        }
        start_();
    }

    void start_on_new_thread()
    {
        std::scoped_lock<std::mutex> started_lock(started_mutex_);
        if (started_) {
            return;
        }

        // We allow early exit if started_ == true *before* trying to acquire thread_mutex_,
        // which is unavailable during a join attempt

        // This is the only function that acquires a mutex while it already holds one
        // * started_mutex_ is acquired first
        // * thread_mutex_ is acquired second
        // * ready_mutex_ is acquired third (in the call to start_() on the other thread)
        // There is no cycle in the acquisition order, so no deadlock
        std::scoped_lock<std::mutex> thread_lock(thread_mutex_);
        thread_ = std::thread([this]{ start_(); });
        started_ = true;  // Update started_ only after successful creation of thread
    }

    bool ready() const noexcept
    {
        std::scoped_lock<std::mutex> lock(ready_mutex_);
        return ready_;
    }

    void wait()
    {
        std::unique_lock<std::mutex> lock(ready_mutex_);
        ready_cond_var_.wait(lock, [this]{ return ready_ == true; } );
    }

    template <typename Rep, typename Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& rel_time)
    {
        std::unique_lock<std::mutex> lock(ready_mutex_);
        return ready_cond_var_.wait_for(lock, rel_time, [this]{ return ready_ == true; } );
    }

    template <typename Clock, typename Duration>
    bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time)
    {
        std::unique_lock<std::mutex> lock(ready_mutex_);
        return ready_cond_var_.wait_until(lock, timeout_time, [this]{ return ready_ == true; } );
    }

    R get()
    {
        wait();

        if (eptr_) {
            std::rethrow_exception(eptr_);
        }

        return result_;
    }

};

The class uses mutexes and is thread safe. The flag ready_ is protected with a mutex, and synchronization in wait() is done with a condition variable. These changes are not needed for wait() itself, but for its variants wait_for() and wait_until(), as well as the new method start() (in the first attempt we only had something like start_on_new_thread()). Without these changes, synchronization could still be done with join as before (and ready_ could simply be atomic).

The future class is just a view into the FutureControlBlock.

template <typename R>
class Future {
public:
    using result_type = R;

    Future()
        :   control_block_ptr_(nullptr),
            valid_(false) {}

    Future(std::function<R()> f, bool start=true)
        :   control_block_ptr_(std::make_shared<FutureControlBlock<R>>(std::move(f), start)),
            valid_(true) {}

    ~Future() = default;

    Future(Future&&) = default;
    Future& operator=(Future&&) = default;
    Future(const Future&) = default;
    Future& operator=(const Future&) = default;

    inline R get() const { return control_block_ptr_->get(); }
    inline void wait() const { control_block_ptr_->wait(); }
    
    template <typename Rep, typename Period>
    inline bool wait_for(const std::chrono::duration<Rep, Period>& rel_time) {
        return control_block_ptr_->wait_for(rel_time);
    }

    template <typename Clock, typename Duration>
    inline bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
        return control_block_ptr_->wait_until(timeout_time);
    }

    inline bool valid() const { return valid_; }
    inline bool ready() const { return control_block_ptr_->ready(); }
    inline void start_on_new_thread() const { control_block_ptr_->start_on_new_thread(); }
    inline void start() const { control_block_ptr_->start(); }
    inline bool started() const { return control_block_ptr_->started(); }

private:
    std::shared_ptr<FutureControlBlock<R>> control_block_ptr_;
    bool valid_;   
};

We also provide a full specialization for void that requires almost no code change. It simply wraps the void-returning callable in an int-returning callable that is used to construct a FutureControlBlock<int>. The int is then discarded in the void-returning get() function. The code is below.

template <>
class Future<void> {
public:
    using result_type = void;

    Future()
        :   control_block_ptr_(nullptr),
            valid_(false) {}

    Future(std::function<void()> f, bool start=true)
        :   control_block_ptr_(
                std::make_shared<FutureControlBlock<int>>(
                    [f = std::move(f)] { f(); return 0; },
                    start)),
            valid_(true) {}

    ~Future() = default;

    Future(Future&&) = default;
    Future& operator=(Future&&) = default;
    Future(const Future&) = default;
    Future& operator=(const Future&) = default;

    inline void get() const { control_block_ptr_->get(); }
    inline void wait() const { control_block_ptr_->wait(); }
    
    template <typename Rep, typename Period>
    inline bool wait_for(const std::chrono::duration<Rep, Period>& rel_time) {
        return control_block_ptr_->wait_for(rel_time);
    }

    template <typename Clock, typename Duration>
    inline bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
        return control_block_ptr_->wait_until(timeout_time);
    }

    inline bool valid() const { return valid_; }
    inline bool ready() const { return control_block_ptr_->ready(); }
    inline void start_on_new_thread() const { control_block_ptr_->start_on_new_thread(); }
    inline void start() const { control_block_ptr_->start(); }
    inline bool started() const { return control_block_ptr_->started(); }

private:
    std::shared_ptr<FutureControlBlock<int>> control_block_ptr_;
    bool valid_;   
};

Here are some examples of using the future class. From the examples, you can see that this future is like a mixture of std::async, std::packaged_task, and std::future.

// Run 10 futures that are eagerly started on new threads
std::vector<Future<void>> futs;
for (int i = 0; i < 10; ++i) {
    futs.emplace_back([i]{
        std::this_thread::sleep_for(std::chrono::milliseconds(10*i));
        std::cout << "Done executing future " << i
            << " on thread " << std::this_thread::get_id() << ".\n";
    }, true);
}
    

// Define a future, but do not start it on construction
// Explicitly start it on main thread after construction
std::this_thread::sleep_for(std::chrono::seconds(1));
auto f = Future<void>([]{
    std::cout << "This future runs on the main thread "
        << std::this_thread::get_id() << ".\n";
}, false);
std::cout << "On the main thread " << std::this_thread::get_id() << ".\n";
f.start();


// Define a future, but do not start it on construction
// Explicitly start it on another thread after construction
std::this_thread::sleep_for(std::chrono::seconds(1));
auto g = Future<int>([]{ 
    std::cout << "This future runs on thread "
        << std::this_thread::get_id() << ".\n";

    return 314;
}, false);

g.start_on_new_thread();
std::cout << "The value of the future is " << g.get() << ".\n";


// Define a future, but do not start it on construction
// Manually start it on another thread after construction
std::this_thread::sleep_for(std::chrono::seconds(1));
auto h = Future<int>([]{
    std::cout << "This future runs on thread "
        << std::this_thread::get_id() << ".\n";
    
    return 42;
}, false);

std::thread thread([&h]{ h.start(); });
std::cout << "The value of the future is " << h.get() << ".\n";
thread.join();
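
One more example exercises ready() and the timed wait; wait_for() reports whether the future became ready before the timeout (a usage sketch against the class above):

// Poll a slow future with a timed wait
std::this_thread::sleep_for(std::chrono::seconds(1));
auto slow = Future<int>([]{
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    return 7;
});

if (!slow.wait_for(std::chrono::milliseconds(100))) {
    std::cout << "Future not ready after 100 ms (ready() is " << std::boolalpha
        << slow.ready() << "); blocking for the value.\n";
}
std::cout << "The value of the future is " << slow.get() << ".\n";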

To design a future-promise pair, very little needs to change. We keep the non-copyable and non-moveable FutureControlBlock (called the shared state in the standard) and create separate classes to access the block. Rather than have one accessor that both reads from (via get()) and writes to (via start() and start_on_new_thread()) the control block, as the future above does, we create separate readers and writers.

The reader accessor is traditionally called a Future (with methods get(), ready(), and the variants of wait()). The writer accessor is usually called a Promise (with methods set_value() and set_exception()). We can have a separate TaskPromise writer (similar to std::packaged_task) with methods like set_task(), start_task(), and start_task_on_new_thread(). The control block only needs result_, eptr_, and ready_. We can move thread_, f_, and the various start methods to the TaskPromise writer.

The write to the control block is a one-time operation. It therefore makes sense to make the writers (Promise and TaskPromise) moveable, but not copyable. On the other hand, the readers (Future) can be moveable and copyable. (Note that std::future is only moveable, but std::shared_future is also copyable. std::promise is only moveable.)

One final question to address: who creates the control block when we have separate readers and writers? One design is to create the control block directly and give it get_future() and get_promise() methods. A more common design is to have the writer (Promise, TaskPromise) create the control block and give the writer a get_future() method that returns a control block reader.
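
To make the split concrete, here is a rough sketch of a Promise writer in the second style. It assumes a default-constructible, pared-down FutureControlBlock with set_value() and set_exception() methods that mark the block ready, and a Future constructor taking a shared pointer to the control block; neither exists in the code above, so treat this as pseudocode for the design rather than the repository's implementation.

template <typename R>
class Promise {
public:
    // The writer creates the shared state
    Promise() : control_block_ptr_(std::make_shared<FutureControlBlock<R>>()) {}

    // Moveable but not copyable: one writer per control block
    Promise(Promise&&) = default;
    Promise& operator=(Promise&&) = default;
    Promise(const Promise&) = delete;
    Promise& operator=(const Promise&) = delete;

    // Hand out a reader that shares the same control block
    Future<R> get_future() const { return Future<R>(control_block_ptr_); }

    // One-time writes that make the control block ready
    void set_value(R value) { control_block_ptr_->set_value(std::move(value)); }
    void set_exception(std::exception_ptr eptr) { control_block_ptr_->set_exception(eptr); }

private:
    std::shared_ptr<FutureControlBlock<R>> control_block_ptr_;
};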

The code is here. It includes code for tasks that can be chained as in the following snippet.

auto t = MakeTask([]{
    std::cout << "Task 1 on thread " << std::this_thread::get_id() << ".\n";
    return "Secret message";
}).then([](Future<const char*> input){
    std::cout << "Task 2 on thread " << std::this_thread::get_id() << ". "
        << "First task says: " << input.get() << ".\n";
    return 10;
}).then([](Future<int> input){
    std::cout << "Task 3 on thread " << std::this_thread::get_id() << ".\n";
    return 10 + input.get();
});

t.start();
std::cout << "Task final value is: " << t.get_future().get() << ".\n";

The chaining is dynamic and can be used in a for-loop as long as the return type is constant.

auto t = MakeTask([]{
    std::cout << "Task 1 on thread " << std::this_thread::get_id() << ".\n";
    return "Secret message";
}).then([](Future<const char*> input){
    std::cout << "Task 2 on thread " << std::this_thread::get_id() << ". "
        << "First task says: " << input.get() << ".\n";
    return 10;
});

for (int i = 3; i < 25; ++i) {
    t = t.then([i](Future<int> input){
        std::cout << "Task " << i << " on thread " << std::this_thread::get_id() << ".\n";
        return 10 + input.get();
    });
}

t.start();
std::cout << "Task final value is: " << t.get_future().get() << ".\n";

Tasks are moveable, but not copyable. The then() method moves from *this and returns a new task. Lastly, I want to go over the code for then() because I struggled to get it working at first.

template <typename Callable>
auto then(Callable next) {
    using R2 = typename std::result_of_t<Callable(Future<R>)>;

    auto task_ptr = std::make_shared<Task<R>>(std::move(*this));

    return Task<R2>([task_ptr=task_ptr, next=std::move(next)] { 
        task_ptr->start_on_new_thread();
        auto res = task_ptr->get_future();
        res.wait();
        return next(std::move(res));
    });
}

This works by moving *this into a heap-allocated location and giving the returned task a way to invoke *this through task_ptr. A more explicit picture of what is going on is a singly linked list of tasks arranged in reverse order: the head points to the last task to complete and the tail points to the first. Each node invokes the next node in the list (the logically previous task), waits for it to finish, and then does its own work. So the head of the list (the last task) invokes the next node (the penultimate task), waits for it to finish, and then does its own work. It seems far more natural for the list to be in order, with each node doing its own work, waiting, and then invoking the next node. One issue with that approach has to do with types. The final result of the task chaining has a type parametrized by the return value of the last task. It is convenient to have this task be the head of the list so that each link in the chain knows its type.

Why use task_ptr and not directly capture *this in the lambda like this: [task=std::move(*this)]? Doing so would nest tasks within tasks and make larger and larger objects with each then() call. Beyond that, two compilation errors occur. The first happens because capturing *this makes the lambda itself moveable, but not copyable. This matters because the Task<R> constructor creates a std::function<R()> from the passed callable, and std::function requires the passed callable to be copy constructible. The second error occurs because items captured by value are effectively const inside the lambda. The lambda’s operator() is const by default, so writing task.start_on_new_thread() in the lambda fails: the captured task is const, but start_on_new_thread() is not a const method. (Notice we did not have this problem when capturing this earlier because that capture makes this a const pointer, not a pointer to a const object. Capturing *this has a different effect.)

Both of these problems can be overcome. To overcome the first, we can remove std::function from the task template and parametrize the template on the callable itself. The second problem can be overcome by marking the lambda mutable. The Task2 template in Task.h, parametrized on its callable, takes this approach. The downside of not dynamically allocating new space with each then() call is that the size (and type) of the task object grows with each then() call. Since the type changes, it cannot be used in a for-loop like the first version.
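
For reference, a rough sketch of what that alternative then() could look like; the Task2 constructor call is a guess (I have not reproduced Task.h here), and the mutable keyword and the absence of std::function are the important parts:

template <typename Callable>
auto then(Callable next) {
    using R2 = std::result_of_t<Callable(Future<R>)>;

    // The closure owns the previous task by value; mutable lets us call
    // its non-const start_on_new_thread() later
    auto continuation = [task = std::move(*this), next = std::move(next)]() mutable {
        task.start_on_new_thread();
        auto res = task.get_future();
        res.wait();
        return next(std::move(res));
    };

    // Hypothetical constructor: Task2 is parametrized on the callable type,
    // so no copyable std::function is required
    return Task2<R2, decltype(continuation)>(std::move(continuation));
}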