summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/buildtool/multithreading/notification_queue.hpp84
-rw-r--r--src/buildtool/multithreading/task_system.cpp13
-rw-r--r--src/buildtool/multithreading/task_system.hpp3
3 files changed, 26 insertions, 74 deletions
diff --git a/src/buildtool/multithreading/notification_queue.hpp b/src/buildtool/multithreading/notification_queue.hpp
index 7e79aa43..5965fde0 100644
--- a/src/buildtool/multithreading/notification_queue.hpp
+++ b/src/buildtool/multithreading/notification_queue.hpp
@@ -11,87 +11,42 @@
#include "src/buildtool/multithreading/task.hpp"
#include "src/utils/cpp/atomic.hpp"
-// Flag that can block the caller until it is set. Cannot be cleared after set.
-class WaitableOneWayFlag {
- public:
- // Clear flag. Essentially a noop, if it was set before.
- void Clear() {
- if (not was_set_) {
- initial_ = false;
- }
- }
-
- // Set flag. Essentially a noop, if it was set before.
- void Set() {
- if (not was_set_) {
- was_set_ = true;
- was_set_.notify_all();
- }
- }
-
- // Blocks caller until it is set, if it was ever cleared.
- void WaitForSet() {
- if (not was_set_ and not initial_) {
- was_set_.wait(false);
- }
- }
-
- private:
- atomic<bool> was_set_{};
- bool initial_{true};
-};
-
// Counter that can block the caller until it reaches zero.
class WaitableZeroCounter {
- enum class Status { Init, Wait, Reached };
-
public:
explicit WaitableZeroCounter(std::size_t init = 0) : count_{init} {}
- // Essentially a noop, if count reached zero since last wait call.
void Decrement() {
- if (status_ != Status::Reached and --count_ == 0) {
- if (status_ == Status::Wait) {
- status_ = Status::Reached;
- status_.notify_all();
- }
+ if (--count_ == 0) {
+ count_.notify_all();
}
}
- // Essentially a noop, if count reached zero since last wait call.
- void Increment() {
- if (status_ != Status::Reached) {
- ++count_;
- }
- }
+ void Increment() { ++count_; }
- // Blocks caller until count reached zero, since last call to this method.
void WaitForZero() {
- status_ = Status::Wait;
- if (count_ != 0) {
- status_.wait(Status::Wait);
+ auto val = count_.load();
+ while (val != 0) { // loop to protect against spurious wakeups
+ count_.wait(val);
+ val = count_.load();
}
- status_ = Status::Reached;
}
private:
- std::atomic<std::size_t> count_{};
- atomic<Status> status_{Status::Init};
+ atomic<std::size_t> count_{};
};
class NotificationQueue {
public:
- NotificationQueue(gsl::not_null<WaitableOneWayFlag*> queues_read,
- gsl::not_null<WaitableZeroCounter*> num_threads_running)
- : queues_read_{std::move(queues_read)},
- num_threads_running_{std::move(num_threads_running)} {}
+ explicit NotificationQueue(
+ gsl::not_null<WaitableZeroCounter*> total_workload)
+ : total_workload_{std::move(total_workload)} {}
NotificationQueue(NotificationQueue const& other) = delete;
NotificationQueue(NotificationQueue&& other) noexcept
: queue_{std::move(other.queue_)},
done_{other.done_},
- queues_read_{std::move(other.queues_read_)},
- num_threads_running_{std::move(other.num_threads_running_)} {}
+ total_workload_{std::move(other.total_workload_)} {}
~NotificationQueue() = default;
[[nodiscard]] auto operator=(NotificationQueue const& other)
@@ -110,9 +65,9 @@ class NotificationQueue {
return !queue_.empty() || done_;
};
if (not there_is_something_to_pop_or_we_are_done()) {
- num_threads_running_->Decrement();
+ total_workload_->Decrement();
ready_.wait(lock, there_is_something_to_pop_or_we_are_done);
- num_threads_running_->Increment();
+ total_workload_->Increment();
}
if (queue_.empty()) {
@@ -120,7 +75,7 @@ class NotificationQueue {
}
auto t = std::move(queue_.front());
queue_.pop_front();
- queues_read_->Set();
+ total_workload_->Decrement();
return t;
}
@@ -133,7 +88,7 @@ class NotificationQueue {
}
auto t = std::move(queue_.front());
queue_.pop_front();
- queues_read_->Set();
+ total_workload_->Decrement();
return t;
}
@@ -145,7 +100,7 @@ class NotificationQueue {
std::unique_lock lock{mutex_};
queue_.emplace_back(std::forward<FunctionType>(f));
}
- queues_read_->Clear();
+ total_workload_->Increment();
ready_.notify_one();
}
@@ -160,7 +115,7 @@ class NotificationQueue {
}
queue_.emplace_back(std::forward<FunctionType>(f));
}
- queues_read_->Clear();
+ total_workload_->Increment();
ready_.notify_one();
return true;
}
@@ -181,8 +136,7 @@ class NotificationQueue {
bool done_{false};
std::mutex mutex_{};
std::condition_variable ready_{};
- gsl::not_null<WaitableOneWayFlag*> queues_read_;
- gsl::not_null<WaitableZeroCounter*> num_threads_running_;
+ gsl::not_null<WaitableZeroCounter*> total_workload_;
};
#endif // INCLUDED_SRC_BUILDTOOL_MULTITHREADING_NOTIFICATION_QUEUE_HPP
diff --git a/src/buildtool/multithreading/task_system.cpp b/src/buildtool/multithreading/task_system.cpp
index 8c976a2f..4af57d0d 100644
--- a/src/buildtool/multithreading/task_system.cpp
+++ b/src/buildtool/multithreading/task_system.cpp
@@ -7,9 +7,9 @@ TaskSystem::TaskSystem() : TaskSystem(std::thread::hardware_concurrency()) {}
TaskSystem::TaskSystem(std::size_t number_of_threads)
: thread_count_{std::max(1UL, number_of_threads)},
- num_threads_running_{thread_count_} {
+ total_workload_{thread_count_} {
for (std::size_t index = 0; index < thread_count_; ++index) {
- queues_.emplace_back(&queues_read_, &num_threads_running_);
+ queues_.emplace_back(&total_workload_);
}
for (std::size_t index = 0; index < thread_count_; ++index) {
threads_.emplace_back([&, index]() { Run(index); });
@@ -19,11 +19,10 @@ TaskSystem::TaskSystem(std::size_t number_of_threads)
TaskSystem::~TaskSystem() {
// When starting a new task system all spawned threads will immediately go
// to sleep and wait for tasks. Even after adding some tasks, it can take a
- // while until the first thread wakes up. Therefore, we first need to wait
- // for the queues being read, before we can wait for all threads to become
- // idle.
- queues_read_.WaitForSet();
- num_threads_running_.WaitForZero();
+ // while until the first thread wakes up. Therefore, we need to wait for the
+ // total workload (number of active threads _and_ total number of queued
+ // tasks) to become zero.
+ total_workload_.WaitForZero();
for (auto& q : queues_) {
q.done();
}
diff --git a/src/buildtool/multithreading/task_system.hpp b/src/buildtool/multithreading/task_system.hpp
index 20950142..6387988a 100644
--- a/src/buildtool/multithreading/task_system.hpp
+++ b/src/buildtool/multithreading/task_system.hpp
@@ -54,8 +54,7 @@ class TaskSystem {
std::vector<std::thread> threads_{};
std::vector<NotificationQueue> queues_{};
std::atomic<std::size_t> index_{0};
- WaitableOneWayFlag queues_read_{};
- WaitableZeroCounter num_threads_running_{};
+ WaitableZeroCounter total_workload_{};
static constexpr std::size_t kNumberOfAttempts = 5;