From fe2621df115b16dc5ea42481a716005b03005d07 Mon Sep 17 00:00:00 2001 From: Magnus Ulimoen Date: Mon, 13 Apr 2020 13:31:01 +0200 Subject: [PATCH] move file handling to separate module --- multigrid/src/file.rs | 153 +++++++++++++++++++++++++++++++++++++++++ multigrid/src/main.rs | 155 +----------------------------------------- 2 files changed, 156 insertions(+), 152 deletions(-) create mode 100644 multigrid/src/file.rs diff --git a/multigrid/src/file.rs b/multigrid/src/file.rs new file mode 100644 index 0000000..7aee9e9 --- /dev/null +++ b/multigrid/src/file.rs @@ -0,0 +1,153 @@ +use super::*; + +pub struct OutputThread { + rx: Option>>, + tx: Option)>>, + thread: Option>, +} + +impl OutputThread { + pub fn new(file: File) -> Self { + // Pingpong back and forth a number of Vec to be used for the + // output. The sync_channel applies some backpressure + let (tx_thread, rx) = std::sync::mpsc::channel::>(); + let (tx, rx_thread) = std::sync::mpsc::sync_channel::<(u64, Vec)>(3); + let thread = std::thread::Builder::new() + .name("multigrid_output".to_owned()) + .spawn(move || { + let mut times = Vec::::new(); + + for (ntime, fields) in rx_thread.iter() { + if !times.contains(&ntime) { + file.add_timestep(ntime, fields.as_slice()).unwrap(); + times.push(ntime); + } + tx_thread.send(fields).unwrap(); + } + }) + .unwrap(); + + Self { + tx: Some(tx), + rx: Some(rx), + thread: Some(thread), + } + } + + pub fn add_timestep(&mut self, ntime: u64, fields: &[euler::Field]) { + match self.rx.as_ref().unwrap().try_recv() { + Ok(mut copy_fields) => { + for (from, to) in fields.iter().zip(copy_fields.iter_mut()) { + to.assign(&from); + } + self.tx + .as_ref() + .unwrap() + .send((ntime, copy_fields)) + .unwrap(); + } + Err(std::sync::mpsc::TryRecvError::Empty) => { + let fields = fields.to_vec(); + self.tx.as_ref().unwrap().send((ntime, fields)).unwrap(); + } + Err(e) => panic!("{:?}", e), + }; + } +} + +impl Drop for OutputThread { + fn drop(&mut self) { + let tx = self.tx.take(); + std::mem::drop(tx); + let thread = self.thread.take().unwrap(); + thread.join().unwrap(); + } +} + +#[derive(Debug, Clone)] +pub struct File(hdf5::File); + +impl File { + pub fn create>( + path: P, + grids: &[sbp::grid::Grid], + ) -> Result> { + let file = hdf5::File::create(path.as_ref())?; + let _tds = file + .new_dataset::() + .resizable(true) + .chunk((1,)) + .create("t", (0,))?; + + for (i, grid) in grids.iter().enumerate() { + let g = file.create_group(&i.to_string())?; + g.link_soft("/t", "t").unwrap(); + + let add_dim = |name| { + g.new_dataset::() + .chunk((grid.ny(), grid.nx())) + .gzip(9) + .create(name, (grid.ny(), grid.nx())) + }; + let xds = add_dim("x")?; + xds.write(grid.x())?; + let yds = add_dim("y")?; + yds.write(grid.y())?; + + let add_var = |name| { + g.new_dataset::() + .gzip(3) + .shuffle(true) + .chunk((1, grid.ny(), grid.nx())) + .resizable_idx(&[true, false, false]) + .create(name, (0, grid.ny(), grid.nx())) + }; + add_var("rho")?; + add_var("rhou")?; + add_var("rhov")?; + add_var("e")?; + } + + Ok(Self(file)) + } + + pub fn add_timestep( + &self, + t: u64, + fields: &[euler::Field], + ) -> Result<(), Box> { + let file = &self.0; + let tds = file.dataset("t")?; + let tpos = tds.size(); + tds.resize((tpos + 1,))?; + tds.write_slice(&[t], ndarray::s![tpos..tpos + 1])?; + + for (i, fnow) in fields.iter().enumerate() { + let g = file.group(&i.to_string())?; + let (tpos, ny, nx) = { + let ds = g.dataset("rho")?; + let shape = ds.shape(); + (shape[0], shape[1], shape[2]) + }; + + let rhods = g.dataset("rho")?; + let rhouds = g.dataset("rhou")?; + let rhovds = g.dataset("rhov")?; + let eds = g.dataset("e")?; + + let (rho, rhou, rhov, e) = fnow.components(); + rhods.resize((tpos + 1, ny, nx))?; + rhods.write_slice(rho, ndarray::s![tpos, .., ..])?; + + rhouds.resize((tpos + 1, ny, nx))?; + rhouds.write_slice(rhou, ndarray::s![tpos, .., ..])?; + + rhovds.resize((tpos + 1, ny, nx))?; + rhovds.write_slice(rhov, ndarray::s![tpos, .., ..])?; + + eds.resize((tpos + 1, ny, nx))?; + eds.write_slice(e, ndarray::s![tpos, .., ..])?; + } + Ok(()) + } +} diff --git a/multigrid/src/main.rs b/multigrid/src/main.rs index 30a517d..4f2684f 100644 --- a/multigrid/src/main.rs +++ b/multigrid/src/main.rs @@ -3,6 +3,9 @@ use sbp::utils::json_to_grids; use sbp::*; use structopt::StructOpt; +mod file; +use file::*; + struct System { fnow: Vec, fnext: Vec, @@ -339,155 +342,3 @@ fn progressbar(dummy: bool, ntime: u64) -> indicatif::ProgressBar { ) } } - -struct OutputThread { - rx: Option>>, - tx: Option)>>, - thread: Option>, -} - -impl OutputThread { - fn new(file: File) -> Self { - // Pingpong back and forth a number of Vec to be used for the - // output. The sync_channel applies some backpressure - let (tx_thread, rx) = std::sync::mpsc::channel::>(); - let (tx, rx_thread) = std::sync::mpsc::sync_channel::<(u64, Vec)>(3); - let thread = std::thread::Builder::new() - .name("multigrid_output".to_owned()) - .spawn(move || { - let mut times = Vec::::new(); - - for (ntime, fields) in rx_thread.iter() { - if !times.contains(&ntime) { - file.add_timestep(ntime, fields.as_slice()).unwrap(); - times.push(ntime); - } - tx_thread.send(fields).unwrap(); - } - }) - .unwrap(); - - Self { - tx: Some(tx), - rx: Some(rx), - thread: Some(thread), - } - } - - fn add_timestep(&mut self, ntime: u64, fields: &[euler::Field]) { - match self.rx.as_ref().unwrap().try_recv() { - Ok(mut copy_fields) => { - for (from, to) in fields.iter().zip(copy_fields.iter_mut()) { - to.assign(&from); - } - self.tx - .as_ref() - .unwrap() - .send((ntime, copy_fields)) - .unwrap(); - } - Err(std::sync::mpsc::TryRecvError::Empty) => { - let fields = fields.to_vec(); - self.tx.as_ref().unwrap().send((ntime, fields)).unwrap(); - } - Err(e) => panic!("{:?}", e), - }; - } -} - -impl Drop for OutputThread { - fn drop(&mut self) { - let tx = self.tx.take(); - std::mem::drop(tx); - let thread = self.thread.take().unwrap(); - thread.join().unwrap(); - } -} - -#[derive(Debug, Clone)] -struct File(hdf5::File); - -impl File { - fn create>( - path: P, - grids: &[sbp::grid::Grid], - ) -> Result> { - let file = hdf5::File::create(path.as_ref())?; - let _tds = file - .new_dataset::() - .resizable(true) - .chunk((1,)) - .create("t", (0,))?; - - for (i, grid) in grids.iter().enumerate() { - let g = file.create_group(&i.to_string())?; - g.link_soft("/t", "t").unwrap(); - - let add_dim = |name| { - g.new_dataset::() - .chunk((grid.ny(), grid.nx())) - .gzip(9) - .create(name, (grid.ny(), grid.nx())) - }; - let xds = add_dim("x")?; - xds.write(grid.x())?; - let yds = add_dim("y")?; - yds.write(grid.y())?; - - let add_var = |name| { - g.new_dataset::() - .gzip(3) - .shuffle(true) - .chunk((1, grid.ny(), grid.nx())) - .resizable_idx(&[true, false, false]) - .create(name, (0, grid.ny(), grid.nx())) - }; - add_var("rho")?; - add_var("rhou")?; - add_var("rhov")?; - add_var("e")?; - } - - Ok(Self(file)) - } - - fn add_timestep( - &self, - t: u64, - fields: &[euler::Field], - ) -> Result<(), Box> { - let file = &self.0; - let tds = file.dataset("t")?; - let tpos = tds.size(); - tds.resize((tpos + 1,))?; - tds.write_slice(&[t], ndarray::s![tpos..tpos + 1])?; - - for (i, fnow) in fields.iter().enumerate() { - let g = file.group(&i.to_string())?; - let (tpos, ny, nx) = { - let ds = g.dataset("rho")?; - let shape = ds.shape(); - (shape[0], shape[1], shape[2]) - }; - - let rhods = g.dataset("rho")?; - let rhouds = g.dataset("rhou")?; - let rhovds = g.dataset("rhov")?; - let eds = g.dataset("e")?; - - let (rho, rhou, rhov, e) = fnow.components(); - rhods.resize((tpos + 1, ny, nx))?; - rhods.write_slice(rho, ndarray::s![tpos, .., ..])?; - - rhouds.resize((tpos + 1, ny, nx))?; - rhouds.write_slice(rhou, ndarray::s![tpos, .., ..])?; - - rhovds.resize((tpos + 1, ny, nx))?; - rhovds.write_slice(rhov, ndarray::s![tpos, .., ..])?; - - eds.resize((tpos + 1, ny, nx))?; - eds.write_slice(e, ndarray::s![tpos, .., ..])?; - } - Ok(()) - } -}