std::jthread and cooperative cancellation with stop token

Tutorial | Dec 12, 2020 | hkumar 


Overview

A std::jthread instance represents an automatically joining and cooperatively cancellable thread. Compared to std::thread, std::jthread offers an exception-safe thread termination flow, and it can replace std::thread in most situations with few or no code changes. A quick comparison before we go into the details:

#include <chrono>
#include <thread>

using namespace std::chrono_literals;

void comparison() {

 {  // Block 
  //std::thread
  std::thread t([]() {
   std::this_thread::sleep_for(1s);
  });

  //Must be joined explicitly (unless detached).
  //Otherwise t.~thread() calls std::terminate()
  t.join();
 }

 {
  //std::jthread
  std::jthread jt([]() {
   std::this_thread::sleep_for(1s);
  });
  //Joins automatically in jt.~jthread()
 }
}

Why std::jthread?

A std::thread encapsulates an OS-specific thread object (e.g., a pthread on a POSIX system), which should be joined when it terminates so the OS can reclaim the resources associated with the thread. Alternatively, a thread can be detached, in which case the OS reclaims its resources automatically when it finishes. A detached thread is not joinable.

A std::thread instance is either joinable or unjoinable. A std::thread that is default-constructed, detached, already joined, or moved from is unjoinable. We must join a joinable std::thread explicitly before the end of its lifetime; otherwise, the std::thread destructor calls std::terminate, whose default behavior is to abort the process.
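The joinable states above can be checked directly with joinable(). The sketch below (the helper name joinable_states_demo is hypothetical, chosen for illustration) walks a few std::thread objects through each unjoinable state and returns true if every check holds:

```cpp
#include <cassert>
#include <thread>
#include <utility>

// Hypothetical helper: moves std::thread objects through the states described
// above and returns true if each one ends up unjoinable as expected.
bool joinable_states_demo() {
    std::thread empty;                  // default-constructed: owns no thread
    bool ok = !empty.joinable();

    std::thread a([] {});
    ok = ok && a.joinable();            // owns a thread that has not been joined

    std::thread b(std::move(a));        // ownership moves to b
    ok = ok && !a.joinable() && b.joinable();

    b.join();
    ok = ok && !b.joinable();           // already joined

    std::thread c([] {});
    c.detach();
    ok = ok && !c.joinable();           // detached

    // All four objects are unjoinable here, so no destructor calls std::terminate.
    return ok;
}
```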

Therefore, unexpected and hard-to-troubleshoot crashes are possible with std::thread:

#include <thread>

void unsafe() {

 std::thread th([](){
  //Some work...
 });

 /* ..more code...

  An exception can be raised here!!

  If exception is raised here,
  th.~thread() will invoke std::terminate()
 */

 th.join();
}

We can make the above code exception-safe with a try-catch block. But that is only one of many possible patterns, and it highlights that std::thread does not provide satisfactory RAII. Move-assigning to a joinable std::thread also terminates the process.
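A minimal sketch of the try-catch approach, assuming a hypothetical risky_step() that stands in for the "more code" that may throw. The thread must be joined on both the exceptional and the normal path before th goes out of scope:

```cpp
#include <cassert>
#include <stdexcept>
#include <thread>

// Hypothetical stand-in for code that may throw.
void risky_step(bool fail) {
    if (fail) throw std::runtime_error("something went wrong");
}

void safe_with_try_catch(bool fail) {
    std::thread th([] {
        // Some work...
    });

    try {
        risky_step(fail);
    } catch (...) {
        th.join();   // join on the exceptional path...
        throw;       // ...then let the exception propagate to the caller
    }
    th.join();       // ...and on the normal path
}
```

This works, but every exit path needs its own join(), which is exactly the bookkeeping an RAII type should take over.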

std::jthread addresses this problem by automatically joining in the destructor (and on move-assignment) if the thread is joinable.

However, some argue that failing to call join() explicitly is a programming error and would prefer the application to terminate in that case. Joining in the destructor provides exception safety, but it can cause unexpected hangs because join() blocks until the other thread finishes. Therefore, std::jthread also makes a stop request to the thread before joining it during destruction. Here is a typical implementation of std::jthread's destructor:

~jthread() {
  if(joinable()) {
   request_stop(); //More on stop request below.
   join();
  }
}

Politely requesting a thread to stop instead of killing it is known as cooperative cancellation, which we will cover in the next section.

Cooperative cancellation

C++20 also introduced a framework for the cooperative cancellation of threads based on a cancellation token. Conceptually, the top-level function of a thread receives a cancellation token from the originating function. The thread operation listens for cancellation requests on that token, and the originating function (or object) can request the thread to stop at any time.

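Most threading platforms (e.g., pthreads) support several ways to interrupt a thread, ranging from abruptly killing it to politely asking it to stop. But cooperative (polite) cancellation is the only approach that works sanely, because the thread itself decides where it is safe to stop, for example after releasing its locks and leaving shared data in a consistent state. See Herb Sutter's article: Interrupt Politely.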

For clear separation, the functionality of receiving stop notifications and requesting a stop is split across two types: std::stop_token and std::stop_source. A std::stop_token instance can be queried for a stop request, and a std::stop_source provides functions to obtain a std::stop_token and to send a stop request to all of its associated tokens. An example is in order:

#include <chrono>
#include <stop_token>
#include <thread>

void data_processors() {

 using namespace std::literals::chrono_literals;

 //Create a stop source
 std::stop_source ssource;

 //Data processor thread function (lambda) 
 // gets a stop token from the stop source.
 std::thread 
 processor1([stoken=ssource.get_token()]() {
  while(!stoken.stop_requested()) {
   //Process data...
   std::this_thread::sleep_for(1s);
  }     
 });    


 //Data processor thread function (lambda) 
 // gets a stop token from the stop source.
 std::thread 
 processor2([stoken=ssource.get_token()]() {
  while(!stoken.stop_requested()) {
    //Process data...
    std::this_thread::sleep_for(1s);
  }     
 });

 //Sleep for a few seconds
 std::this_thread::sleep_for(5s);

 //Request stop. This would stop both data processors.
 ssource.request_stop();

 //Join the data processor threads
 processor1.join();
 processor2.join();
}

As shown above, the originating function (data_processors) owns a std::stop_source. It passes a std::stop_token to each of the processor operations from the same stop source. It then sends a stop request to both tokens through the source. Both operations actively monitor their tokens, and they stop on request.

However, threads cannot always actively monitor a stop token. For instance, a thread waiting on a condition variable cannot check a stop condition unless it is signaled. For that reason, a callback mechanism is provided through std::stop_callback. A std::stop_callback instance registers a callback function for a given stop token. The callback is invoked when the token receives a stop request.

The following example shows how we can use std::stop_callback to signal a thread waiting on a std::condition_variable on a stop request:

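#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <stop_token>
#include <thread>

using namespace std::literals::chrono_literals;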

std::queue<int> jobs;
std::mutex mut;
std::condition_variable cv;

void worker(std::stop_token stoken) {
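 //Register a stop callback
 std::stop_callback cb(stoken, []() {
  //Take the mutex before notifying so the wake-up cannot slip in
  //between the worker's predicate check and its wait (lost wake-up).
  std::lock_guard lck(mut);
  cv.notify_all(); //Wake thread on stop request
 });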

 while (true) {
  int jobId = -1;

  { //Acquire a jobId in lock
   std::unique_lock lck(mut);
   cv.wait(lck, [stoken]() {
    //Condition for wake up
    return jobs.size() > 0 ||
           stoken.stop_requested();
   });

   if (stoken.stop_requested()) { //Stop if requested to stop
    break;
   }
   jobId = jobs.front(); //There is jobId. Grab it.
   jobs.pop();
  } //End of locked block

  //Do job here outside the lock
  std::cout << std::this_thread::get_id() << " "
              << "Doing job " << jobId << "\n";
 } //End of while loop
}

void manager() {

 std::stop_source ssource; //Create a stop source

 //Create 2 workers and pass stop tokens
 std::thread worker1(worker, ssource.get_token());
 std::thread worker2(worker, ssource.get_token());
 //Enqueue some jobs
 for (int i = 0; i < 5; i++) {
  { //Locked block
    std::unique_lock lck(mut);
    jobs.push(i);
    cv.notify_one(); //Wakes up only one worker
  }
  std::this_thread::sleep_for(1s);
 }

 //Stop all workers
 ssource.request_stop();
 //Join threads
 worker1.join();
 worker2.join();
}

The std::stop_callback constructor registers the callback atomically. The callback executes in the thread that calls request_stop() on the associated std::stop_source; if a stop has already been requested before registration, the callback is instead invoked immediately in the registering thread, inside the constructor. Destroying the std::stop_callback deregisters the callback.

In fact, we don't have to use std::stop_callback explicitly when waiting on a condition variable. C++20 added wait overloads to std::condition_variable_any that take a std::stop_token and wake the waiting thread on a stop request. Unlike std::condition_variable, std::condition_variable_any works with any lock type. Implementations typically build it on top of a std::condition_variable plus an internal mutex, and the stop-token overloads use a std::stop_callback internally to wake the thread. The code below shows the worker from above rewritten with a std::condition_variable_any:

//...
std::condition_variable_any cv_any;

void worker_cv_any(std::stop_token stoken) {
 while (true) {
  int jobId = -1;

  { //Locked block
   std::unique_lock lck(mut);

   //wait() takes a stop token and a predicate. It returns
   // once predicate() is true or a stop is requested,
   // and its return value is the result of predicate()
   if(!cv_any.wait(lck, stoken, []() {
       return jobs.size() > 0; //Condition to wake
      })) {
    /*Predicate returned false.
      Therefore woke up because of a stop request. */
    break; //Leave
   }
   //Grab jobId
   jobId = jobs.front();
   jobs.pop();
  }

  //Do job here outside the lock
  std::cout << std::this_thread::get_id() << " "
              << "Doing job " << jobId << "\n";
 } //End of while loop
}

std::stop_source, std::stop_token, and std::stop_callback are lightweight objects that refer to a shared stop-state. A new stop-state is created when a std::stop_source is default-constructed. Copies of that std::stop_source, the std::stop_token objects obtained from it, and the std::stop_callback objects registered on those tokens all refer to the same stop-state.

A stop-state records a stop event and maintains the callback registrations. It stays alive as long as there is at least one object referring to it. The following schematic shows the typical relationship among these objects:

[Figure: Cooperative Cancellation in C++20]


Cooperative cancellation in std::jthread

std::jthread builds on the C++20 cooperative cancellation model. It creates and owns a std::stop_source. If initialized with a function that accepts a std::stop_token as its first parameter, std::jthread gets a stop token from its stop source and passes it to the function.

A std::jthread's top-level function doesn't have to take a stop token, but a long-running thread should always have the means to honor a cancellation request. Here is an example of std::jthread cancellation:

#include <chrono>
#include <stop_token>
#include <thread>

void jthread_cancel() {
 using namespace std::literals::chrono_literals;
 //Lambda takes a stop token
 std::jthread jt([](std::stop_token stoken) {
  while (!stoken.stop_requested()) {
    //Working...
    std::this_thread::sleep_for(1s);
  }
 });

 std::this_thread::sleep_for(5s);
 //Thread is stopped and joined in jt.~jthread() here
}

There are two points worth noting here. First, it does not seem to be specified whether the std::jthread constructor creates a stop-state when the passed function does not take a std::stop_token. Second, std::jthread cannot be initialized with an external std::stop_source. These points can matter in some cases because they affect application design.

Conclusion

Automatic joining and cooperative cancellation in std::jthread make it easier to write safer code.

Cooperative cancellation in C++20 is inspired by a similar framework in .NET for the cancellation of managed threads. This framework is not tied to std::jthread and can be used independently even with std::thread or other async operations.

References

Discussion about std::thread and RAII

std::jthread proposal documents

Cancellation in Managed Threads - .NET 4