diff --git a/sbp/examples/multigrid/bin.rs b/sbp/examples/multigrid/bin.rs index f3c79d3..f8efe92 100644 --- a/sbp/examples/multigrid/bin.rs +++ b/sbp/examples/multigrid/bin.rs @@ -87,89 +87,7 @@ impl System { } } - fn advance(&mut self, dt: Float) { - for i in 0.. { - let time; - let fnext; - match i { - 0 => { - for (prev, fut) in self.fnow.iter().zip(self.fnext.iter_mut()) { - fut.assign(prev); - } - fnext = &mut self.k[i]; - time = self.time; - } - 1 | 2 => { - for ((prev, fut), k) in self - .fnow - .iter() - .zip(self.fnext.iter_mut()) - .zip(&self.k[i - 1]) - { - fut.assign(prev); - fut.scaled_add(1.0 / 2.0 * dt, k); - } - fnext = &mut self.k[i]; - time = self.time + dt / 2.0; - } - 3 => { - for ((prev, fut), k) in self - .fnow - .iter() - .zip(self.fnext.iter_mut()) - .zip(&self.k[i - 1]) - { - fut.assign(prev); - fut.scaled_add(dt, k); - } - fnext = &mut self.k[i]; - time = self.time + dt; - } - 4 => { - for (((((prev, fut), k0), k1), k2), k3) in self - .fnow - .iter() - .zip(self.fnext.iter_mut()) - .zip(&self.k[0]) - .zip(&self.k[1]) - .zip(&self.k[2]) - .zip(&self.k[3]) - { - ndarray::Zip::from(&mut **fut) - .and(&**prev) - .and(&**k0) - .and(&**k1) - .and(&**k2) - .and(&**k3) - .apply(|y1, &y0, &k1, &k2, &k3, &k4| { - *y1 = y0 + dt / 6.0 * (k1 + 2.0 * k2 + 2.0 * k3 + k4) - }); - } - std::mem::swap(&mut self.fnext, &mut self.fnow); - self.time += dt; - return; - } - _ => { - unreachable!(); - } - } - - let fields = &self.fnext; - let bt = extract_boundaries(&fields, &mut self.bt, &mut self.eb, &self.grids, time); - - for ((((prev, fut), metrics), wb), bt) in fields - .iter() - .zip(fnext) - .zip(&self.metrics) - .zip(&mut self.wb) - .zip(bt) - { - euler::RHS_upwind(fut, prev, metrics, &bt, wb) - } - } - } - - fn advance_parallel(&mut self, dt: Float, s: &rayon::ThreadPool) { + fn advance(&mut self, dt: Float, s: &rayon::ThreadPool) { for i in 0.. { let time; match i { @@ -404,33 +322,69 @@ fn main() { let ntime = (integration_time / dt).round() as u64; - let pool = if let Some(j) = opt.jobs { + let pool = { let builder = rayon::ThreadPoolBuilder::new(); - let builder = if let Some(j) = j { - builder.num_threads(j) + if let Some(j) = opt.jobs { + if let Some(j) = j { + builder.num_threads(j) + } else { + builder + } } else { - builder - }; - Some(builder.build().unwrap()) - } else { - None + builder.num_threads(1) + } + .build() + .unwrap() }; let output = File::create(&opt.output, sys.grids.as_slice()).unwrap(); - output.add_timestep(0, sys.fnow.as_slice()).unwrap(); + + // Pingpong back and forth a number of Vec to be used for the + // output. The sync_channel applies some backpressure + let (tx_thread, rx) = std::sync::mpsc::channel::>(); + let (tx, rx_thread) = std::sync::mpsc::sync_channel::<(u64, Vec)>(3); + let outputthread = std::thread::Builder::new() + .name("multigrid_output".to_owned()) + .spawn(move || { + let mut times = Vec::::new(); + + for (ntime, fields) in rx_thread.iter() { + if !times.contains(&ntime) { + output.add_timestep(ntime, fields.as_slice()).unwrap(); + times.push(ntime); + } + tx_thread.send(fields).unwrap(); + } + }) + .unwrap(); + + let output = |ntime: u64, nowfield: &[euler::Field]| match rx.try_recv() { + Ok(mut fields) => { + for (from, to) in nowfield.iter().zip(fields.iter_mut()) { + to.assign(&from); + } + tx.send((ntime, fields)).unwrap(); + } + Err(std::sync::mpsc::TryRecvError::Empty) => { + let fields = nowfield.to_vec(); + tx.send((ntime, fields)).unwrap(); + } + Err(e) => panic!("{:?}", e), + }; + + output(0, &sys.fnow); let bar = progressbar(opt.no_progressbar, ntime); for _ in 0..ntime { bar.inc(1); - if let Some(pool) = pool.as_ref() { - sys.advance_parallel(dt, &pool); - } else { - sys.advance(dt); - } + sys.advance(dt, &pool); } bar.finish(); - output.add_timestep(ntime, sys.fnow.as_slice()).unwrap(); + output(ntime, &sys.fnow); + + std::mem::drop(tx); + outputthread.join().unwrap(); } fn progressbar(dummy: bool, ntime: u64) -> indicatif::ProgressBar {