Add synchronisation of multi-threaded system

This commit is contained in:
Magnus Ulimoen 2021-08-18 11:45:55 +00:00
parent 4b6ee18491
commit 4f0af1f6c1
3 changed files with 37 additions and 13 deletions

View File

@ -19,3 +19,4 @@ indexmap = { version = "1.5.2", features = ["serde-1"] }
argh = "0.1.4" argh = "0.1.4"
evalexpr = "6.3.0" evalexpr = "6.3.0"
crossbeam-channel = "0.5.0" crossbeam-channel = "0.5.0"
crossbeam-utils = "0.8.5"

View File

@ -108,6 +108,9 @@ fn main() {
} }
let timer = if opt.timings { let timer = if opt.timings {
if let system::System::MultiThreaded(sys) = &sys {
sys.synchronise()
}
Some(std::time::Instant::now()) Some(std::time::Instant::now())
} else { } else {
None None
@ -119,25 +122,30 @@ fn main() {
sys.advance(nexttime - itime); sys.advance(nexttime - itime);
itime = nexttime; itime = nexttime;
sys.output(itime); if itime != ntime {
sys.output(itime);
}
} }
let timer = timer.map(|timer| {
if let system::System::MultiThreaded(sys) = &sys {
sys.synchronise();
}
timer.elapsed()
});
sys.output(ntime);
let mut outinfo = OutputInformation {
filename: opt.output,
time_elapsed: timer,
..Default::default()
};
if !opt.no_progressbar { if !opt.no_progressbar {
sys.finish_progressbar(); sys.finish_progressbar();
} }
let mut outinfo = OutputInformation {
filename: opt.output,
..Default::default()
};
if let Some(timer) = timer {
let duration = timer.elapsed();
outinfo.time_elapsed = Some(duration);
}
//output.add_timestep(ntime, &sys.fnow);
if opt.error { if opt.error {
outinfo.error = Some(sys.error()) outinfo.error = Some(sys.error())
} }

View File

@ -415,6 +415,7 @@ impl System {
for tid in &sys.send { for tid in &sys.send {
tid.send(MsgFromHost::ProgressbarDrop).unwrap(); tid.send(MsgFromHost::ProgressbarDrop).unwrap();
} }
sys.synchronise();
let target = sys.progressbar.take().unwrap(); let target = sys.progressbar.take().unwrap();
target.clear().unwrap(); target.clear().unwrap();
} }
@ -658,6 +659,17 @@ impl DistributedSystem {
target.set_move_cursor(true); target.set_move_cursor(true);
self.progressbar = Some(target); self.progressbar = Some(target);
} }
fn send_barrier(&self, barrier: &crossbeam_utils::sync::WaitGroup) {
for tid in &self.send {
tid.send(MsgFromHost::Barrier(barrier.clone())).unwrap()
}
}
pub fn synchronise(&self) {
// Syncronise before starting the timer
let barrier = crossbeam_utils::sync::WaitGroup::new();
self.send_barrier(&barrier);
barrier.wait();
}
} }
impl Drop for DistributedSystem { impl Drop for DistributedSystem {
@ -687,6 +699,8 @@ enum MsgFromHost {
Stop, Stop,
/// Request the current error /// Request the current error
Error, Error,
/// A barrier that must be waited on
Barrier(crossbeam_utils::sync::WaitGroup),
/// Progressbar to report progress /// Progressbar to report progress
Progressbar(indicatif::ProgressBar), Progressbar(indicatif::ProgressBar),
/// Clear and remove the progressbar /// Clear and remove the progressbar
@ -760,6 +774,7 @@ impl DistributedSystemPart {
MsgFromHost::Output(ntime) => self.output(ntime), MsgFromHost::Output(ntime) => self.output(ntime),
MsgFromHost::Stop => return, MsgFromHost::Stop => return,
MsgFromHost::Error => self.send(MsgToHost::Error(self.error())).unwrap(), MsgFromHost::Error => self.send(MsgToHost::Error(self.error())).unwrap(),
MsgFromHost::Barrier(barrier) => barrier.wait(),
MsgFromHost::Progressbar(pbar) => self.progressbar = Some(pbar), MsgFromHost::Progressbar(pbar) => self.progressbar = Some(pbar),
MsgFromHost::ProgressbarDrop => { MsgFromHost::ProgressbarDrop => {
let pb = self.progressbar.take().unwrap(); let pb = self.progressbar.take().unwrap();