From 4d44b4a74a2a034e27eb7963d89025b8baeab135 Mon Sep 17 00:00:00 2001
From: Magnus Ulimoen
Date: Sun, 8 Aug 2021 20:28:37 +0000
Subject: [PATCH] Fix output for single/multi backend

---
 multigrid/src/file.rs   | 155 -----------------------
 multigrid/src/main.rs   |   8 +-
 multigrid/src/system.rs | 209 ++++++++++++++++++++++++++++++++++----
 3 files changed, 191 insertions(+), 181 deletions(-)
 delete mode 100644 multigrid/src/file.rs

diff --git a/multigrid/src/file.rs b/multigrid/src/file.rs
deleted file mode 100644
index 6be9569..0000000
--- a/multigrid/src/file.rs
+++ /dev/null
@@ -1,155 +0,0 @@
-use super::*;
-
-pub struct OutputThread {
-    rx: Option<std::sync::mpsc::Receiver<Vec<euler::Field>>>,
-    tx: Option<std::sync::mpsc::SyncSender<(u64, Vec<euler::Field>)>>,
-    thread: Option<std::thread::JoinHandle<()>>,
-}
-
-impl OutputThread {
-    pub fn new(file: File) -> Self {
-        // Pingpong back and forth a number of Vec<euler::Field> to be used for the
-        // output. The sync_channel applies some backpressure
-        let (tx_thread, rx) = std::sync::mpsc::channel::<Vec<euler::Field>>();
-        let (tx, rx_thread) = std::sync::mpsc::sync_channel::<(u64, Vec<euler::Field>)>(3);
-        let thread = std::thread::Builder::new()
-            .name("multigrid_output".to_owned())
-            .spawn(move || {
-                let mut times = Vec::<u64>::new();
-
-                for (ntime, fields) in rx_thread.iter() {
-                    if !times.contains(&ntime) {
-                        file.add_timestep(ntime, fields.as_slice()).unwrap();
-                        times.push(ntime);
-                    }
-                    tx_thread.send(fields).unwrap();
-                }
-            })
-            .unwrap();
-
-        Self {
-            tx: Some(tx),
-            rx: Some(rx),
-            thread: Some(thread),
-        }
-    }
-
-    pub fn add_timestep(&mut self, ntime: u64, fields: &[euler::Field]) {
-        match self.rx.as_ref().unwrap().try_recv() {
-            Ok(mut copy_fields) => {
-                for (from, to) in fields.iter().zip(copy_fields.iter_mut()) {
-                    to.clone_from(&from);
-                }
-                self.tx
-                    .as_ref()
-                    .unwrap()
-                    .send((ntime, copy_fields))
-                    .unwrap();
-            }
-            Err(std::sync::mpsc::TryRecvError::Empty) => {
-                let fields = fields.to_vec();
-                self.tx.as_ref().unwrap().send((ntime, fields)).unwrap();
-            }
-            Err(e) => panic!("{:?}", e),
-        };
-    }
-}
-
-impl Drop for OutputThread {
-    fn drop(&mut self) {
-        let tx = self.tx.take();
-        std::mem::drop(tx);
-        let thread = self.thread.take().unwrap();
-        thread.join().unwrap();
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct File(hdf5::File, Vec<String>);
-
-impl File {
-    pub fn create<P: AsRef<std::path::Path>>(
-        path: P,
-        grids: &[sbp::grid::Grid],
-        names: Vec<String>,
-    ) -> Result<Self, Box<dyn std::error::Error>> {
-        assert_eq!(grids.len(), names.len());
-        let file = hdf5::File::create(path.as_ref())?;
-        let _tds = file
-            .new_dataset::<u64>()
-            .resizable(true)
-            .chunk((1,))
-            .create("t", (0,))?;
-
-        for (name, grid) in names.iter().zip(grids.iter()) {
-            let g = file.create_group(name)?;
-            g.link_soft("/t", "t").unwrap();
-
-            let add_dim = |name| {
-                g.new_dataset::<Float>()
-                    .chunk((grid.ny(), grid.nx()))
-                    .gzip(9)
-                    .create(name, (grid.ny(), grid.nx()))
-            };
-            let xds = add_dim("x")?;
-            xds.write(grid.x())?;
-            let yds = add_dim("y")?;
-            yds.write(grid.y())?;
-
-            let add_var = |name| {
-                g.new_dataset::<Float>()
-                    .gzip(3)
-                    .shuffle(true)
-                    .chunk((1, grid.ny(), grid.nx()))
-                    .resizable(true)
-                    .create(name, (0, grid.ny(), grid.nx()))
-            };
-            add_var("rho")?;
-            add_var("rhou")?;
-            add_var("rhov")?;
-            add_var("e")?;
-        }
-
-        Ok(Self(file, names))
-    }
-
-    pub fn add_timestep(
-        &self,
-        t: u64,
-        fields: &[euler::Field],
-    ) -> Result<(), Box<dyn std::error::Error>> {
-        let file = &self.0;
-        let tds = file.dataset("t")?;
-        let tpos = tds.size();
-        tds.resize((tpos + 1,))?;
-        tds.write_slice(&[t], ndarray::s![tpos..tpos + 1])?;
-
-        for (groupname, fnow) in self.1.iter().zip(fields.iter()) {
-            let g = file.group(groupname)?;
-            let (tpos, ny, nx) = {
-                let ds = g.dataset("rho")?;
-                let shape = ds.shape();
-                (shape[0], shape[1], shape[2])
-            };
-
-            let rhods = g.dataset("rho")?;
-            let rhouds = g.dataset("rhou")?;
-            let rhovds = g.dataset("rhov")?;
-            let eds = g.dataset("e")?;
-
-            let (rho, rhou, rhov, e) = fnow.components();
-            rhods.resize((tpos + 1, ny, nx))?;
-            rhods.write_slice(rho, ndarray::s![tpos, .., ..])?;
-
-            rhouds.resize((tpos + 1, ny, nx))?;
-            rhouds.write_slice(rhou, ndarray::s![tpos, .., ..])?;
-
-            rhovds.resize((tpos + 1, ny, nx))?;
-            rhovds.write_slice(rhov, ndarray::s![tpos, .., ..])?;
-
-            eds.resize((tpos + 1, ny, nx))?;
-            eds.write_slice(e, ndarray::s![tpos, .., ..])?;
-        }
-        Ok(())
-    }
-}
diff --git a/multigrid/src/main.rs b/multigrid/src/main.rs
index ed16796..2f7a718 100644
--- a/multigrid/src/main.rs
+++ b/multigrid/src/main.rs
@@ -1,13 +1,10 @@
 use argh::FromArgs;
-use euler::eval::Evaluator;
 use sbp::*;
 
-mod file;
+mod eval;
 mod input;
 mod parsing;
-use file::*;
-mod eval;
 mod system;
 
 #[derive(Debug, FromArgs)]
@@ -90,13 +87,14 @@ fn main() {
     );
 
     // System::new(grids, grid_connections, operators);
-    let sys = if opt.distribute {
+    let mut sys = if opt.distribute {
         basesystem.create_distributed()
     } else {
        basesystem.create()
    };
 
     let dt = sys.max_dt();
+    sys.set_dt(dt);
     let ntime = (integration_time / dt).round() as u64;
     let steps_between_outputs = if let Some(n) = opt.number_of_outputs {
         std::cmp::max(n / ntime, 1)
diff --git a/multigrid/src/system.rs b/multigrid/src/system.rs
index 8058b4b..cf92604 100644
--- a/multigrid/src/system.rs
+++ b/multigrid/src/system.rs
@@ -80,7 +80,38 @@ impl BaseSystem {
             .map(|(bt, grid)| euler::BoundaryStorage::new(bt, grid))
             .collect();
 
-        let sys = SingleThreadedSystem {
+        let mut outputs = Vec::with_capacity(self.grids.len());
+        for (name, grid) in self.names.iter().zip(&self.grids) {
+            let g = self.output.create_group(name).unwrap();
+            g.link_soft("/t", "t").unwrap();
+
+            let add_dim = |name| {
+                g.new_dataset::<Float>()
+                    .chunk((grid.ny(), grid.nx()))
+                    .gzip(9)
+                    .create(name, (grid.ny(), grid.nx()))
+            };
+            let xds = add_dim("x").unwrap();
+            xds.write(grid.x()).unwrap();
+            let yds = add_dim("y").unwrap();
+            yds.write(grid.y()).unwrap();
+
+            let add_var = |name| {
+                g.new_dataset::<Float>()
+                    .gzip(3)
+                    .shuffle(true)
+                    .chunk((1, grid.ny(), grid.nx()))
+                    .resizable(true)
+                    .create(name, (0, grid.ny(), grid.nx()))
+            };
+            add_var("rho").unwrap();
+            add_var("rhou").unwrap();
+            add_var("rhov").unwrap();
+            add_var("e").unwrap();
+            outputs.push(g);
+        }
+
+        let mut sys = SingleThreadedSystem {
             fnow,
             fnext,
             k,
@@ -90,8 +121,9 @@ impl BaseSystem {
             bt: self.boundary_conditions,
             eb,
             time: self.time,
+            dt: Float::NAN,
             operators: self.operators,
-            output: todo!(),
+            output: (self.output, outputs),
         };
         match &self.initial_conditions {
             /*
@@ -120,7 +152,6 @@ impl BaseSystem {
     /// Spreads the computation over n threads, in a thread per grid way.
     pub fn create_distributed(self) -> System {
         let nthreads = self.grids.len();
-        // let dt = self.max_dt();
 
         // Build up the boundary conditions
         let mut push_channels: Vec<Direction<Option<Sender<Array2<Float>>>>> =
@@ -215,6 +246,8 @@ impl BaseSystem {
             communicators.push(t_send);
 
             let time = self.time;
+            let output = self.output.clone();
+
             tids.push(
                 builder
                     .spawn(move || {
@@ -231,19 +264,48 @@ impl BaseSystem {
 
                         let wb = WorkBuffers::new(ny, nx);
 
+                        let g = {
+                            let g = output.create_group(&name).unwrap();
+                            g.link_soft("/t", "t").unwrap();
+                            let add_dim = |name| {
+                                g.new_dataset::<Float>()
+                                    .chunk((grid.ny(), grid.nx()))
+                                    .gzip(9)
+                                    .create(name, (grid.ny(), grid.nx()))
+                            };
+                            let xds = add_dim("x").unwrap();
+                            xds.write(grid.x()).unwrap();
+                            let yds = add_dim("y").unwrap();
+                            yds.write(grid.y()).unwrap();
+
+                            let add_var = |name| {
+                                g.new_dataset::<Float>()
+                                    .gzip(3)
+                                    .shuffle(true)
+                                    .chunk((1, grid.ny(), grid.nx()))
+                                    .resizable(true)
+                                    .create(name, (0, grid.ny(), grid.nx()))
+                            };
+                            add_var("rho").unwrap();
+                            add_var("rhou").unwrap();
+                            add_var("rhov").unwrap();
+                            add_var("e").unwrap();
+                            g
+                        };
+
                         let mut sys = DistributedSystemPart {
                             current,
                             fut,
                             k,
                             boundary_conditions,
                             grid: (grid, metrics),
-                            output: todo!(),
+                            output: g,
                             push,
                             sbp,
                             t: time,
                             dt: Float::NAN,
-                            name,
+                            _name: name,
                             id,
 
                             recv: t_recv,
@@ -266,6 +328,7 @@ impl BaseSystem {
             sys: tids,
             recv: master_recv,
             send: communicators,
+            output: self.output,
         })
     }
 }
 
@@ -277,13 +340,47 @@ pub enum System {
 
 impl System {
     pub fn max_dt(&self) -> Float {
-        todo!()
+        match self {
+            Self::SingleThreaded(sys) => sys.max_dt(),
+            Self::MultiThreaded(sys) => {
+                for send in &sys.send {
+                    send.send(MsgFromHost::DtRequest).unwrap();
+                }
+                let mut max_dt = Float::MAX;
+                let mut to_receive = sys.sys.len();
+                while to_receive != 0 {
+                    let dt = match sys.recv.recv().unwrap() {
+                        (_, MsgToHost::MaxDt(dt)) => dt,
+                        _ => unreachable!(),
+                    };
+                    max_dt = max_dt.min(dt);
+                    to_receive -= 1;
+                }
+                max_dt
+            }
+        }
     }
-    pub fn advance(&self, nsteps: u64) {
-        todo!()
+    pub fn set_dt(&mut self, dt: Float) {
+        match self {
+            Self::SingleThreaded(sys) => sys.dt = dt,
+            Self::MultiThreaded(sys) => {
+                for tid in &sys.send {
+                    tid.send(MsgFromHost::DtSet(dt)).unwrap()
+                }
+            }
+        }
+    }
+    pub fn advance(&mut self, nsteps: u64) {
+        match self {
+            Self::SingleThreaded(sys) => sys.advance(nsteps),
+            Self::MultiThreaded(sys) => sys.advance(nsteps),
+        }
     }
     pub fn output(&self, ntime: u64) {
-        todo!()
+        match self {
+            Self::SingleThreaded(sys) => sys.output(ntime),
+            Self::MultiThreaded(sys) => sys.output(ntime),
+        }
     }
 }
 
@@ -297,8 +394,9 @@ pub struct SingleThreadedSystem {
     pub bt: Vec<euler::BoundaryCharacteristics>,
     pub eb: Vec<euler::BoundaryStorage>,
     pub time: Float,
+    pub dt: Float,
     pub operators: Vec<Box<dyn SbpOperator2d>>,
-    pub output: Vec<hdf5::Dataset>,
+    pub output: (hdf5::File, Vec<hdf5::Group>),
 }
 
 impl integrate::Integrable for SingleThreadedSystem {
@@ -319,7 +417,13 @@ impl SingleThreadedSystem {
         }
     }
 
-    pub fn advance(&mut self, dt: Float) {
+    pub fn advance(&mut self, nsteps: u64) {
+        for _ in 0..nsteps {
+            self.advance_single_step(self.dt)
+        }
+    }
+
+    pub fn advance_single_step(&mut self, dt: Float) {
         let metrics = &self.metrics;
         let grids = &self.grids;
         let bt = &self.bt;
@@ -409,6 +513,34 @@ impl SingleThreadedSystem {
 
         max_dt
     }
+
+    pub fn output(&self, ntime: u64) {
+        let tds = self.output.0.dataset("t").unwrap();
+        let tpos = tds.size();
+        tds.resize((tpos + 1,)).unwrap();
+        tds.write_slice(&[ntime], ndarray::s![tpos..tpos + 1])
+            .unwrap();
+        for (group, fnow) in self.output.1.iter().zip(&self.fnow) {
+            let (ny, nx) = (fnow.ny(), fnow.nx());
+            let rhods = group.dataset("rho").unwrap();
+            let rhouds = group.dataset("rhou").unwrap();
+            let rhovds = group.dataset("rhov").unwrap();
+            let eds = group.dataset("e").unwrap();
+
+            let (rho, rhou, rhov, e) = fnow.components();
+            rhods.resize((tpos + 1, ny, nx)).unwrap();
+            rhods.write_slice(rho, ndarray::s![tpos, .., ..]).unwrap();
+
+            rhouds.resize((tpos + 1, ny, nx)).unwrap();
+            rhouds.write_slice(rhou, ndarray::s![tpos, .., ..]).unwrap();
+
+            rhovds.resize((tpos + 1, ny, nx)).unwrap();
+            rhovds.write_slice(rhov, ndarray::s![tpos, .., ..]).unwrap();
+
+            eds.resize((tpos + 1, ny, nx)).unwrap();
+            eds.write_slice(e, ndarray::s![tpos, .., ..]).unwrap();
+        }
+    }
 }
 
 pub struct DistributedSystem {
@@ -416,23 +548,31 @@ pub struct DistributedSystem {
     send: Vec<Sender<MsgFromHost>>,
     /// All threads should be joined to mark the end of the computation
     sys: Vec<std::thread::JoinHandle<()>>,
+    output: hdf5::File,
 }
 
 impl DistributedSystem {
     pub fn advance(&mut self, ntime: u64) {
         for tid in &self.send {
-            tid.send(MsgFromHost::Advance(ntime));
+            tid.send(MsgFromHost::Advance(ntime)).unwrap();
         }
     }
-    pub fn output(&self) {
-        todo!()
+    pub fn output(&self, ntime: u64) {
+        for tid in &self.send {
+            tid.send(MsgFromHost::Output(ntime)).unwrap();
+        }
+        let tds = self.output.dataset("t").unwrap();
+        let tpos = tds.size();
+        tds.resize((tpos + 1,)).unwrap();
+        tds.write_slice(&[ntime], ndarray::s![tpos..tpos + 1])
+            .unwrap();
     }
 }
 
 impl Drop for DistributedSystem {
     fn drop(&mut self) {
         for tid in &self.send {
-            tid.send(MsgFromHost::Stop);
+            tid.send(MsgFromHost::Stop).unwrap();
         }
         let handles = std::mem::take(&mut self.sys);
         for tid in handles {
@@ -443,7 +583,8 @@ impl Drop for DistributedSystem {
 
 enum MsgFromHost {
     Advance(u64),
-    MaxDt(Float),
+    DtRequest,
+    DtSet(Float),
     Output(u64),
     Stop,
 }
@@ -480,12 +621,12 @@ struct DistributedSystemPart {
     t: Float,
     dt: Float,
 
-    name: String,
+    _name: String,
     id: usize,
 
     recv: Receiver<MsgFromHost>,
     send: Sender<(usize, MsgToHost)>,
 
-    output: hdf5::Dataset,
+    output: hdf5::Group,
     k: [Diff; 4],
     wb: WorkBuffers,
@@ -499,16 +640,42 @@ impl DistributedSystemPart {
     fn run(&mut self) {
         loop {
             match self.recv.recv().unwrap() {
-                MsgFromHost::MaxDt(dt) => self.dt = dt,
+                MsgFromHost::DtSet(dt) => self.dt = dt,
+                MsgFromHost::DtRequest => todo!(),
                 MsgFromHost::Advance(ntime) => self.advance(ntime),
-                MsgFromHost::Output(ntime) => todo!(),
+                MsgFromHost::Output(ntime) => self.output(ntime),
                 MsgFromHost::Stop => return,
             }
         }
     }
+
+    fn output(&mut self, _ntime: u64) {
+        let (ny, nx) = (self.current.ny(), self.current.nx());
+        let rhods = self.output.dataset("rho").unwrap();
+        let rhouds = self.output.dataset("rhou").unwrap();
+        let rhovds = self.output.dataset("rhov").unwrap();
+        let eds = self.output.dataset("e").unwrap();
+
+        let (rho, rhou, rhov, e) = self.current.components();
+        let tpos = rhods.size() / (ny * nx);
+        rhods.resize((tpos + 1, ny, nx)).unwrap();
+        rhods.write_slice(rho, ndarray::s![tpos, .., ..]).unwrap();
+
+        rhouds.resize((tpos + 1, ny, nx)).unwrap();
+        rhouds.write_slice(rhou, ndarray::s![tpos, .., ..]).unwrap();
+
+        rhovds.resize((tpos + 1, ny, nx)).unwrap();
+        rhovds.write_slice(rhov, ndarray::s![tpos, .., ..]).unwrap();
+
+        eds.resize((tpos + 1, ny, nx)).unwrap();
+        eds.write_slice(e, ndarray::s![tpos, .., ..]).unwrap();
+    }
+
     fn advance(&mut self, ntime: u64) {
         for ntime in 0..ntime {
-            self.send.send((self.id, MsgToHost::CurrentTimestep(ntime)));
+            self.send
+                .send((self.id, MsgToHost::CurrentTimestep(ntime)))
+                .unwrap();
             let metrics = &self.grid.1;
             let wb = &mut self.wb.0;
             let sbp = &self.sbp;