diff --git a/euler/src/eval.rs b/euler/src/eval.rs index 5e4c7cc..02344f6 100644 --- a/euler/src/eval.rs +++ b/euler/src/eval.rs @@ -5,6 +5,7 @@ use super::GAMMA; use ndarray::{azip, ArrayView, ArrayViewMut, Dimension}; pub trait Evaluator: Send + Sync { + #[allow(clippy::too_many_arguments)] fn evaluate( &self, t: Float, @@ -55,6 +56,7 @@ pub trait EvaluatorPressure: Send + Sync { rho: ArrayView, out: ArrayViewMut, ); + #[allow(clippy::too_many_arguments)] fn p( &self, t: Float, @@ -70,6 +72,7 @@ pub trait EvaluatorPressure: Send + Sync { impl<'a, D: Dimension, BP: EvaluatorPressure> Evaluator for EvaluatorPressureWrapper<'a, D, BP> { + #[allow(clippy::many_single_char_names)] fn evaluate( &self, t: Float, diff --git a/euler/src/lib.rs b/euler/src/lib.rs index 32e299c..e4cf538 100644 --- a/euler/src/lib.rs +++ b/euler/src/lib.rs @@ -301,22 +301,18 @@ impl Field { pub fn west(&self) -> ArrayView2 { self.slice(s![.., .., 0]) } - #[allow(unused)] - fn north_mut(&mut self) -> ArrayViewMut2 { + pub fn north_mut(&mut self) -> ArrayViewMut2 { let ny = self.ny(); self.slice_mut(s![.., ny - 1, ..]) } - #[allow(unused)] - fn south_mut(&mut self) -> ArrayViewMut2 { + pub fn south_mut(&mut self) -> ArrayViewMut2 { self.slice_mut(s![.., 0, ..]) } - #[allow(unused)] - fn east_mut(&mut self) -> ArrayViewMut2 { + pub fn east_mut(&mut self) -> ArrayViewMut2 { let nx = self.nx(); self.slice_mut(s![.., .., nx - 1]) } - #[allow(unused)] - fn west_mut(&mut self) -> ArrayViewMut2 { + pub fn west_mut(&mut self) -> ArrayViewMut2 { self.slice_mut(s![.., .., 0]) } @@ -330,6 +326,7 @@ impl Field { let (rho, rhou, rhov, e) = self.components_mut(); vortex_param.evaluate(time, x, y, rho, rhou, rhov, e) } + #[allow(clippy::erasing_op, clippy::identity_op)] fn iter(&self) -> impl ExactSizeIterator + '_ { let n = self.nx() * self.ny(); let slice = self.0.as_slice().unwrap(); @@ -462,18 +459,18 @@ impl Diff { pub fn zeros((ny, nx): (usize, usize)) -> Self { Self(Array3::zeros((4, ny, nx))) } - fn 
north_mut(&mut self) -> ArrayViewMut2 { + pub fn north_mut(&mut self) -> ArrayViewMut2 { let ny = self.shape()[1]; self.0.slice_mut(s![.., ny - 1, ..]) } - fn south_mut(&mut self) -> ArrayViewMut2 { + pub fn south_mut(&mut self) -> ArrayViewMut2 { self.0.slice_mut(s![.., 0, ..]) } - fn east_mut(&mut self) -> ArrayViewMut2 { + pub fn east_mut(&mut self) -> ArrayViewMut2 { let nx = self.shape()[2]; self.0.slice_mut(s![.., .., nx - 1]) } - fn west_mut(&mut self) -> ArrayViewMut2 { + pub fn west_mut(&mut self) -> ArrayViewMut2 { self.0.slice_mut(s![.., .., 0]) } } @@ -517,6 +514,59 @@ pub fn RHS_trad( SAT_characteristics(op, k, y, metrics, boundaries); } +#[allow(non_snake_case)] +pub fn RHS_no_SAT( + op: &dyn SbpOperator2d, + k: &mut Diff, + y: &Field, + metrics: &Metrics, + tmp: &mut (Field, Field, Field, Field, Field, Field), +) { + let ehat = &mut tmp.0; + let fhat = &mut tmp.1; + fluxes((ehat, fhat), y, metrics, &mut tmp.2); + let dE = &mut tmp.2; + let dF = &mut tmp.3; + + op.diffxi(ehat.rho(), dE.rho_mut()); + op.diffxi(ehat.rhou(), dE.rhou_mut()); + op.diffxi(ehat.rhov(), dE.rhov_mut()); + op.diffxi(ehat.e(), dE.e_mut()); + + op.diffeta(fhat.rho(), dF.rho_mut()); + op.diffeta(fhat.rhou(), dF.rhou_mut()); + op.diffeta(fhat.rhov(), dF.rhov_mut()); + op.diffeta(fhat.e(), dF.e_mut()); + + if let Some(diss_op) = op.upwind() { + let ad_xi = &mut tmp.4; + let ad_eta = &mut tmp.5; + upwind_dissipation( + &*diss_op, + (ad_xi, ad_eta), + y, + metrics, + (&mut tmp.0, &mut tmp.1), + ); + + azip!((out in &mut k.0, + eflux in &dE.0, + fflux in &dF.0, + ad_xi in &ad_xi.0, + ad_eta in &ad_eta.0, + detj in &metrics.detj().broadcast((4, y.ny(), y.nx())).unwrap()) { + *out = (-eflux - fflux + ad_xi + ad_eta)/detj + }); + } else { + azip!((out in &mut k.0, + eflux in &dE.0, + fflux in &dF.0, + detj in &metrics.detj().broadcast((4, y.ny(), y.nx())).unwrap()) { + *out = (-eflux - fflux )/detj + }); + } +} + #[allow(non_snake_case)] pub fn RHS_upwind( op: &dyn SbpOperator2d, @@ 
-949,104 +999,148 @@ fn vortexify( #[allow(non_snake_case)] /// Boundary conditions (SAT) -fn SAT_characteristics( +pub fn SAT_characteristics( op: &dyn SbpOperator2d, k: &mut Diff, y: &Field, metrics: &Metrics, boundaries: &BoundaryTerms, +) { + SAT_north(op, k.north_mut(), y, metrics, boundaries.north); + SAT_south(op, k.south_mut(), y, metrics, boundaries.south); + SAT_east(op, k.east_mut(), y, metrics, boundaries.east); + SAT_west(op, k.west_mut(), y, metrics, boundaries.west); +} + +pub const SAT_FUNCTIONS: Direction< + fn(&dyn SbpOperator2d, ArrayViewMut2, &Field, &Metrics, ArrayView2), +> = Direction { + north: SAT_north, + south: SAT_south, + west: SAT_west, + east: SAT_east, +}; + +#[allow(non_snake_case)] +pub fn SAT_north( + op: &dyn SbpOperator2d, + k: ArrayViewMut2, + y: &Field, + metrics: &Metrics, + boundary: ArrayView2, ) { let ny = y.ny(); + + let hi = if op.is_h2eta() { + (ny - 2) as Float / op.heta()[0] + } else { + (ny - 1) as Float / op.heta()[0] + }; + let sign = -1.0; + let tau = 1.0; + let slice = s![y.ny() - 1, ..]; + SAT_characteristic( + k, + y.north(), + boundary, + hi, + sign, + tau, + metrics.detj().slice(slice), + metrics.detj_deta_dx().slice(slice), + metrics.detj_deta_dy().slice(slice), + ); +} + +#[allow(non_snake_case)] +pub fn SAT_south( + op: &dyn SbpOperator2d, + k: ArrayViewMut2, + y: &Field, + metrics: &Metrics, + boundary: ArrayView2, +) { + let ny = y.ny(); + let hi = if op.is_h2eta() { + (ny - 2) as Float / op.heta()[0] + } else { + (ny - 1) as Float / op.heta()[0] + }; + let sign = 1.0; + let tau = -1.0; + let slice = s![0, ..]; + SAT_characteristic( + k, + y.south(), + boundary, + hi, + sign, + tau, + metrics.detj().slice(slice), + metrics.detj_deta_dx().slice(slice), + metrics.detj_deta_dy().slice(slice), + ); +} + +#[allow(non_snake_case)] +pub fn SAT_west( + op: &dyn SbpOperator2d, + k: ArrayViewMut2, + y: &Field, + metrics: &Metrics, + boundary: ArrayView2, +) { let nx = y.nx(); - // North boundary - { - let hi = if 
op.is_h2eta() { - (ny - 2) as Float / op.heta()[0] - } else { - (ny - 1) as Float / op.heta()[0] - }; - let sign = -1.0; - let tau = 1.0; - let slice = s![y.ny() - 1, ..]; - SAT_characteristic( - k.north_mut(), - y.north(), - boundaries.north, - hi, - sign, - tau, - metrics.detj().slice(slice), - metrics.detj_deta_dx().slice(slice), - metrics.detj_deta_dy().slice(slice), - ); - } - // South boundary - { - let hi = if op.is_h2eta() { - (ny - 2) as Float / op.heta()[0] - } else { - (ny - 1) as Float / op.heta()[0] - }; - let sign = 1.0; - let tau = -1.0; - let slice = s![0, ..]; - SAT_characteristic( - k.south_mut(), - y.south(), - boundaries.south, - hi, - sign, - tau, - metrics.detj().slice(slice), - metrics.detj_deta_dx().slice(slice), - metrics.detj_deta_dy().slice(slice), - ); - } - // West Boundary - { - let hi = if op.is_h2xi() { - (nx - 2) as Float / op.hxi()[0] - } else { - (nx - 1) as Float / op.hxi()[0] - }; - let sign = 1.0; - let tau = -1.0; - let slice = s![.., 0]; - SAT_characteristic( - k.west_mut(), - y.west(), - boundaries.west, - hi, - sign, - tau, - metrics.detj().slice(slice), - metrics.detj_dxi_dx().slice(slice), - metrics.detj_dxi_dy().slice(slice), - ); - } - // East Boundary - { - let hi = if op.is_h2xi() { - (nx - 2) as Float / op.hxi()[0] - } else { - (nx - 1) as Float / op.hxi()[0] - }; - let sign = -1.0; - let tau = 1.0; - let slice = s![.., y.nx() - 1]; - SAT_characteristic( - k.east_mut(), - y.east(), - boundaries.east, - hi, - sign, - tau, - metrics.detj().slice(slice), - metrics.detj_dxi_dx().slice(slice), - metrics.detj_dxi_dy().slice(slice), - ); - } + let hi = if op.is_h2xi() { + (nx - 2) as Float / op.hxi()[0] + } else { + (nx - 1) as Float / op.hxi()[0] + }; + let sign = 1.0; + let tau = -1.0; + let slice = s![.., 0]; + SAT_characteristic( + k, + y.west(), + boundary, + hi, + sign, + tau, + metrics.detj().slice(slice), + metrics.detj_dxi_dx().slice(slice), + metrics.detj_dxi_dy().slice(slice), + ); +} + +#[allow(non_snake_case)] 
+pub fn SAT_east( + op: &dyn SbpOperator2d, + k: ArrayViewMut2, + y: &Field, + metrics: &Metrics, + boundary: ArrayView2, +) { + let nx = y.nx(); + let hi = if op.is_h2xi() { + (nx - 2) as Float / op.hxi()[0] + } else { + (nx - 1) as Float / op.hxi()[0] + }; + let sign = -1.0; + let tau = 1.0; + let slice = s![.., y.nx() - 1]; + SAT_characteristic( + k, + y.east(), + boundary, + hi, + sign, + tau, + metrics.detj().slice(slice), + metrics.detj_dxi_dx().slice(slice), + metrics.detj_dxi_dy().slice(slice), + ); } #[allow(non_snake_case)] diff --git a/multigrid/Cargo.toml b/multigrid/Cargo.toml index 91490c4..01dc0df 100644 --- a/multigrid/Cargo.toml +++ b/multigrid/Cargo.toml @@ -11,10 +11,15 @@ euler = { path = "../euler", features = ["serde1"] } hdf5 = "0.7.0" integrate = { path = "../utils/integrate" } rayon = "1.3.0" -indicatif = "0.15.0" +indicatif = "0.17.0-beta.1" ndarray = { version = "0.14.0", features = ["serde"] } serde = { version = "1.0.115", features = ["derive"] } json5 = "0.3.0" indexmap = { version = "1.5.2", features = ["serde-1"] } argh = "0.1.4" evalexpr = "6.3.0" +crossbeam-channel = "0.5.0" +crossbeam-utils = "0.8.5" +parking_lot = "0.11.1" +lock_api = "0.4.4" +arrayvec = "0.7.1" diff --git a/multigrid/src/eval.rs b/multigrid/src/eval.rs index 8b4b903..52c0857 100644 --- a/multigrid/src/eval.rs +++ b/multigrid/src/eval.rs @@ -10,6 +10,7 @@ pub enum Evaluator { } impl euler::eval::Evaluator for Evaluator { + #[allow(clippy::many_single_char_names)] fn evaluate( &self, t: Float, diff --git a/multigrid/src/eval/evalexpr.rs b/multigrid/src/eval/evalexpr.rs index e999252..ea50b71 100644 --- a/multigrid/src/eval/evalexpr.rs +++ b/multigrid/src/eval/evalexpr.rs @@ -79,6 +79,7 @@ pub struct EvaluatorConservation { } impl euler::eval::Evaluator for Evaluator { + #[allow(clippy::many_single_char_names)] fn evaluate( &self, t: Float, @@ -267,6 +268,7 @@ impl euler::eval::EvaluatorPressure for EvaluatorPressure { }) } + 
#[allow(clippy::many_single_char_names)] fn p( &self, t: Float, diff --git a/multigrid/src/file.rs b/multigrid/src/file.rs deleted file mode 100644 index 6be9569..0000000 --- a/multigrid/src/file.rs +++ /dev/null @@ -1,155 +0,0 @@ -use super::*; - -pub struct OutputThread { - rx: Option>>, - tx: Option)>>, - thread: Option>, -} - -impl OutputThread { - pub fn new(file: File) -> Self { - // Pingpong back and forth a number of Vec to be used for the - // output. The sync_channel applies some backpressure - let (tx_thread, rx) = std::sync::mpsc::channel::>(); - let (tx, rx_thread) = std::sync::mpsc::sync_channel::<(u64, Vec)>(3); - let thread = std::thread::Builder::new() - .name("multigrid_output".to_owned()) - .spawn(move || { - let mut times = Vec::::new(); - - for (ntime, fields) in rx_thread.iter() { - if !times.contains(&ntime) { - file.add_timestep(ntime, fields.as_slice()).unwrap(); - times.push(ntime); - } - tx_thread.send(fields).unwrap(); - } - }) - .unwrap(); - - Self { - tx: Some(tx), - rx: Some(rx), - thread: Some(thread), - } - } - - pub fn add_timestep(&mut self, ntime: u64, fields: &[euler::Field]) { - match self.rx.as_ref().unwrap().try_recv() { - Ok(mut copy_fields) => { - for (from, to) in fields.iter().zip(copy_fields.iter_mut()) { - to.clone_from(&from); - } - self.tx - .as_ref() - .unwrap() - .send((ntime, copy_fields)) - .unwrap(); - } - Err(std::sync::mpsc::TryRecvError::Empty) => { - let fields = fields.to_vec(); - self.tx.as_ref().unwrap().send((ntime, fields)).unwrap(); - } - Err(e) => panic!("{:?}", e), - }; - } -} - -impl Drop for OutputThread { - fn drop(&mut self) { - let tx = self.tx.take(); - std::mem::drop(tx); - let thread = self.thread.take().unwrap(); - thread.join().unwrap(); - } -} - -#[derive(Debug, Clone)] -pub struct File(hdf5::File, Vec); - -impl File { - pub fn create>( - path: P, - grids: &[sbp::grid::Grid], - names: Vec, - ) -> Result> { - assert_eq!(grids.len(), names.len()); - let file = 
hdf5::File::create(path.as_ref())?; - let _tds = file - .new_dataset::() - .resizable(true) - .chunk((1,)) - .create("t", (0,))?; - - for (name, grid) in names.iter().zip(grids.iter()) { - let g = file.create_group(name)?; - g.link_soft("/t", "t").unwrap(); - - let add_dim = |name| { - g.new_dataset::() - .chunk((grid.ny(), grid.nx())) - .gzip(9) - .create(name, (grid.ny(), grid.nx())) - }; - let xds = add_dim("x")?; - xds.write(grid.x())?; - let yds = add_dim("y")?; - yds.write(grid.y())?; - - let add_var = |name| { - g.new_dataset::() - .gzip(3) - .shuffle(true) - .chunk((1, grid.ny(), grid.nx())) - .resizable(true) - .create(name, (0, grid.ny(), grid.nx())) - }; - add_var("rho")?; - add_var("rhou")?; - add_var("rhov")?; - add_var("e")?; - } - - Ok(Self(file, names)) - } - - pub fn add_timestep( - &self, - t: u64, - fields: &[euler::Field], - ) -> Result<(), Box> { - let file = &self.0; - let tds = file.dataset("t")?; - let tpos = tds.size(); - tds.resize((tpos + 1,))?; - tds.write_slice(&[t], ndarray::s![tpos..tpos + 1])?; - - for (groupname, fnow) in self.1.iter().zip(fields.iter()) { - let g = file.group(groupname)?; - let (tpos, ny, nx) = { - let ds = g.dataset("rho")?; - let shape = ds.shape(); - (shape[0], shape[1], shape[2]) - }; - - let rhods = g.dataset("rho")?; - let rhouds = g.dataset("rhou")?; - let rhovds = g.dataset("rhov")?; - let eds = g.dataset("e")?; - - let (rho, rhou, rhov, e) = fnow.components(); - rhods.resize((tpos + 1, ny, nx))?; - rhods.write_slice(rho, ndarray::s![tpos, .., ..])?; - - rhouds.resize((tpos + 1, ny, nx))?; - rhouds.write_slice(rhou, ndarray::s![tpos, .., ..])?; - - rhovds.resize((tpos + 1, ny, nx))?; - rhovds.write_slice(rhov, ndarray::s![tpos, .., ..])?; - - eds.resize((tpos + 1, ny, nx))?; - eds.write_slice(e, ndarray::s![tpos, .., ..])?; - } - Ok(()) - } -} diff --git a/multigrid/src/main.rs b/multigrid/src/main.rs index c3b3894..9b83ecd 100644 --- a/multigrid/src/main.rs +++ b/multigrid/src/main.rs @@ -1,225 +1,17 @@ 
use argh::FromArgs; -use rayon::prelude::*; -use euler::eval::Evaluator; -use sbp::operators::SbpOperator2d; use sbp::*; -mod file; +mod eval; mod input; mod parsing; -use file::*; -mod eval; - -struct System { - fnow: Vec, - fnext: Vec, - wb: Vec, - k: [Vec; 4], - grids: Vec, - metrics: Vec, - bt: Vec, - eb: Vec, - time: Float, - operators: Vec>, -} - -use std::sync::atomic::{AtomicBool, Ordering}; -pub(crate) static MULTITHREAD: AtomicBool = AtomicBool::new(false); - -impl integrate::Integrable for System { - type State = Vec; - type Diff = Vec; - - fn scaled_add(s: &mut Self::State, o: &Self::Diff, scale: Float) { - if MULTITHREAD.load(Ordering::Acquire) { - s.par_iter_mut() - .zip(o.par_iter()) - .for_each(|(s, o)| euler::Field::scaled_add(s, o, scale)) - } else { - s.iter_mut() - .zip(o.iter()) - .for_each(|(s, o)| euler::Field::scaled_add(s, o, scale)) - } - } -} - -impl System { - fn new( - grids: Vec, - bt: Vec, - operators: Vec>, - ) -> Self { - let fnow = grids - .iter() - .map(|g| euler::Field::new(g.ny(), g.nx())) - .collect::>(); - let fnext = fnow.clone(); - let wb = grids - .iter() - .map(|g| euler::WorkBuffers::new(g.ny(), g.nx())) - .collect(); - let k = grids - .iter() - .map(|g| euler::Diff::zeros((g.ny(), g.nx()))) - .collect::>(); - let k = [k.clone(), k.clone(), k.clone(), k]; - let metrics = grids - .iter() - .zip(&operators) - .map(|(g, op)| g.metrics(&**op).unwrap()) - .collect::>(); - - let eb = bt - .iter() - .zip(&grids) - .map(|(bt, grid)| euler::BoundaryStorage::new(bt, grid)) - .collect(); - - Self { - fnow, - fnext, - k, - wb, - grids, - metrics, - bt, - eb, - time: 0.0, - operators, - } - } - - fn vortex(&mut self, t: Float, vortex_params: &euler::VortexParameters) { - for (f, g) in self.fnow.iter_mut().zip(&self.grids) { - f.vortex(g.x(), g.y(), t, &vortex_params); - } - } - - fn advance(&mut self, dt: Float) { - let metrics = &self.metrics; - let grids = &self.grids; - let bt = &self.bt; - let wb = &mut self.wb; - let eb = &mut 
self.eb; - let operators = &self.operators; - - let rhs = move |fut: &mut Vec, prev: &Vec, time: Float| { - let prev_all = &prev; - if MULTITHREAD.load(Ordering::Acquire) { - rayon::scope(|s| { - for (((((((fut, prev), wb), grid), metrics), op), bt), eb) in fut - .iter_mut() - .zip(prev.iter()) - .zip(wb.iter_mut()) - .zip(grids) - .zip(metrics.iter()) - .zip(operators.iter()) - .zip(bt.iter()) - .zip(eb.iter_mut()) - { - s.spawn(move |_| { - let bc = euler::boundary_extracts(prev_all, bt, prev, grid, eb, time); - if op.upwind().is_some() { - euler::RHS_upwind(&**op, fut, prev, metrics, &bc, &mut wb.0); - } else { - euler::RHS_trad(&**op, fut, prev, metrics, &bc, &mut wb.0); - } - }) - } - }); - } else { - for (((((((fut, prev), wb), grid), metrics), op), bt), eb) in fut - .iter_mut() - .zip(prev.iter()) - .zip(wb.iter_mut()) - .zip(grids) - .zip(metrics.iter()) - .zip(operators.iter()) - .zip(bt.iter()) - .zip(eb.iter_mut()) - { - let bc = euler::boundary_extracts(prev_all, bt, prev, grid, eb, time); - if op.upwind().is_some() { - euler::RHS_upwind(&**op, fut, prev, metrics, &bc, &mut wb.0); - } else { - euler::RHS_trad(&**op, fut, prev, metrics, &bc, &mut wb.0); - } - } - } - }; - - integrate::integrate::( - rhs, - &self.fnow, - &mut self.fnext, - &mut self.time, - dt, - &mut self.k, - ); - - std::mem::swap(&mut self.fnow, &mut self.fnext); - } - - /// Suggested maximum dt for this problem - fn max_dt(&self) -> Float { - let is_h2 = self - .operators - .iter() - .any(|op| op.is_h2xi() || op.is_h2eta()); - let c_max = if is_h2 { 0.5 } else { 1.0 }; - let mut max_dt: Float = Float::INFINITY; - - for (field, metrics) in self.fnow.iter().zip(self.metrics.iter()) { - let nx = field.nx(); - let ny = field.ny(); - - let rho = field.rho(); - let rhou = field.rhou(); - let rhov = field.rhov(); - - let mut max_u: Float = 0.0; - let mut max_v: Float = 0.0; - - for ((((((rho, rhou), rhov), detj_dxi_dx), detj_dxi_dy), detj_deta_dx), detj_deta_dy) in - rho.iter() - 
.zip(rhou.iter()) - .zip(rhov.iter()) - .zip(metrics.detj_dxi_dx()) - .zip(metrics.detj_dxi_dy()) - .zip(metrics.detj_deta_dx()) - .zip(metrics.detj_deta_dy()) - { - let u = rhou / rho; - let v = rhov / rho; - - let uhat: Float = detj_dxi_dx * u + detj_dxi_dy * v; - let vhat: Float = detj_deta_dx * u + detj_deta_dy * v; - - max_u = max_u.max(uhat.abs()); - max_v = max_v.max(vhat.abs()); - } - - let dx = 1.0 / nx as Float; - let dy = 1.0 / ny as Float; - - let c_dt = Float::max(max_u / dx, max_v / dy); - - max_dt = Float::min(max_dt, c_max / c_dt); - } - - max_dt - } -} +mod system; #[derive(Debug, FromArgs)] /// Options for configuring and running the solver struct CliOptions { #[argh(positional)] json: std::path::PathBuf, - /// number of simultaneous threads - #[argh(option, short = 'j')] - jobs: Option, /// name of output file #[argh( option, @@ -243,6 +35,9 @@ struct CliOptions { /// in json format #[argh(switch)] output_json: bool, + /// distribute the computation on multiple threads + #[argh(switch)] + distribute: bool, } #[derive(Default, serde::Serialize)] @@ -272,116 +67,87 @@ fn main() { return; } }; - let parsing::RuntimeConfiguration { names, grids, - grid_connections, + boundary_conditions, op: operators, integration_time, initial_conditions, - boundary_conditions: _, } = config.into_runtime(); - let mut sys = System::new(grids, grid_connections, operators); - match &initial_conditions { - /* - parsing::InitialConditions::File(f) => { - for grid in &sys.grids { - // Copy initial conditions from file, requires name of field - todo!() - } - } - */ - parsing::InitialConditions::Vortex(vortexparams) => sys.vortex(0.0, &vortexparams), - parsing::InitialConditions::Expressions(expr) => { - let t = 0.0; - for (grid, field) in sys.grids.iter().zip(sys.fnow.iter_mut()) { - // Evaluate the expressions on all variables - let x = grid.x(); - let y = grid.y(); - let (rho, rhou, rhov, e) = field.components_mut(); - (*expr).evaluate(t, x, y, rho, rhou, rhov, e); - } - 
} - } + let basesystem = system::BaseSystem::new( + names, + grids, + 0.0, + operators, + boundary_conditions, + initial_conditions, + opt.output.clone(), + ); - let dt = sys.max_dt(); - - let ntime = (integration_time / dt).round() as u64; - - { - let nthreads = opt.jobs.unwrap_or(1); - if nthreads > 1 { - MULTITHREAD.store(true, Ordering::Release); - rayon::ThreadPoolBuilder::new() - .num_threads(nthreads) - .build_global() - .unwrap(); - } - } - - let should_output = |itime| { - opt.number_of_outputs.map_or(false, |num_out| { - if num_out == 0 { - false - } else { - itime % (std::cmp::max(ntime / (num_out - 1), 1)) == 0 - } - }) + let mut sys = if opt.distribute { + basesystem.create_distributed() + } else { + basesystem.create() }; - let output = File::create(&opt.output, sys.grids.as_slice(), names).unwrap(); - let mut output = OutputThread::new(output); - output.add_timestep(0, &sys.fnow); + let dt = sys.max_dt(); + sys.set_dt(dt); + let ntime = (integration_time / dt).round() as u64; + let steps_between_outputs = if let Some(n) = opt.number_of_outputs { + std::cmp::max(ntime / n.max(1), 1) + } else { + ntime + }; - let progressbar = progressbar(opt.no_progressbar, ntime); + sys.output(0); + + if !opt.no_progressbar { + sys.add_progressbar(ntime) + } let timer = if opt.timings { + if let system::System::MultiThreaded(sys) = &sys { + sys.synchronise() + } Some(std::time::Instant::now()) } else { None }; - for itime in 0..ntime { - if should_output(itime) { - output.add_timestep(itime, &sys.fnow); + let mut itime = 0; + while itime < ntime { + let nexttime = (itime + steps_between_outputs).min(ntime); + sys.advance(nexttime - itime); + + itime = nexttime; + if itime != ntime { + sys.output(itime); } - progressbar.inc(1); - sys.advance(dt); } - progressbar.finish_and_clear(); + + let timer = timer.map(|timer| { + if let system::System::MultiThreaded(sys) = &sys { + sys.synchronise(); + } + + timer.elapsed() + }); + sys.output(ntime); let mut outinfo = OutputInformation
{ filename: opt.output, + time_elapsed: timer, ..Default::default() }; - if let Some(timer) = timer { - let duration = timer.elapsed(); - outinfo.time_elapsed = Some(duration); + if !opt.no_progressbar { + sys.finish_progressbar(); } - output.add_timestep(ntime, &sys.fnow); - if opt.error { - let time = ntime as Float * dt; - let mut e = 0.0; - for ((fmod, grid), op) in sys.fnow.iter().zip(&sys.grids).zip(&sys.operators) { - let mut fvort = fmod.clone(); - match &initial_conditions { - parsing::InitialConditions::Vortex(vortexparams) => { - fvort.vortex(grid.x(), grid.y(), time, &vortexparams); - } - parsing::InitialConditions::Expressions(expr) => { - let (rho, rhou, rhov, e) = fvort.components_mut(); - expr.as_ref() - .evaluate(time, grid.x(), grid.y(), rho, rhou, rhov, e) - } - } - e += fmod.h2_err(&fvort, &**op); - } - outinfo.error = Some(e); + outinfo.error = Some(sys.error()) } if opt.output_json { @@ -396,14 +162,10 @@ fn main() { } } -fn progressbar(dummy: bool, ntime: u64) -> indicatif::ProgressBar { - if dummy { - indicatif::ProgressBar::hidden() - } else { - let progressbar = indicatif::ProgressBar::new(ntime); - progressbar.with_style( - indicatif::ProgressStyle::default_bar() - .template("{wide_bar:.cyan/blue} {pos}/{len} ({eta})"), - ) - } +fn progressbar(ntime: u64) -> indicatif::ProgressBar { + let progressbar = indicatif::ProgressBar::new(ntime); + progressbar.with_style( + indicatif::ProgressStyle::default_bar() + .template("{wide_bar:.cyan/blue} {pos}/{len} ({eta})"), + ) } diff --git a/multigrid/src/parsing.rs b/multigrid/src/parsing.rs index f0d5c14..035ff70 100644 --- a/multigrid/src/parsing.rs +++ b/multigrid/src/parsing.rs @@ -27,7 +27,7 @@ pub enum InitialConditions { } #[derive(Clone, Debug)] -pub enum BoundaryConditions { +enum BoundaryConditions { Vortex(euler::VortexParameters), Expressions(std::sync::Arc), NotNeeded, @@ -36,11 +36,10 @@ pub enum BoundaryConditions { pub struct RuntimeConfiguration { pub names: Vec, pub grids: Vec, - 
pub grid_connections: Vec, + pub boundary_conditions: Vec, pub op: Vec>, pub integration_time: Float, pub initial_conditions: InitialConditions, - pub boundary_conditions: BoundaryConditions, } impl input::Configuration { @@ -165,11 +164,10 @@ impl input::Configuration { Box::new((matcher(eta), matcher(xi))) as Box }) .collect(); - let grid_connections = self + let boundary_conditions = self .grids .iter() - .enumerate() - .map(|(i, (name, g))| { + .map(|(name, g)| { let default_bc = default.boundary_conditions.clone().unwrap_or_default(); use input::BoundaryType; g.boundary_conditions @@ -190,7 +188,7 @@ impl input::Configuration { name ), }, - Some(BoundaryType::This) => euler::BoundaryCharacteristic::Grid(i), + Some(BoundaryType::This) => euler::BoundaryCharacteristic::This, Some(BoundaryType::Vortex) => euler::BoundaryCharacteristic::Vortex( if let BoundaryConditions::Vortex(vortex) = &boundary_conditions { vortex.clone() @@ -227,11 +225,10 @@ impl input::Configuration { RuntimeConfiguration { names, grids, - grid_connections, + boundary_conditions, op, integration_time: self.integration_time, initial_conditions, - boundary_conditions, } } } diff --git a/multigrid/src/system.rs b/multigrid/src/system.rs new file mode 100644 index 0000000..d81fa06 --- /dev/null +++ b/multigrid/src/system.rs @@ -0,0 +1,1192 @@ +use crate::parsing; +use crate::utils::Direction; +use arrayvec::ArrayVec; +use core::ops::Deref; +use crossbeam_channel::{Receiver, Sender}; +use euler::{ + eval::{self, Evaluator}, + Diff, Field, VortexParameters, WorkBuffers, +}; +use ndarray::Array2; +use parking_lot::{Condvar, Mutex}; +use sbp::grid::{Grid, Metrics}; +use sbp::operators::{InterpolationOperator, SbpOperator2d}; +use sbp::*; +use std::sync::Arc; + +pub struct BaseSystem { + pub names: Vec, + pub grids: Vec, + pub time: Float, + pub boundary_conditions: Vec, + pub initial_conditions: crate::parsing::InitialConditions, + pub operators: Vec>, + pub output: hdf5::File, +} + +impl BaseSystem 
{ + pub fn new( + names: Vec, + grids: Vec, + time: Float, + operators: Vec>, + boundary_conditions: Vec, + initial_conditions: crate::parsing::InitialConditions, + output: std::path::PathBuf, + ) -> Self { + let output = hdf5::File::create(output).unwrap(); + output + .new_dataset::() + .resizable(true) + .chunk((1,)) + .create("t", (0,)) + .unwrap(); + Self { + names, + grids, + time, + boundary_conditions, + initial_conditions, + operators, + output, + } + } + #[allow(clippy::many_single_char_names)] + pub fn create(self) -> System { + let fnow = self + .grids + .iter() + .map(|g| euler::Field::new(g.ny(), g.nx())) + .collect::>(); + let fnext = fnow.clone(); + let wb = self + .grids + .iter() + .map(|g| euler::WorkBuffers::new(g.ny(), g.nx())) + .collect(); + let k = self + .grids + .iter() + .map(|g| euler::Diff::zeros((g.ny(), g.nx()))) + .collect::>(); + let k = [k.clone(), k.clone(), k.clone(), k]; + let metrics = self + .grids + .iter() + .zip(&self.operators) + .map(|(g, op)| g.metrics(&**op).unwrap()) + .collect::>(); + + let eb = self + .boundary_conditions + .iter() + .zip(&self.grids) + .map(|(bt, grid)| euler::BoundaryStorage::new(bt, grid)) + .collect(); + + let mut outputs = Vec::with_capacity(self.grids.len()); + for (name, grid) in self.names.iter().zip(&self.grids) { + let g = self.output.create_group(name).unwrap(); + g.link_soft("/t", "t").unwrap(); + + let add_dim = |name| { + g.new_dataset::() + .chunk((grid.ny(), grid.nx())) + .gzip(9) + .create(name, (grid.ny(), grid.nx())) + }; + let xds = add_dim("x").unwrap(); + xds.write(grid.x()).unwrap(); + let yds = add_dim("y").unwrap(); + yds.write(grid.y()).unwrap(); + + let add_var = |name| { + g.new_dataset::() + .gzip(3) + .shuffle(true) + .chunk((1, grid.ny(), grid.nx())) + .resizable(true) + .create(name, (0, grid.ny(), grid.nx())) + }; + add_var("rho").unwrap(); + add_var("rhou").unwrap(); + add_var("rhov").unwrap(); + add_var("e").unwrap(); + outputs.push(g); + } + + let mut sys = 
SingleThreadedSystem { + fnow, + fnext, + k, + wb, + grids: self.grids, + metrics, + bt: self.boundary_conditions, + eb, + time: self.time, + dt: Float::NAN, + operators: self.operators, + output: (self.output, outputs), + progressbar: None, + initial_conditions: self.initial_conditions.clone(), + }; + match &self.initial_conditions { + /* + parsing::InitialConditions::File(f) => { + for grid in &sys.grids { + // Copy initial conditions from file, requires name of field + todo!() + } + } + */ + parsing::InitialConditions::Vortex(vortexparams) => sys.vortex(0.0, vortexparams), + parsing::InitialConditions::Expressions(expr) => { + let t = 0.0; + for (grid, field) in sys.grids.iter().zip(sys.fnow.iter_mut()) { + // Evaluate the expressions on all variables + let x = grid.x(); + let y = grid.y(); + let (rho, rhou, rhov, e) = field.components_mut(); + (*expr).evaluate(t, x, y, rho, rhou, rhov, e); + } + } + } + System::SingleThreaded(sys) + } + + /// Spreads the computation over n threads, in a thread per grid way. 
+ pub fn create_distributed(self) -> System { + let nthreads = self.grids.len(); + + // Build up the boundary conditions + let mut pull = Vec::>::with_capacity(nthreads); + for _ in 0..nthreads { + pull.push(Arc::new(Communicator { + cvar: Condvar::new(), + data: Mutex::new(Direction::splat(()).map(|_| ArrayVec::new())), + })); + } + + // Build the set of communicators between boundaries + let mut push = Vec::>>>::with_capacity(nthreads); + for wb in &self.boundary_conditions { + let local_push = wb.as_ref().map(|bc| { + if let euler::BoundaryCharacteristic::Grid(i) + | euler::BoundaryCharacteristic::Interpolate(i, _) = bc + { + Some(pull[*i].clone()) + } else { + None + } + }); + + push.push(local_push); + } + + let (master_send, master_recv) = crossbeam_channel::unbounded(); + + let mut tids = Vec::with_capacity(nthreads); + let mut communicators = Vec::with_capacity(nthreads); + + for (id, (((((name, grid), sbp), bt), pull), push)) in self + .names + .into_iter() + .zip(self.grids.into_iter()) + .zip(self.operators.into_iter()) + .zip(self.boundary_conditions) + .zip(pull) + .zip(push) + .enumerate() + { + let builder = std::thread::Builder::new().name(format!("mg: {}", name)); + + let boundary_conditions = bt.map(|bt| match bt { + euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This, + euler::BoundaryCharacteristic::Grid(_) => DistributedBoundaryConditions::Channel, + euler::BoundaryCharacteristic::Interpolate(_, int_op) => { + DistributedBoundaryConditions::Interpolate(int_op) + } + euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(), + euler::BoundaryCharacteristic::Vortex(vp) => { + DistributedBoundaryConditions::Vortex(vp) + } + euler::BoundaryCharacteristic::Eval(eval) => { + DistributedBoundaryConditions::Eval(eval) + } + }); + + let master_send = master_send.clone(); + + let (t_send, t_recv) = crossbeam_channel::unbounded(); + communicators.push(t_send); + let time = self.time; + + let output = self.output.clone(); + + 
let initial_conditions = self.initial_conditions.clone(); + + tids.push( + builder + .spawn(move || { + let (ny, nx) = (grid.ny(), grid.nx()); + let mut current = Field::new(ny, nx); + + match &initial_conditions { + parsing::InitialConditions::Vortex(vortexparams) => { + current.vortex(grid.x(), grid.y(), time, vortexparams) + } + parsing::InitialConditions::Expressions(expr) => { + // Evaluate the expressions on all variables + let x = grid.x(); + let y = grid.y(); + let (rho, rhou, rhov, e) = current.components_mut(); + (*expr).evaluate(time, x, y, rho, rhou, rhov, e); + } + } + let fut = current.clone(); + let k = [ + Diff::zeros((ny, nx)), + Diff::zeros((ny, nx)), + Diff::zeros((ny, nx)), + Diff::zeros((ny, nx)), + ]; + let metrics = grid.metrics(sbp.deref()).unwrap(); + + let wb = WorkBuffers::new(ny, nx); + + let g = { + let g = output.create_group(&name).unwrap(); + g.link_soft("/t", "t").unwrap(); + let add_dim = |name| { + g.new_dataset::() + .chunk((grid.ny(), grid.nx())) + .gzip(9) + .create(name, (grid.ny(), grid.nx())) + }; + let xds = add_dim("x").unwrap(); + xds.write(grid.x()).unwrap(); + let yds = add_dim("y").unwrap(); + yds.write(grid.y()).unwrap(); + + let add_var = |name| { + g.new_dataset::() + .gzip(3) + .shuffle(true) + .chunk((1, grid.ny(), grid.nx())) + .resizable(true) + .create(name, (0, grid.ny(), grid.nx())) + }; + add_var("rho").unwrap(); + add_var("rhou").unwrap(); + add_var("rhov").unwrap(); + add_var("e").unwrap(); + g + }; + + let mut sys = DistributedSystemPart { + current, + fut, + k, + boundary_conditions, + grid: (grid, metrics), + output: g, + push, + pull, + sbp, + t: time, + dt: Float::NAN, + initial_conditions, + + _name: name, + id, + + recv: t_recv, + send: master_send, + + wb, + workbuffer_edges: { + Direction { + north: (Array2::zeros((4, nx)), Array2::zeros((4, nx))), + south: (Array2::zeros((4, nx)), Array2::zeros((4, nx))), + east: (Array2::zeros((4, ny)), Array2::zeros((4, ny))), + west: (Array2::zeros((4, ny)), 
Array2::zeros((4, ny))), + } + }, + workbuffer_free: Direction { + north: { + let mut arr = ArrayVec::new(); + for _ in 0..2 { + arr.push(Array2::zeros((4, nx))) + } + arr + }, + south: { + let mut arr = ArrayVec::new(); + for _ in 0..2 { + arr.push(Array2::zeros((4, nx))) + } + arr + }, + east: { + let mut arr = ArrayVec::new(); + for _ in 0..2 { + arr.push(Array2::zeros((4, ny))) + } + arr + }, + west: { + let mut arr = ArrayVec::new(); + for _ in 0..2 { + arr.push(Array2::zeros((4, ny))) + } + arr + }, + }, + + progressbar: None, + }; + + sys.run(); + }) + .unwrap(), + ); + } + + System::MultiThreaded(DistributedSystem { + sys: tids, + recv: master_recv, + send: communicators, + output: self.output, + progressbar: None, + }) + } +} + +pub enum System { + SingleThreaded(SingleThreadedSystem), + MultiThreaded(DistributedSystem), +} + +impl System { + pub fn max_dt(&self) -> Float { + match self { + Self::SingleThreaded(sys) => sys.max_dt(), + Self::MultiThreaded(sys) => { + for send in &sys.send { + send.send(MsgFromHost::DtRequest).unwrap(); + } + let mut max_dt = Float::MAX; + let mut to_receive = sys.sys.len(); + while to_receive != 0 { + let dt = match sys.recv.recv().unwrap() { + (_, MsgToHost::MaxDt(dt)) => dt, + _ => unreachable!(), + }; + max_dt = max_dt.min(dt); + to_receive -= 1; + } + max_dt + } + } + } + pub fn set_dt(&mut self, dt: Float) { + match self { + Self::SingleThreaded(sys) => sys.dt = dt, + Self::MultiThreaded(sys) => { + for tid in &sys.send { + tid.send(MsgFromHost::DtSet(dt)).unwrap() + } + } + } + } + pub fn advance(&mut self, nsteps: u64) { + match self { + Self::SingleThreaded(sys) => sys.advance(nsteps), + Self::MultiThreaded(sys) => sys.advance(nsteps), + } + } + pub fn output(&self, ntime: u64) { + match self { + Self::SingleThreaded(sys) => sys.output(ntime), + Self::MultiThreaded(sys) => sys.output(ntime), + } + } + pub fn add_progressbar(&mut self, ntime: u64) { + match self { + Self::SingleThreaded(sys) => sys.progressbar = 
Some(super::progressbar(ntime)), + Self::MultiThreaded(sys) => sys.attach_progressbar(ntime), + } + } + pub fn finish_progressbar(&mut self) { + match self { + Self::SingleThreaded(sys) => sys.progressbar.take().unwrap().finish_and_clear(), + Self::MultiThreaded(sys) => { + for tid in &sys.send { + tid.send(MsgFromHost::ProgressbarDrop).unwrap(); + } + sys.synchronise(); + let target = sys.progressbar.take().unwrap(); + target.clear().unwrap(); + } + } + } + pub fn error(&self) -> Float { + match self { + Self::SingleThreaded(sys) => sys.error(), + Self::MultiThreaded(sys) => { + for sender in &sys.send { + sender.send(MsgFromHost::Error).unwrap(); + } + let mut e = 0.0; + for _ in 0..sys.sys.len() { + e += match sys.recv.recv().unwrap() { + (_, MsgToHost::Error(e)) => e, + (_, m) => panic!("Unexpected message: {:?}", m), + } + } + e + } + } + } +} + +pub struct SingleThreadedSystem { + pub fnow: Vec, + pub fnext: Vec, + pub wb: Vec, + pub k: [Vec; 4], + pub grids: Vec, + pub metrics: Vec, + pub bt: Vec, + pub eb: Vec, + pub time: Float, + pub dt: Float, + pub operators: Vec>, + pub output: (hdf5::File, Vec), + pub progressbar: Option, + pub initial_conditions: parsing::InitialConditions, +} + +impl integrate::Integrable for SingleThreadedSystem { + type State = Vec; + type Diff = Vec; + + fn scaled_add(s: &mut Self::State, o: &Self::Diff, scale: Float) { + s.iter_mut() + .zip(o.iter()) + .for_each(|(s, o)| euler::Field::scaled_add(s, o, scale)) + } +} + +impl SingleThreadedSystem { + pub fn vortex(&mut self, t: Float, vortex_params: &euler::VortexParameters) { + for (f, g) in self.fnow.iter_mut().zip(&self.grids) { + f.vortex(g.x(), g.y(), t, vortex_params); + } + } + + pub fn advance(&mut self, nsteps: u64) { + for _ in 0..nsteps { + self.advance_single_step(self.dt); + if let Some(pbar) = &self.progressbar { + pbar.inc(1) + } + } + } + + pub fn advance_single_step(&mut self, dt: Float) { + let metrics = &self.metrics; + let grids = &self.grids; + let bt = 
&self.bt; + let wb = &mut self.wb; + let eb = &mut self.eb; + let operators = &self.operators; + + let rhs = move |fut: &mut Vec, prev: &Vec, time: Float| { + let prev_all = &prev; + for (((((((fut, prev), wb), grid), metrics), op), bt), eb) in fut + .iter_mut() + .zip(prev.iter()) + .zip(wb.iter_mut()) + .zip(grids) + .zip(metrics.iter()) + .zip(operators.iter()) + .zip(bt.iter()) + .zip(eb.iter_mut()) + { + let bc = euler::boundary_extracts(prev_all, bt, prev, grid, eb, time); + if op.upwind().is_some() { + euler::RHS_upwind(&**op, fut, prev, metrics, &bc, &mut wb.0); + } else { + euler::RHS_trad(&**op, fut, prev, metrics, &bc, &mut wb.0); + } + } + }; + + integrate::integrate::( + rhs, + &self.fnow, + &mut self.fnext, + &mut self.time, + dt, + &mut self.k, + ); + + std::mem::swap(&mut self.fnow, &mut self.fnext); + } + + /// Suggested maximum dt for this problem + pub fn max_dt(&self) -> Float { + let is_h2 = self + .operators + .iter() + .any(|op| op.is_h2xi() || op.is_h2eta()); + let c_max = if is_h2 { 0.5 } else { 1.0 }; + let mut max_dt: Float = Float::INFINITY; + + for (field, metrics) in self.fnow.iter().zip(self.metrics.iter()) { + let nx = field.nx(); + let ny = field.ny(); + + let rho = field.rho(); + let rhou = field.rhou(); + let rhov = field.rhov(); + + let mut max_u: Float = 0.0; + let mut max_v: Float = 0.0; + + for ((((((rho, rhou), rhov), detj_dxi_dx), detj_dxi_dy), detj_deta_dx), detj_deta_dy) in + rho.iter() + .zip(rhou.iter()) + .zip(rhov.iter()) + .zip(metrics.detj_dxi_dx()) + .zip(metrics.detj_dxi_dy()) + .zip(metrics.detj_deta_dx()) + .zip(metrics.detj_deta_dy()) + { + let u = rhou / rho; + let v = rhov / rho; + + let uhat: Float = detj_dxi_dx * u + detj_dxi_dy * v; + let vhat: Float = detj_deta_dx * u + detj_deta_dy * v; + + max_u = max_u.max(uhat.abs()); + max_v = max_v.max(vhat.abs()); + } + + let dx = 1.0 / nx as Float; + let dy = 1.0 / ny as Float; + + let c_dt = Float::max(max_u / dx, max_v / dy); + + max_dt = Float::min(max_dt, c_max 
/ c_dt); + } + + max_dt + } + + pub fn output(&self, ntime: u64) { + let tds = self.output.0.dataset("t").unwrap(); + let tpos = tds.size(); + tds.resize((tpos + 1,)).unwrap(); + tds.write_slice(&[ntime], ndarray::s![tpos..tpos + 1]) + .unwrap(); + for (group, fnow) in self.output.1.iter().zip(&self.fnow) { + let (ny, nx) = (fnow.ny(), fnow.nx()); + let rhods = group.dataset("rho").unwrap(); + let rhouds = group.dataset("rhou").unwrap(); + let rhovds = group.dataset("rhov").unwrap(); + let eds = group.dataset("e").unwrap(); + + let (rho, rhou, rhov, e) = fnow.components(); + rhods.resize((tpos + 1, ny, nx)).unwrap(); + rhods.write_slice(rho, ndarray::s![tpos, .., ..]).unwrap(); + + rhouds.resize((tpos + 1, ny, nx)).unwrap(); + rhouds.write_slice(rhou, ndarray::s![tpos, .., ..]).unwrap(); + + rhovds.resize((tpos + 1, ny, nx)).unwrap(); + rhovds.write_slice(rhov, ndarray::s![tpos, .., ..]).unwrap(); + + eds.resize((tpos + 1, ny, nx)).unwrap(); + eds.write_slice(e, ndarray::s![tpos, .., ..]).unwrap(); + } + } + + pub fn error(&self) -> Float { + let mut e = 0.0; + for ((fmod, grid), op) in self.fnow.iter().zip(&self.grids).zip(&self.operators) { + let mut fvort = fmod.clone(); + match &self.initial_conditions { + parsing::InitialConditions::Vortex(vortexparams) => { + fvort.vortex(grid.x(), grid.y(), self.time, &vortexparams); + } + parsing::InitialConditions::Expressions(expr) => { + let (rho, rhou, rhov, e) = fvort.components_mut(); + expr.as_ref() + .evaluate(self.time, grid.x(), grid.y(), rho, rhou, rhov, e) + } + } + e += fmod.h2_err(&fvort, &**op); + } + e + } +} + +pub struct DistributedSystem { + recv: Receiver<(usize, MsgToHost)>, + send: Vec>, + /// All threads should be joined to mark the end of the computation + sys: Vec>, + output: hdf5::File, + progressbar: Option, +} + +impl DistributedSystem { + pub fn advance(&mut self, ntime: u64) { + for tid in &self.send { + tid.send(MsgFromHost::Advance(ntime)).unwrap(); + } + } + pub fn output(&self, ntime: u64) 
{ + for tid in &self.send { + tid.send(MsgFromHost::Output(ntime)).unwrap(); + } + let tds = self.output.dataset("t").unwrap(); + let tpos = tds.size(); + tds.resize((tpos + 1,)).unwrap(); + tds.write_slice(&[ntime], ndarray::s![tpos..tpos + 1]) + .unwrap(); + } + pub fn attach_progressbar(&mut self, ntime: u64) { + let target = indicatif::MultiProgress::new(); + for tid in &self.send { + let pb = super::progressbar(ntime); + let pb = target.add(pb); + tid.send(MsgFromHost::Progressbar(pb)).unwrap(); + } + target.set_move_cursor(true); + self.progressbar = Some(target); + } + fn send_barrier(&self, barrier: &crossbeam_utils::sync::WaitGroup) { + for tid in &self.send { + tid.send(MsgFromHost::Barrier(barrier.clone())).unwrap() + } + } + pub fn synchronise(&self) { + // Syncronise before starting the timer + let barrier = crossbeam_utils::sync::WaitGroup::new(); + self.send_barrier(&barrier); + barrier.wait(); + } +} + +impl Drop for DistributedSystem { + fn drop(&mut self) { + for tid in &self.send { + tid.send(MsgFromHost::Stop).unwrap(); + } + let handles = std::mem::take(&mut self.sys); + for tid in handles { + tid.join().unwrap() + } + } +} + +/// Messages sent from the host to each compute thread +#[derive(Debug)] +enum MsgFromHost { + /// Advance n steps + Advance(u64), + /// Compute the maximum dt allowed by this grid + DtRequest, + /// Set dt + DtSet(Float), + /// Output the current time to file + Output(u64), + /// Stop all computing + Stop, + /// Request the current error + Error, + /// A barrier that must be waited on + Barrier(crossbeam_utils::sync::WaitGroup), + /// Progressbar to report progress + Progressbar(indicatif::ProgressBar), + /// Clear and remove the progressbar + ProgressbarDrop, +} + +/// Messages sent back to the host +#[derive(Debug)] +enum MsgToHost { + /// Maximum dt allowed by the current grid + MaxDt(Float), + /// Error from the current grid + Error(Float), +} + +// #[derive(Debug)] +pub enum DistributedBoundaryConditions { + This, + 
+ Vortex(VortexParameters), + Eval(std::sync::Arc>), + + Interpolate(Box), + Channel, +} + +type CommunicatorData = ArrayVec, 4>; + +struct Communicator { + /// Waker for this grid, neighbours should have a reference + /// and notify when a boundary has been put + cvar: Condvar, + /// Internal data exchange, is None on missing data, inner type + /// can be set to None when grabbing the boundary + data: Mutex>, +} + +struct DistributedSystemPart { + grid: (Grid, Metrics), + sbp: Box, + + boundary_conditions: Direction, + /// Channel pullers + pull: Arc, + /// Subscribers to the boundaries of self + push: Direction>>, + + current: Field, + fut: Field, + + t: Float, + dt: Float, + + _name: String, + id: usize, + recv: Receiver, + send: Sender<(usize, MsgToHost)>, + + output: hdf5::Group, + initial_conditions: crate::parsing::InitialConditions, + + k: [Diff; 4], + wb: WorkBuffers, + /// Work buffer for boundaries + workbuffer_edges: Direction<(Array2, Array2)>, + /// These can be popped and pushed as we communicate data + workbuffer_free: Direction, + + progressbar: Option, +} + +impl DistributedSystemPart { + fn run(&mut self) { + loop { + match self.recv.recv().unwrap() { + MsgFromHost::DtSet(dt) => self.dt = dt, + MsgFromHost::DtRequest => { + let dt = self.max_dt(); + self.send(MsgToHost::MaxDt(dt)).unwrap(); + } + MsgFromHost::Advance(ntime) => self.advance(ntime), + MsgFromHost::Output(ntime) => self.output(ntime), + MsgFromHost::Stop => return, + MsgFromHost::Error => self.send(MsgToHost::Error(self.error())).unwrap(), + MsgFromHost::Barrier(barrier) => barrier.wait(), + MsgFromHost::Progressbar(pbar) => self.progressbar = Some(pbar), + MsgFromHost::ProgressbarDrop => { + let pb = self.progressbar.take().unwrap(); + pb.finish_and_clear() + } + } + } + } + + fn max_dt(&self) -> Float { + let nx = self.current.nx(); + let ny = self.current.ny(); + + let (rho, rhou, rhov, _e) = self.current.components(); + + let mut max_u: Float = 0.0; + let mut max_v: Float = 0.0; 
+ + for ((((((rho, rhou), rhov), detj_dxi_dx), detj_dxi_dy), detj_deta_dx), detj_deta_dy) in rho + .iter() + .zip(rhou.iter()) + .zip(rhov.iter()) + .zip(self.grid.1.detj_dxi_dx()) + .zip(self.grid.1.detj_dxi_dy()) + .zip(self.grid.1.detj_deta_dx()) + .zip(self.grid.1.detj_deta_dy()) + { + let u = rhou / rho; + let v = rhov / rho; + + let uhat: Float = detj_dxi_dx * u + detj_dxi_dy * v; + let vhat: Float = detj_deta_dx * u + detj_deta_dy * v; + + max_u = max_u.max(uhat.abs()); + max_v = max_v.max(vhat.abs()); + } + + let dx = 1.0 / nx as Float; + let dy = 1.0 / ny as Float; + + let c_dt = Float::max(max_u / dx, max_v / dy); + + let c_max = if self.sbp.is_h2xi() || self.sbp.is_h2eta() { + 0.5 + } else { + 1.0 + }; + + c_max / c_dt + } + + fn output(&mut self, _ntime: u64) { + let (ny, nx) = (self.current.ny(), self.current.nx()); + let rhods = self.output.dataset("rho").unwrap(); + let rhouds = self.output.dataset("rhou").unwrap(); + let rhovds = self.output.dataset("rhov").unwrap(); + let eds = self.output.dataset("e").unwrap(); + + let (rho, rhou, rhov, e) = self.current.components(); + let tpos = rhods.size() / (ny * nx); + rhods.resize((tpos + 1, ny, nx)).unwrap(); + rhods.write_slice(rho, ndarray::s![tpos, .., ..]).unwrap(); + + rhouds.resize((tpos + 1, ny, nx)).unwrap(); + rhouds.write_slice(rhou, ndarray::s![tpos, .., ..]).unwrap(); + + rhovds.resize((tpos + 1, ny, nx)).unwrap(); + rhovds.write_slice(rhov, ndarray::s![tpos, .., ..]).unwrap(); + + eds.resize((tpos + 1, ny, nx)).unwrap(); + eds.write_slice(e, ndarray::s![tpos, .., ..]).unwrap(); + } + + fn advance(&mut self, ntime: u64) { + for _itime in 0..ntime { + if let Some(pbar) = &self.progressbar { + pbar.inc(1) + } + let metrics = &self.grid.1; + let wb = &mut self.wb.0; + let sbp = &self.sbp; + let push = &self.push; + let pull = &self.pull; + let boundary_conditions = &self.boundary_conditions; + let grid = &self.grid.0; + let workbuffer_edges = &mut self.workbuffer_edges; + let workbuffer_free = 
&mut self.workbuffer_free; + + let rhs = |k: &mut euler::Diff, y: &euler::Field, time: Float| { + // Send off the boundaries eagerly, in case neighbouring grid is ready + push.as_ref() + .zip(workbuffer_free.as_mut()) + .zip( + Direction::) -> &mut CommunicatorData> { + north: |x| x.south_mut(), + south: |x| x.north_mut(), + east: |x| x.west_mut(), + west: |x| x.east_mut(), + }, + ) + .zip(Direction { + north: y.north(), + south: y.south(), + west: y.west(), + east: y.east(), + }) + .map(|(((push, wb), sel), this)| { + if let Some(s) = push { + let mut wb = wb.pop().unwrap(); + wb.assign(&this); + { + let mut s = s.data.lock(); + sel(&mut s).push(wb); + } + s.cvar.notify_one(); + } + }); + + // This computation does not depend on the boundaries + euler::RHS_no_SAT(sbp.deref(), k, y, metrics, wb); + + // Get boundaries, but be careful and maximise the amount of work which can be + // performed before we have all of them, whilst ensuring threads can sleep for as + // long as possible + let computed = boundary_conditions + .as_ref() + .zip(euler::SAT_FUNCTIONS) + .zip(workbuffer_edges.as_mut()) + .zip(Direction { + north: y.south(), + south: y.north(), + east: y.west(), + west: y.east(), + }) + .zip(Direction { + north: grid.north(), + south: grid.south(), + east: grid.east(), + west: grid.west(), + }) + .map(|((((bc, sat), wb), self_edge), grid)| { + wb.0.fill(0.0); + match bc { + DistributedBoundaryConditions::Channel + | DistributedBoundaryConditions::Interpolate(_) => false, + DistributedBoundaryConditions::This => { + sat(sbp.deref(), wb.0.view_mut(), y, metrics, self_edge); + true + } + DistributedBoundaryConditions::Vortex(vp) => { + let mut fiter = wb.1.outer_iter_mut(); + let (rho, rhou, rhov, e) = ( + fiter.next().unwrap(), + fiter.next().unwrap(), + fiter.next().unwrap(), + fiter.next().unwrap(), + ); + let (gx, gy) = grid; + vp.evaluate(time, gx, gy, rho, rhou, rhov, e); + + sat(sbp.deref(), wb.0.view_mut(), y, metrics, wb.1.view()); + true + } + 
DistributedBoundaryConditions::Eval(eval) => { + let mut fiter = wb.1.outer_iter_mut(); + let (rho, rhou, rhov, e) = ( + fiter.next().unwrap(), + fiter.next().unwrap(), + fiter.next().unwrap(), + fiter.next().unwrap(), + ); + let (gx, gy) = grid; + eval.evaluate(time, gx, gy, rho, rhou, rhov, e); + sat(sbp.deref(), wb.0.view_mut(), y, metrics, wb.1.view()); + true + } + } + }); + + if computed.north { + k.north_mut() + .scaled_add(1.0, &workbuffer_edges.north().0.view()); + } + if computed.south { + k.south_mut() + .scaled_add(1.0, &workbuffer_edges.south().0.view()); + } + if computed.east { + k.east_mut() + .scaled_add(1.0, &workbuffer_edges.east().0.view()); + } + if computed.west { + k.west_mut() + .scaled_add(1.0, &workbuffer_edges.west().0.view()); + } + + let mut boundaries_remaining = computed.map(|b| !b); + + { + let mut data = pull.data.lock(); + 'check_boundaries: while boundaries_remaining.any() { + let boundaries = + boundaries_remaining + .zip(data.as_mut()) + .map( + |(remains, data)| { + if remains { + data.pop_at(0) + } else { + None + } + }, + ); + if boundaries.as_ref().map(Option::is_none).all() { + // Park thread while waiting for boundaries + pull.cvar.wait(&mut data); + continue 'check_boundaries; + } + // While we are waiting we can unlock mutex + lock_api::MutexGuard::unlocked(&mut data, || { + if let Some(mut boundary) = boundaries.north { + boundaries_remaining.north = false; + let wb = workbuffer_edges.north_mut(); + let wb_push = workbuffer_free.north_mut(); + match boundary_conditions.north() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(&mut wb.0, &mut boundary); + wb_push.push(boundary); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = boundary.shape()[1] > wb.0.shape()[2]; + for (to, from) in + wb.0.outer_iter_mut().zip(boundary.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); + } + } + // Reshape edge buffer to 
correct size + let mut vec = boundary.into_raw_vec(); + vec.resize(wb.0.len(), 0.0); + let boundary = + Array2::from_shape_vec(wb.0.raw_dim(), vec).unwrap(); + wb_push.push(boundary) + } + _ => unreachable!(), + } + euler::SAT_north( + sbp.deref(), + k.north_mut(), + y, + metrics, + wb.0.view(), + ); + }; + + if let Some(mut boundary) = boundaries.south { + boundaries_remaining.south = false; + let wb = workbuffer_edges.north_mut(); + let wb_push = workbuffer_free.south_mut(); + match boundary_conditions.south() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(&mut wb.0, &mut boundary); + wb_push.push(boundary); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = boundary.shape()[1] > wb.0.shape()[2]; + for (to, from) in + wb.0.outer_iter_mut().zip(boundary.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); + } + } + // Reshape edge buffer to correct size + let mut vec = boundary.into_raw_vec(); + vec.resize(wb.0.len(), 0.0); + let boundary = + Array2::from_shape_vec(wb.0.raw_dim(), vec).unwrap(); + wb_push.push(boundary); + } + _ => unreachable!(), + } + euler::SAT_south( + sbp.deref(), + k.south_mut(), + y, + metrics, + wb.0.view(), + ); + }; + + if let Some(mut boundary) = boundaries.east { + boundaries_remaining.east = false; + let wb = workbuffer_edges.east_mut(); + let wb_push = workbuffer_free.east_mut(); + match boundary_conditions.east() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(&mut wb.0, &mut boundary); + wb_push.push(boundary); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = boundary.shape()[1] > wb.0.shape()[2]; + for (to, from) in + wb.0.outer_iter_mut().zip(boundary.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); + } + } + // Reshape edge buffer to correct size + let mut vec = boundary.into_raw_vec(); + 
vec.resize(wb.0.len(), 0.0); + let boundary = + Array2::from_shape_vec(wb.0.raw_dim(), vec).unwrap(); + wb_push.push(boundary); + } + _ => unreachable!(), + } + euler::SAT_east(sbp.deref(), k.east_mut(), y, metrics, wb.0.view()); + }; + + if let Some(mut boundary) = boundaries.west { + boundaries_remaining.west = false; + let wb = workbuffer_edges.west_mut(); + let wb_push = workbuffer_free.west_mut(); + match boundary_conditions.west() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(&mut wb.0, &mut boundary); + wb_push.push(boundary); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = boundary.shape()[1] > wb.0.shape()[2]; + for (to, from) in + wb.0.outer_iter_mut().zip(boundary.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); + } + } + // Reshape edge buffer to correct size + let mut vec = boundary.into_raw_vec(); + vec.resize(wb.0.len(), 0.0); + let boundary = + Array2::from_shape_vec(wb.0.raw_dim(), vec).unwrap(); + wb_push.push(boundary); + } + _ => unreachable!(), + } + euler::SAT_west(sbp.deref(), k.west_mut(), y, metrics, wb.0.view()); + }; + }); + } + } + }; + integrate::integrate::( + rhs, + &self.current, + &mut self.fut, + &mut self.t, + self.dt, + &mut self.k, + ); + std::mem::swap(&mut self.current, &mut self.fut); + } + } + + fn send(&self, msg: MsgToHost) -> Result<(), crossbeam_channel::SendError<(usize, MsgToHost)>> { + self.send.send((self.id, msg)) + } + + fn error(&self) -> Float { + let mut fvort = self.current.clone(); + match &self.initial_conditions { + parsing::InitialConditions::Vortex(vortexparams) => { + fvort.vortex(self.grid.0.x(), self.grid.0.y(), self.t, &vortexparams); + } + parsing::InitialConditions::Expressions(expr) => { + let (rho, rhou, rhov, e) = fvort.components_mut(); + expr.as_ref() + .evaluate(self.t, self.grid.0.x(), self.grid.0.y(), rho, rhou, rhov, e) + } + } + self.current.h2_err(&fvort, &*self.sbp) + 
} +} diff --git a/sbp/Cargo.toml b/sbp/Cargo.toml index e642625..aac3e27 100644 --- a/sbp/Cargo.toml +++ b/sbp/Cargo.toml @@ -12,7 +12,7 @@ serde = { version = "1.0.115", optional = true, default-features = false, featur num-traits = "0.2.14" float = { path = "../utils/float" } constmatrix = { path = "../utils/constmatrix" } -core_simd = { git = "https://github.com/rust-lang/stdsimd" } +core_simd = { git = "https://github.com/rust-lang/portable-simd" } [features] # Use f32 as precision, default is f64 diff --git a/sbp/src/operators/algos.rs b/sbp/src/operators/algos.rs index 20ae1d5..cb5dee3 100644 --- a/sbp/src/operators/algos.rs +++ b/sbp/src/operators/algos.rs @@ -281,6 +281,7 @@ pub(crate) fn diff_op_2d_fallback( matrix: &BlockMatrix, optype: OperatorType, @@ -392,7 +393,6 @@ pub(crate) fn diff_op_2d_sliceable_y_simd Direction { east: (self.east, other.east), } } + + /// Flips all direction through origo + pub fn opposite(self) -> Self { + Self { + north: self.south, + south: self.north, + east: self.west, + west: self.east, + } + } + + pub fn splat(t: T) -> Self + where + T: Copy, + { + Self { + north: t, + south: t, + east: t, + west: t, + } + } } impl Direction> { @@ -96,6 +118,15 @@ impl Direction { } } +impl Direction { + pub fn all(&self) -> bool { + self.north && self.south && self.east && self.west + } + pub fn any(&self) -> bool { + self.north || self.south || self.east || self.west + } +} + /// Linearly spaced parameters, apart from the boundaries which /// only have a distance of `h/2` from the boundary pub fn h2linspace(start: Float, end: Float, n: usize) -> ndarray::Array1 { diff --git a/utils/integrate/src/lib.rs b/utils/integrate/src/lib.rs index 3785a77..d91afce 100644 --- a/utils/integrate/src/lib.rs +++ b/utils/integrate/src/lib.rs @@ -215,7 +215,7 @@ pub fn integrate( } }; - rhs(&mut k[i], &fut, simtime); + rhs(&mut k[i], fut, simtime); } }