Add possibility to wait for current job to stop.

This commit is contained in:
tamasmeszaros 2021-12-02 23:05:55 +01:00
parent 1bfbf189c5
commit 35ca045b1c
7 changed files with 134 additions and 36 deletions

View File

@ -39,24 +39,25 @@ void BoostThreadWorker::WorkerMessage::deliver(BoostThreadWorker &runner)
void BoostThreadWorker::run()
{
bool stop = false;
while(!stop) {
m_input_queue.consume_one_blk([this, &stop](JobEntry &e) {
if (!e.job)
stop = true;
else {
m_canceled.store(false);
while (!stop) {
m_input_queue
.consume_one(BlockingWait{0, &m_running}, [this, &stop](JobEntry &e) {
if (!e.job)
stop = true;
else {
m_canceled.store(false);
try {
e.job->process(*this);
} catch (...) {
e.eptr = std::current_exception();
try {
e.job->process(*this);
} catch (...) {
e.eptr = std::current_exception();
}
e.canceled = m_canceled.load();
m_output_queue.push(std::move(e)); // finalization message
}
e.canceled = m_canceled.load();
m_output_queue.push(std::move(e)); // finalization message
}
m_running.store(false);
}, &m_running);
m_running.store(false);
});
};
}
@ -96,6 +97,7 @@ BoostThreadWorker::~BoostThreadWorker()
bool joined = false;
try {
cancel_all();
wait_for_idle(ABORT_WAIT_MAX_MS);
m_input_queue.push(JobEntry{nullptr});
joined = join(ABORT_WAIT_MAX_MS);
} catch(...) {}
@ -129,6 +131,45 @@ void BoostThreadWorker::process_events()
}));
}
bool BoostThreadWorker::wait_for_current_job(unsigned timeout_ms)
{
bool ret = true;
if (!is_idle()) {
bool was_finish = false;
bool timeout_reached = false;
while (!timeout_reached && !was_finish) {
timeout_reached =
!m_output_queue.consume_one(BlockingWait{timeout_ms},
[this, &was_finish](
WorkerMessage &msg) {
msg.deliver(*this);
if (msg.get_type() ==
WorkerMessage::Finalize)
was_finish = true;
});
}
ret = !timeout_reached;
}
return ret;
}
bool BoostThreadWorker::wait_for_idle(unsigned timeout_ms)
{
bool timeout_reached = false;
while (!timeout_reached && !is_idle()) {
timeout_reached = !m_output_queue
.consume_one(BlockingWait{timeout_ms},
[this](WorkerMessage &msg) {
msg.deliver(*this);
});
}
return !timeout_reached;
}
bool BoostThreadWorker::push(std::unique_ptr<Job> job)
{
if (job)

View File

@ -44,7 +44,10 @@ class BoostThreadWorker : public Worker, private Job::Ctl
class WorkerMessage
{
public:
enum MsgType { Empty, Status, Finalize, MainThreadCall };
private:
boost::variant<EmptyMessage, StatusInfo, JobEntry, MainThreadCallData> m_data;
public:
@ -127,6 +130,9 @@ public:
const ProgressIndicator * get_pri() const { return m_progress.get(); }
void process_events() override;
bool wait_for_current_job(unsigned timeout_ms = 0) override;
bool wait_for_idle(unsigned timeout_ms = 0) override;
};
}} // namespace Slic3r::GUI

View File

@ -124,6 +124,14 @@ public:
void cancel() override { m_w.cancel(); }
void cancel_all() override { m_w.cancel_all(); }
void process_events() override { m_w.process_events(); }
bool wait_for_current_job(unsigned timeout_ms = 0) override
{
return m_w.wait_for_current_job(timeout_ms);
}
bool wait_for_idle(unsigned timeout_ms = 0) override
{
return m_w.wait_for_idle(timeout_ms);
}
};
}} // namespace Slic3r::GUI

View File

@ -9,6 +9,12 @@
namespace Slic3r { namespace GUI {
struct BlockingWait
{
unsigned timeout_ms = 0;
std::atomic<bool> *pop_flag = nullptr;
};
// A thread safe queue for one producer and one consumer. Use consume_one_blk
// to block on an empty queue.
template<class T,
@ -22,7 +28,7 @@ class ThreadSafeQueueSPSC
public:
// Consume one element, block if the queue is empty.
template<class Fn> void consume_one_blk(Fn &&fn, std::atomic<bool> *pop_flag = nullptr)
template<class Fn> bool consume_one(const BlockingWait &blkw, Fn &&fn)
{
static_assert(!std::is_reference_v<T>, "");
static_assert(std::is_default_constructible_v<T>, "");
@ -31,7 +37,15 @@ public:
T el;
{
std::unique_lock lk{m_mutex};
m_cond_var.wait(lk, [this]{ return !m_queue.empty(); });
auto pred = [this]{ return !m_queue.empty(); };
if (blkw.timeout_ms > 0) {
auto timeout = std::chrono::milliseconds(blkw.timeout_ms);
if (!m_cond_var.wait_for(lk, timeout, pred))
return false;
}
else
m_cond_var.wait(lk, pred);
if constexpr (std::is_move_assignable_v<T>)
el = std::move(m_queue.front());
@ -40,11 +54,12 @@ public:
m_queue.pop();
if (pop_flag) // The optional atomic is set before the lock us unlocked
pop_flag->store(true);
if (blkw.pop_flag) // The optional atomic is set before the lock us unlocked
blkw.pop_flag->store(true);
}
fn(el);
return true;
}
// Consume one element, return true if consumed, false if queue was empty.

View File

@ -16,8 +16,9 @@ public:
// Returns false if the job gets discarded.
virtual bool push(std::unique_ptr<Job> job) = 0;
// Returns true if no job is running and no job message is left to be processed.
// This means that nothing is left to finalize or take care of in the main thread.
// Returns true if no job is running, the job queue is empty and no job
// message is left to be processed. This means that nothing is left to
// finalize or take care of in the main thread.
virtual bool is_idle() const = 0;
// Ask the current job gracefully to cancel. This call is not blocking and
@ -29,11 +30,21 @@ public:
// This method will delete the queued jobs and cancel the current one.
virtual void cancel_all() = 0;
// Needs to be called continuously to process events (like status update or
// finalizing of jobs) in the UI thread. This can be done e.g. in a wxIdle
// handler.
// Needs to be called continuously to process events (like status update
// or finalizing of jobs) in the main thread. This can be done e.g. in a
// wxIdle handler.
virtual void process_events() = 0;
// Wait until the current job finishes. Timeout will only be considered
// if not zero. Returns false if timeout is reached but the job has not
// finished.
virtual bool wait_for_current_job(unsigned timeout_ms = 0) = 0;
// Wait until the whole job queue finishes. Timeout will only be considered
// if not zero. Returns false only if timeout is reached but the worker has
// not reached the idle state.
virtual bool wait_for_idle(unsigned timeout_ms = 0) = 0;
// The destructor shall properly close the worker thread.
virtual ~Worker() = default;
};
@ -88,6 +99,21 @@ template<class...Args> bool replace_job(Worker &w, Args&& ...args)
return queue_job(w, std::forward<Args>(args)...);
}
// Cancel the current job and wait for it to actually be stopped.
inline void stop_current_job(Worker &w, unsigned timeout_ms = 0)
{
w.cancel();
w.wait_for_current_job(timeout_ms);
}
// Cancel all pending jobs including current one and wait until the worker
// becomes idle.
inline void stop_queue(Worker &w, unsigned timeout_ms = 0)
{
w.cancel_all();
w.wait_for_idle(timeout_ms);
}
}} // namespace Slic3r::GUI
#endif // WORKER_HPP

View File

@ -5046,6 +5046,7 @@ void Plater::import_sl1_archive()
{
auto &w = get_ui_job_worker();
if (w.is_idle() && p->m_sla_import_dlg->ShowModal() == wxID_OK) {
p->take_snapshot(_L("Import SLA archive"));
replace_job(w, std::make_unique<SLAImportJob>(p->m_sla_import_dlg));
}
}
@ -5482,8 +5483,10 @@ void Plater::set_number_of_copies(/*size_t num*/)
void Plater::fill_bed_with_instances()
{
auto &w = get_ui_job_worker();
if (w.is_idle())
if (w.is_idle()) {
p->take_snapshot(_L("Fill bed"));
replace_job(w, std::make_unique<FillBedJob>());
}
}
bool Plater::is_selection_empty() const
@ -5890,7 +5893,7 @@ void Plater::reslice()
return;
// Stop arrange and (or) optimize rotation tasks.
this->get_ui_job_worker().cancel_all();
stop_queue(this->get_ui_job_worker());
if (printer_technology() == ptSLA) {
for (auto& object : model().objects)
@ -6364,8 +6367,10 @@ GLCanvas3D* Plater::get_current_canvas3D()
void Plater::arrange()
{
auto &w = get_ui_job_worker();
if (w.is_idle())
if (w.is_idle()) {
p->take_snapshot(_L("Arrange"));
replace_job(w, std::make_unique<ArrangeJob>());
}
}
void Plater::set_current_canvas_as_dirty()

View File

@ -37,8 +37,7 @@ TEST_CASE("State should not be idle while running a job", "[Jobs]") {
}).wait();
});
while (!worker.is_idle())
worker.process_events();
worker.wait_for_idle();
REQUIRE(worker.is_idle());
}
@ -55,8 +54,7 @@ TEST_CASE("Status messages should be received by the main thread during job exec
}
});
while (!worker.is_idle())
worker.process_events();
worker.wait_for_idle();
REQUIRE(pri->pr == 100);
REQUIRE(pri->statustxt == "Running");
@ -85,8 +83,7 @@ TEST_CASE("Cancellation should be recognized be the worker", "[Jobs]") {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
worker.cancel();
while (!worker.is_idle())
worker.process_events();
worker.wait_for_current_job();
REQUIRE(pri->pr != 100);
}
@ -146,6 +143,6 @@ TEST_CASE("Exception should be properly forwarded to finalize()", "[Jobs]") {
eptr = nullptr;
});
while (!worker.is_idle())
worker.process_events();
worker.wait_for_idle();
REQUIRE(worker.is_idle());
}