diff options
author | Oliver Reiche <oliver.reiche@huawei.com> | 2021-10-22 15:50:47 +0200 |
---|---|---|
committer | Oliver Reiche <oliver.reiche@huawei.com> | 2022-07-06 18:49:49 +0200 |
commit | a71a4b5015327b15fb0bb4fea16c43f21ef0616c (patch) | |
tree | 10596e9c89659845c7cc6d3bf632a07fe407af86 /src | |
parent | dc1129c79931d4391c3bcf6b629758fab7038219 (diff) | |
download | justbuild-a71a4b5015327b15fb0bb4fea16c43f21ef0616c.tar.gz |
TaskSystem: Fix early shutdown
... conceptually, it was possible that a previous task
decrements the `num_threads_running_` counter before it is
incremented by the next task. Therefore, we have to unify
the queue and thread status in a single counter
(`total_workload_`) and ensure that woken threads increment
it before decrementing it for popping a queue.
Diffstat (limited to 'src')
-rw-r--r-- | src/buildtool/multithreading/notification_queue.hpp | 84 | ||||
-rw-r--r-- | src/buildtool/multithreading/task_system.cpp | 13 | ||||
-rw-r--r-- | src/buildtool/multithreading/task_system.hpp | 3 |
3 files changed, 26 insertions, 74 deletions
diff --git a/src/buildtool/multithreading/notification_queue.hpp b/src/buildtool/multithreading/notification_queue.hpp index 7e79aa43..5965fde0 100644 --- a/src/buildtool/multithreading/notification_queue.hpp +++ b/src/buildtool/multithreading/notification_queue.hpp @@ -11,87 +11,42 @@ #include "src/buildtool/multithreading/task.hpp" #include "src/utils/cpp/atomic.hpp" -// Flag that can block the caller until it is set. Cannot be cleared after set. -class WaitableOneWayFlag { - public: - // Clear flag. Essentially a noop, if it was set before. - void Clear() { - if (not was_set_) { - initial_ = false; - } - } - - // Set flag. Essentially a noop, if it was set before. - void Set() { - if (not was_set_) { - was_set_ = true; - was_set_.notify_all(); - } - } - - // Blocks caller until it is set, if it was ever cleared. - void WaitForSet() { - if (not was_set_ and not initial_) { - was_set_.wait(false); - } - } - - private: - atomic<bool> was_set_{}; - bool initial_{true}; -}; - // Counter that can block the caller until it reaches zero. class WaitableZeroCounter { - enum class Status { Init, Wait, Reached }; - public: explicit WaitableZeroCounter(std::size_t init = 0) : count_{init} {} - // Essentially a noop, if count reached zero since last wait call. void Decrement() { - if (status_ != Status::Reached and --count_ == 0) { - if (status_ == Status::Wait) { - status_ = Status::Reached; - status_.notify_all(); - } + if (--count_ == 0) { + count_.notify_all(); } } - // Essentially a noop, if count reached zero since last wait call. - void Increment() { - if (status_ != Status::Reached) { - ++count_; - } - } + void Increment() { ++count_; } - // Blocks caller until count reached zero, since last call to this method. void WaitForZero() { - status_ = Status::Wait; - if (count_ != 0) { - status_.wait(Status::Wait); + auto val = count_.load(); + while (val != 0) { // loop to protect against spurious wakeups + count_.wait(val); + val = count_.load(); } - status_ = Status::Reached; } private: - std::atomic<std::size_t> count_{}; - atomic<Status> status_{Status::Init}; + atomic<std::size_t> count_{}; }; class NotificationQueue { public: - NotificationQueue(gsl::not_null<WaitableOneWayFlag*> queues_read, - gsl::not_null<WaitableZeroCounter*> num_threads_running) - : queues_read_{std::move(queues_read)}, - num_threads_running_{std::move(num_threads_running)} {} + explicit NotificationQueue( + gsl::not_null<WaitableZeroCounter*> total_workload) + : total_workload_{std::move(total_workload)} {} NotificationQueue(NotificationQueue const& other) = delete; NotificationQueue(NotificationQueue&& other) noexcept : queue_{std::move(other.queue_)}, done_{other.done_}, - queues_read_{std::move(other.queues_read_)}, - num_threads_running_{std::move(other.num_threads_running_)} {} + total_workload_{std::move(other.total_workload_)} {} ~NotificationQueue() = default; [[nodiscard]] auto operator=(NotificationQueue const& other) @@ -110,9 +65,9 @@ class NotificationQueue { return !queue_.empty() || done_; }; if (not there_is_something_to_pop_or_we_are_done()) { - num_threads_running_->Decrement(); + total_workload_->Decrement(); ready_.wait(lock, there_is_something_to_pop_or_we_are_done); - num_threads_running_->Increment(); + total_workload_->Increment(); } if (queue_.empty()) { @@ -120,7 +75,7 @@ class NotificationQueue { } auto t = std::move(queue_.front()); queue_.pop_front(); - queues_read_->Set(); + total_workload_->Decrement(); return t; } @@ -133,7 +88,7 @@ class NotificationQueue { } auto t = std::move(queue_.front()); queue_.pop_front(); - queues_read_->Set(); + total_workload_->Decrement(); return t; } @@ -145,7 +100,7 @@ class NotificationQueue { std::unique_lock lock{mutex_}; queue_.emplace_back(std::forward<FunctionType>(f)); } - queues_read_->Clear(); + total_workload_->Increment(); ready_.notify_one(); } @@ -160,7 +115,7 @@ class NotificationQueue { } queue_.emplace_back(std::forward<FunctionType>(f)); } - queues_read_->Clear(); + total_workload_->Increment(); ready_.notify_one(); return true; } @@ -181,8 +136,7 @@ class NotificationQueue { bool done_{false}; std::mutex mutex_{}; std::condition_variable ready_{}; - gsl::not_null<WaitableOneWayFlag*> queues_read_; - gsl::not_null<WaitableZeroCounter*> num_threads_running_; + gsl::not_null<WaitableZeroCounter*> total_workload_; }; #endif // INCLUDED_SRC_BUILDTOOL_MULTITHREADING_NOTIFICATION_QUEUE_HPP diff --git a/src/buildtool/multithreading/task_system.cpp b/src/buildtool/multithreading/task_system.cpp index 8c976a2f..4af57d0d 100644 --- a/src/buildtool/multithreading/task_system.cpp +++ b/src/buildtool/multithreading/task_system.cpp @@ -7,9 +7,9 @@ TaskSystem::TaskSystem() : TaskSystem(std::thread::hardware_concurrency()) {} TaskSystem::TaskSystem(std::size_t number_of_threads) : thread_count_{std::max(1UL, number_of_threads)}, - num_threads_running_{thread_count_} { + total_workload_{thread_count_} { for (std::size_t index = 0; index < thread_count_; ++index) { - queues_.emplace_back(&queues_read_, &num_threads_running_); + queues_.emplace_back(&total_workload_); } for (std::size_t index = 0; index < thread_count_; ++index) { threads_.emplace_back([&, index]() { Run(index); }); @@ -19,11 +19,10 @@ TaskSystem::TaskSystem(std::size_t number_of_threads) TaskSystem::~TaskSystem() { // When starting a new task system all spawned threads will immediately go // to sleep and wait for tasks. Even after adding some tasks, it can take a - // while until the first thread wakes up. Therefore, we first need to wait - // for the queues being read, before we can wait for all threads to become - // idle. - queues_read_.WaitForSet(); - num_threads_running_.WaitForZero(); + // while until the first thread wakes up. Therefore, we need to wait for the + // total workload (number of active threads _and_ total number of queued + // tasks) to become zero. + total_workload_.WaitForZero(); for (auto& q : queues_) { q.done(); } diff --git a/src/buildtool/multithreading/task_system.hpp b/src/buildtool/multithreading/task_system.hpp index 20950142..6387988a 100644 --- a/src/buildtool/multithreading/task_system.hpp +++ b/src/buildtool/multithreading/task_system.hpp @@ -54,8 +54,7 @@ class TaskSystem { std::vector<std::thread> threads_{}; std::vector<NotificationQueue> queues_{}; std::atomic<std::size_t> index_{0}; - WaitableOneWayFlag queues_read_{}; - WaitableZeroCounter num_threads_running_{}; + WaitableZeroCounter total_workload_{}; static constexpr std::size_t kNumberOfAttempts = 5; |