Fix issue with non atomic transition to running state

After popping a job from input queue
This commit is contained in:
tamasmeszaros 2021-12-02 14:55:52 +01:00
parent 39198ff845
commit 78118b7b1d
3 changed files with 18 additions and 9 deletions

View File

@ -45,7 +45,6 @@ void BoostThreadWorker::run()
stop = true; stop = true;
else { else {
m_canceled.store(false); m_canceled.store(false);
m_running.store(true);
try { try {
e.job->process(*this); e.job->process(*this);
@ -55,9 +54,9 @@ void BoostThreadWorker::run()
e.canceled = m_canceled.load(); e.canceled = m_canceled.load();
m_output_queue.push(std::move(e)); // finalization message m_output_queue.push(std::move(e)); // finalization message
m_running.store(false);
} }
}); m_running.store(false);
}, &m_running);
}; };
} }
@ -94,14 +93,16 @@ constexpr int ABORT_WAIT_MAX_MS = 10000;
BoostThreadWorker::~BoostThreadWorker() BoostThreadWorker::~BoostThreadWorker()
{ {
bool joined = false;
try { try {
cancel_all(); cancel_all();
m_input_queue.push(JobEntry{nullptr}); m_input_queue.push(JobEntry{nullptr});
join(ABORT_WAIT_MAX_MS); joined = join(ABORT_WAIT_MAX_MS);
} catch(...) { } catch(...) {}
if (!joined)
BOOST_LOG_TRIVIAL(error) BOOST_LOG_TRIVIAL(error)
<< "Could not join worker thread '" << m_name << "'"; << "Could not join worker thread '" << m_name << "'";
}
} }
bool BoostThreadWorker::join(int timeout_ms) bool BoostThreadWorker::join(int timeout_ms)

View File

@ -5,6 +5,7 @@
#include <queue> #include <queue>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <atomic>
namespace Slic3r { namespace GUI { namespace Slic3r { namespace GUI {
@ -21,7 +22,7 @@ class ThreadSafeQueueSPSC
public: public:
// Consume one element, block if the queue is empty. // Consume one element, block if the queue is empty.
template<class Fn> void consume_one_blk(Fn &&fn) template<class Fn> void consume_one_blk(Fn &&fn, std::atomic<bool> *pop_flag = nullptr)
{ {
static_assert(!std::is_reference_v<T>, ""); static_assert(!std::is_reference_v<T>, "");
static_assert(std::is_default_constructible_v<T>, ""); static_assert(std::is_default_constructible_v<T>, "");
@ -38,6 +39,9 @@ public:
el = m_queue.front(); el = m_queue.front();
m_queue.pop(); m_queue.pop();
if (pop_flag) // The optional atomic is set before the lock us unlocked
pop_flag->store(true);
} }
fn(el); fn(el);
@ -50,7 +54,11 @@ public:
{ {
std::unique_lock lk{m_mutex}; std::unique_lock lk{m_mutex};
if (!m_queue.empty()) { if (!m_queue.empty()) {
el = std::move(m_queue.front()); if constexpr (std::is_move_assignable_v<T>)
el = std::move(m_queue.front());
else
el = m_queue.front();
m_queue.pop(); m_queue.pop();
} else } else
return false; return false;

View File

@ -86,7 +86,7 @@ TEST_CASE("Cancellation should be recognized be the worker", "[Jobs]") {
REQUIRE(pri->pr != 100); REQUIRE(pri->pr != 100);
} }
TEST_CASE("Cancel_all should remove all pending jobs", "[Jobs]") { TEST_CASE("cancel_all should remove all pending jobs", "[Jobs]") {
using namespace Slic3r; using namespace Slic3r;
using namespace Slic3r::GUI; using namespace Slic3r::GUI;