summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOliver Reiche <oliver.reiche@huawei.com>2021-10-22 15:50:47 +0200
committerOliver Reiche <oliver.reiche@huawei.com>2022-07-06 18:49:49 +0200
commita71a4b5015327b15fb0bb4fea16c43f21ef0616c (patch)
tree10596e9c89659845c7cc6d3bf632a07fe407af86
parentdc1129c79931d4391c3bcf6b629758fab7038219 (diff)
downloadjustbuild-a71a4b5015327b15fb0bb4fea16c43f21ef0616c.tar.gz
TaskSystem: Fix early shutdown
... conceptually, it was possible that a previous task decrements the `num_threads_running_` counter before it is incremented by the next task. Therefore, we have to unify the queue and thread status in a single counter (`total_workload_`) and ensure that woken threads increment it before decrementing it for popping a queue.
-rw-r--r--src/buildtool/multithreading/notification_queue.hpp84
-rw-r--r--src/buildtool/multithreading/task_system.cpp13
-rw-r--r--src/buildtool/multithreading/task_system.hpp3
-rw-r--r--test/buildtool/multithreading/task_system.test.cpp13
4 files changed, 31 insertions, 82 deletions
diff --git a/src/buildtool/multithreading/notification_queue.hpp b/src/buildtool/multithreading/notification_queue.hpp
index 7e79aa43..5965fde0 100644
--- a/src/buildtool/multithreading/notification_queue.hpp
+++ b/src/buildtool/multithreading/notification_queue.hpp
@@ -11,87 +11,42 @@
#include "src/buildtool/multithreading/task.hpp"
#include "src/utils/cpp/atomic.hpp"
-// Flag that can block the caller until it is set. Cannot be cleared after set.
-class WaitableOneWayFlag {
- public:
- // Clear flag. Essentially a noop, if it was set before.
- void Clear() {
- if (not was_set_) {
- initial_ = false;
- }
- }
-
- // Set flag. Essentially a noop, if it was set before.
- void Set() {
- if (not was_set_) {
- was_set_ = true;
- was_set_.notify_all();
- }
- }
-
- // Blocks caller until it is set, if it was ever cleared.
- void WaitForSet() {
- if (not was_set_ and not initial_) {
- was_set_.wait(false);
- }
- }
-
- private:
- atomic<bool> was_set_{};
- bool initial_{true};
-};
-
// Counter that can block the caller until it reaches zero.
class WaitableZeroCounter {
- enum class Status { Init, Wait, Reached };
-
public:
explicit WaitableZeroCounter(std::size_t init = 0) : count_{init} {}
- // Essentially a noop, if count reached zero since last wait call.
void Decrement() {
- if (status_ != Status::Reached and --count_ == 0) {
- if (status_ == Status::Wait) {
- status_ = Status::Reached;
- status_.notify_all();
- }
+ if (--count_ == 0) {
+ count_.notify_all();
}
}
- // Essentially a noop, if count reached zero since last wait call.
- void Increment() {
- if (status_ != Status::Reached) {
- ++count_;
- }
- }
+ void Increment() { ++count_; }
- // Blocks caller until count reached zero, since last call to this method.
void WaitForZero() {
- status_ = Status::Wait;
- if (count_ != 0) {
- status_.wait(Status::Wait);
+ auto val = count_.load();
+ while (val != 0) { // loop to protect against spurious wakeups
+ count_.wait(val);
+ val = count_.load();
}
- status_ = Status::Reached;
}
private:
- std::atomic<std::size_t> count_{};
- atomic<Status> status_{Status::Init};
+ atomic<std::size_t> count_{};
};
class NotificationQueue {
public:
- NotificationQueue(gsl::not_null<WaitableOneWayFlag*> queues_read,
- gsl::not_null<WaitableZeroCounter*> num_threads_running)
- : queues_read_{std::move(queues_read)},
- num_threads_running_{std::move(num_threads_running)} {}
+ explicit NotificationQueue(
+ gsl::not_null<WaitableZeroCounter*> total_workload)
+ : total_workload_{std::move(total_workload)} {}
NotificationQueue(NotificationQueue const& other) = delete;
NotificationQueue(NotificationQueue&& other) noexcept
: queue_{std::move(other.queue_)},
done_{other.done_},
- queues_read_{std::move(other.queues_read_)},
- num_threads_running_{std::move(other.num_threads_running_)} {}
+ total_workload_{std::move(other.total_workload_)} {}
~NotificationQueue() = default;
[[nodiscard]] auto operator=(NotificationQueue const& other)
@@ -110,9 +65,9 @@ class NotificationQueue {
return !queue_.empty() || done_;
};
if (not there_is_something_to_pop_or_we_are_done()) {
- num_threads_running_->Decrement();
+ total_workload_->Decrement();
ready_.wait(lock, there_is_something_to_pop_or_we_are_done);
- num_threads_running_->Increment();
+ total_workload_->Increment();
}
if (queue_.empty()) {
@@ -120,7 +75,7 @@ class NotificationQueue {
}
auto t = std::move(queue_.front());
queue_.pop_front();
- queues_read_->Set();
+ total_workload_->Decrement();
return t;
}
@@ -133,7 +88,7 @@ class NotificationQueue {
}
auto t = std::move(queue_.front());
queue_.pop_front();
- queues_read_->Set();
+ total_workload_->Decrement();
return t;
}
@@ -145,7 +100,7 @@ class NotificationQueue {
std::unique_lock lock{mutex_};
queue_.emplace_back(std::forward<FunctionType>(f));
}
- queues_read_->Clear();
+ total_workload_->Increment();
ready_.notify_one();
}
@@ -160,7 +115,7 @@ class NotificationQueue {
}
queue_.emplace_back(std::forward<FunctionType>(f));
}
- queues_read_->Clear();
+ total_workload_->Increment();
ready_.notify_one();
return true;
}
@@ -181,8 +136,7 @@ class NotificationQueue {
bool done_{false};
std::mutex mutex_{};
std::condition_variable ready_{};
- gsl::not_null<WaitableOneWayFlag*> queues_read_;
- gsl::not_null<WaitableZeroCounter*> num_threads_running_;
+ gsl::not_null<WaitableZeroCounter*> total_workload_;
};
#endif // INCLUDED_SRC_BUILDTOOL_MULTITHREADING_NOTIFICATION_QUEUE_HPP
diff --git a/src/buildtool/multithreading/task_system.cpp b/src/buildtool/multithreading/task_system.cpp
index 8c976a2f..4af57d0d 100644
--- a/src/buildtool/multithreading/task_system.cpp
+++ b/src/buildtool/multithreading/task_system.cpp
@@ -7,9 +7,9 @@ TaskSystem::TaskSystem() : TaskSystem(std::thread::hardware_concurrency()) {}
TaskSystem::TaskSystem(std::size_t number_of_threads)
: thread_count_{std::max(1UL, number_of_threads)},
- num_threads_running_{thread_count_} {
+ total_workload_{thread_count_} {
for (std::size_t index = 0; index < thread_count_; ++index) {
- queues_.emplace_back(&queues_read_, &num_threads_running_);
+ queues_.emplace_back(&total_workload_);
}
for (std::size_t index = 0; index < thread_count_; ++index) {
threads_.emplace_back([&, index]() { Run(index); });
@@ -19,11 +19,10 @@ TaskSystem::TaskSystem(std::size_t number_of_threads)
TaskSystem::~TaskSystem() {
// When starting a new task system all spawned threads will immediately go
// to sleep and wait for tasks. Even after adding some tasks, it can take a
- // while until the first thread wakes up. Therefore, we first need to wait
- // for the queues being read, before we can wait for all threads to become
- // idle.
- queues_read_.WaitForSet();
- num_threads_running_.WaitForZero();
+ // while until the first thread wakes up. Therefore, we need to wait for the
+ // total workload (number of active threads _and_ total number of queued
+ // tasks) to become zero.
+ total_workload_.WaitForZero();
for (auto& q : queues_) {
q.done();
}
diff --git a/src/buildtool/multithreading/task_system.hpp b/src/buildtool/multithreading/task_system.hpp
index 20950142..6387988a 100644
--- a/src/buildtool/multithreading/task_system.hpp
+++ b/src/buildtool/multithreading/task_system.hpp
@@ -54,8 +54,7 @@ class TaskSystem {
std::vector<std::thread> threads_{};
std::vector<NotificationQueue> queues_{};
std::atomic<std::size_t> index_{0};
- WaitableOneWayFlag queues_read_{};
- WaitableZeroCounter num_threads_running_{};
+ WaitableZeroCounter total_workload_{};
static constexpr std::size_t kNumberOfAttempts = 5;
diff --git a/test/buildtool/multithreading/task_system.test.cpp b/test/buildtool/multithreading/task_system.test.cpp
index 21e1ca6e..d34e3e4b 100644
--- a/test/buildtool/multithreading/task_system.test.cpp
+++ b/test/buildtool/multithreading/task_system.test.cpp
@@ -174,14 +174,11 @@ TEST_CASE("All threads run until work is done", "[task_system]") {
// Wait some time for all threads to go to sleep.
std::this_thread::sleep_for(1s);
- // Run singe task that creates the actual store tasks. All threads
- // should stay alive until their corresponding queue is filled.
- ts.QueueTask([&ts, &store_id] {
- // One task per thread (assumes round-robin push to queues).
- for (std::size_t i{}; i < ts.NumberOfThreads(); ++i) {
- ts.QueueTask([&store_id] { store_id(); });
- }
- });
+ // All threads should stay alive until their corresponding queue is
+ // filled. One task per thread (assumes round-robin push to queues).
+ for (std::size_t i{}; i < ts.NumberOfThreads(); ++i) {
+ ts.QueueTask([&store_id] { store_id(); });
+ }
}
CHECK(tids.size() == kNumThreads);
}