Use a semaphore to synchronize threads instead of blocking with join(). Lock threads array in order to ensure all of them are signalled. #2394
This commit is contained in:
parent
84760b8d59
commit
e9166a8fe6
2 changed files with 41 additions and 28 deletions
|
@ -82,30 +82,37 @@ use constant EXTERNAL_INFILL_MARGIN => 3;
|
||||||
use constant INSET_OVERLAP_TOLERANCE => 0.2;
|
use constant INSET_OVERLAP_TOLERANCE => 0.2;
|
||||||
|
|
||||||
# keep track of threads we created
|
# keep track of threads we created
|
||||||
my @threads = ();
|
my @my_threads = ();
|
||||||
my $sema = Thread::Semaphore->new;
|
my @threads : shared = ();
|
||||||
|
my $pause_sema = Thread::Semaphore->new;
|
||||||
|
my $parallel_sema;
|
||||||
my $paused = 0;
|
my $paused = 0;
|
||||||
|
|
||||||
sub spawn_thread {
|
sub spawn_thread {
|
||||||
my ($cb) = @_;
|
my ($cb) = @_;
|
||||||
|
|
||||||
my $parent_tid = threads->tid;
|
my $parent_tid = threads->tid;
|
||||||
|
lock @threads;
|
||||||
|
|
||||||
@_ = ();
|
@_ = ();
|
||||||
my $thread = threads->create(sub {
|
my $thread = threads->create(sub {
|
||||||
|
@my_threads = ();
|
||||||
|
|
||||||
Slic3r::debugf "Starting thread %d (parent: %d)...\n", threads->tid, $parent_tid;
|
Slic3r::debugf "Starting thread %d (parent: %d)...\n", threads->tid, $parent_tid;
|
||||||
local $SIG{'KILL'} = sub {
|
local $SIG{'KILL'} = sub {
|
||||||
Slic3r::debugf "Exiting thread %d...\n", threads->tid;
|
Slic3r::debugf "Exiting thread %d...\n", threads->tid;
|
||||||
|
$parallel_sema->up if $parallel_sema;
|
||||||
kill_all_threads();
|
kill_all_threads();
|
||||||
Slic3r::thread_cleanup();
|
Slic3r::thread_cleanup();
|
||||||
threads->exit();
|
threads->exit();
|
||||||
};
|
};
|
||||||
local $SIG{'STOP'} = sub {
|
local $SIG{'STOP'} = sub {
|
||||||
$sema->down;
|
$pause_sema->down;
|
||||||
$sema->up;
|
$pause_sema->up;
|
||||||
};
|
};
|
||||||
$cb->();
|
$cb->();
|
||||||
});
|
});
|
||||||
|
push @my_threads, $thread->tid;
|
||||||
push @threads, $thread->tid;
|
push @threads, $thread->tid;
|
||||||
return $thread;
|
return $thread;
|
||||||
}
|
}
|
||||||
|
@ -118,10 +125,15 @@ sub parallelize {
|
||||||
my $q = Thread::Queue->new;
|
my $q = Thread::Queue->new;
|
||||||
$q->enqueue(@items, (map undef, 1..$params{threads}));
|
$q->enqueue(@items, (map undef, 1..$params{threads}));
|
||||||
|
|
||||||
|
$parallel_sema = Thread::Semaphore->new(-$params{threads});
|
||||||
|
$parallel_sema->up;
|
||||||
my $thread_cb = sub {
|
my $thread_cb = sub {
|
||||||
# execute thread callback
|
# execute thread callback
|
||||||
$params{thread_cb}->($q);
|
$params{thread_cb}->($q);
|
||||||
|
|
||||||
|
# signal the parent thread that we're done
|
||||||
|
$parallel_sema->up;
|
||||||
|
|
||||||
# cleanup before terminating thread
|
# cleanup before terminating thread
|
||||||
Slic3r::thread_cleanup();
|
Slic3r::thread_cleanup();
|
||||||
|
|
||||||
|
@ -133,17 +145,17 @@ sub parallelize {
|
||||||
# The downside to using this exit is that we can't return
|
# The downside to using this exit is that we can't return
|
||||||
# any value to the main thread but we're not doing that
|
# any value to the main thread but we're not doing that
|
||||||
# anymore anyway.
|
# anymore anyway.
|
||||||
# collect_cb is completely useless now
|
|
||||||
# and should be removed from the codebase.
|
|
||||||
threads->exit;
|
threads->exit;
|
||||||
};
|
};
|
||||||
$params{collect_cb} ||= sub {};
|
|
||||||
|
|
||||||
@_ = ();
|
@_ = ();
|
||||||
my @my_threads = map spawn_thread($thread_cb), 1..$params{threads};
|
my @my_threads = map spawn_thread($thread_cb), 1..$params{threads};
|
||||||
foreach my $th (@my_threads) {
|
|
||||||
$params{collect_cb}->($th->join);
|
# We use a semaphore instead of $th->join because joined threads are
|
||||||
}
|
# not listed by threads->list or threads->object anymore, thus can't
|
||||||
|
# be signalled.
|
||||||
|
$parallel_sema->down;
|
||||||
|
$_->detach for @my_threads;
|
||||||
} else {
|
} else {
|
||||||
$params{no_threads_cb}->();
|
$params{no_threads_cb}->();
|
||||||
}
|
}
|
||||||
|
@ -202,40 +214,43 @@ sub thread_cleanup {
|
||||||
}
|
}
|
||||||
|
|
||||||
sub get_running_threads {
|
sub get_running_threads {
|
||||||
return grep defined($_), map threads->object($_), @threads;
|
return grep defined($_), map threads->object($_), @_;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub kill_all_threads {
|
sub kill_all_threads {
|
||||||
# detach any running thread created in the current one
|
# if we're the main thread, we send SIGKILL to all the running threads
|
||||||
my @killed = ();
|
if (threads->tid == 0) {
|
||||||
foreach my $thread (get_running_threads()) {
|
lock @threads;
|
||||||
Slic3r::debugf "Thread %d killing %d...\n", threads->tid, $thread->tid;
|
foreach my $thread (get_running_threads(@threads)) {
|
||||||
$thread->kill('KILL');
|
Slic3r::debugf "Thread %d killing %d...\n", threads->tid, $thread->tid;
|
||||||
push @killed, $thread;
|
$thread->kill('KILL');
|
||||||
|
}
|
||||||
|
|
||||||
|
# unlock semaphore before we block on wait
|
||||||
|
# otherwise we'd get a deadlock if threads were paused
|
||||||
|
resume_threads();
|
||||||
}
|
}
|
||||||
|
|
||||||
# unlock semaphore before we block on wait
|
# in any thread we wait for our children
|
||||||
# otherwise we'd get a deadlock if threads were paused
|
foreach my $thread (get_running_threads(@my_threads)) {
|
||||||
resume_threads();
|
Slic3r::debugf " Thread %d waiting for %d...\n", threads->tid, $thread->tid;
|
||||||
foreach my $thread (@killed) {
|
|
||||||
Slic3r::debugf " Threads %d waiting for %d...\n", threads->tid, $thread->tid;
|
|
||||||
$thread->join; # block until threads are killed
|
$thread->join; # block until threads are killed
|
||||||
Slic3r::debugf " Thread %d finished waiting for %d...\n", threads->tid, $thread->tid;
|
Slic3r::debugf " Thread %d finished waiting for %d...\n", threads->tid, $thread->tid;
|
||||||
}
|
}
|
||||||
@threads = ();
|
@my_threads = ();
|
||||||
}
|
}
|
||||||
|
|
||||||
sub pause_threads {
|
sub pause_threads {
|
||||||
return if $paused;
|
return if $paused;
|
||||||
$paused = 1;
|
$paused = 1;
|
||||||
$sema->down;
|
$pause_sema->down;
|
||||||
$_->kill('STOP') for get_running_threads();
|
$_->kill('STOP') for get_running_threads(@threads);
|
||||||
}
|
}
|
||||||
|
|
||||||
sub resume_threads {
|
sub resume_threads {
|
||||||
return unless $paused;
|
return unless $paused;
|
||||||
$paused = 0;
|
$paused = 0;
|
||||||
$sema->up;
|
$pause_sema->up;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub encode_path {
|
sub encode_path {
|
||||||
|
|
|
@ -426,7 +426,6 @@ sub make_perimeters {
|
||||||
$self->get_layer($i)->make_perimeters;
|
$self->get_layer($i)->make_perimeters;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
collect_cb => sub {},
|
|
||||||
no_threads_cb => sub {
|
no_threads_cb => sub {
|
||||||
$_->make_perimeters for @{$self->layers};
|
$_->make_perimeters for @{$self->layers};
|
||||||
},
|
},
|
||||||
|
@ -506,7 +505,6 @@ sub infill {
|
||||||
$layerm->fills->append($_) for $self->fill_maker->make_fill($layerm);
|
$layerm->fills->append($_) for $self->fill_maker->make_fill($layerm);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
collect_cb => sub {},
|
|
||||||
no_threads_cb => sub {
|
no_threads_cb => sub {
|
||||||
foreach my $layerm (map @{$_->regions}, @{$self->layers}) {
|
foreach my $layerm (map @{$_->regions}, @{$self->layers}) {
|
||||||
$layerm->fills->clear;
|
$layerm->fills->clear;
|
||||||
|
|
Loading…
Add table
Reference in a new issue