From 0a5647df3a9cd36f5a272526dd9bbe23518eb2ad Mon Sep 17 00:00:00 2001
From: Magnus Ulimoen
Date: Sat, 7 Aug 2021 17:30:00 +0000
Subject: [PATCH] Compilable state

---
 multigrid/src/main.rs   |  63 +++--
 multigrid/src/system.rs | 528 ++++++++++++++++++++++------------------
 2 files changed, 320 insertions(+), 271 deletions(-)

diff --git a/multigrid/src/main.rs b/multigrid/src/main.rs
index 01e37e4..ed16796 100644
--- a/multigrid/src/main.rs
+++ b/multigrid/src/main.rs
@@ -86,42 +86,28 @@ fn main() {
         operators,
         boundary_conditions,
         initial_conditions.clone(),
+        opt.output.clone(),
     );
-    let mut sys = basesystem.create(); // System::new(grids, grid_connections, operators);
-    let dt = sys.max_dt();
-
-    let ntime = (integration_time / dt).round() as u64;
-
-    if opt.distribute {
-        let sys = sys.distribute(ntime);
-        let timer = if opt.timings {
-            Some(std::time::Instant::now())
-        } else {
-            None
-        };
-        sys.run();
-        if let Some(timer) = timer {
-            let duration = timer.elapsed();
-            println!("Duration: {:?}", duration);
-        }
-        return;
-    }
-
-    let should_output = |itime| {
-        opt.number_of_outputs.map_or(false, |num_out| {
-            if num_out == 0 {
-                false
-            } else {
-                itime % (std::cmp::max(ntime / (num_out - 1), 1)) == 0
-            }
-        })
+    let sys = if opt.distribute {
+        basesystem.create_distributed()
+    } else {
+        basesystem.create()
     };

-    let output = File::create(&opt.output, sys.grids.as_slice(), names).unwrap();
-    let mut output = OutputThread::new(output);
-    output.add_timestep(0, &sys.fnow);
+    let dt = sys.max_dt();
+    let ntime = (integration_time / dt).round() as u64;
+    let steps_between_outputs = if let Some(n) = opt.number_of_outputs {
+        std::cmp::max(ntime / n, 1)
+    } else {
+        ntime
+    };
+
+    sys.output(0);
+    //let output = File::create(&opt.output, sys.grids.as_slice(), names).unwrap();
+    //let mut output = OutputThread::new(output);
+    //output.add_timestep(0, &sys.fnow);

     let progressbar = progressbar(opt.no_progressbar, ntime);

@@ -131,6 +117,16 @@ fn main() {
         None
     };

+    let mut itime = 0;
+    while itime < ntime {
+        sys.advance(steps_between_outputs);
+        progressbar.inc(1);
+
+        itime += steps_between_outputs;
+        sys.output(itime);
+    }
+
+    /*
     for itime in 0..ntime {
         if should_output(itime) {
             output.add_timestep(itime, &sys.fnow);
@@ -138,6 +134,7 @@ fn main() {
         progressbar.inc(1);
         sys.advance(dt);
     }
+    */

     progressbar.finish_and_clear();

     let mut outinfo = OutputInformation {
@@ -150,8 +147,9 @@ fn main() {
         outinfo.time_elapsed = Some(duration);
     }

-    output.add_timestep(ntime, &sys.fnow);
+    //output.add_timestep(ntime, &sys.fnow);

+    /*
     if opt.error {
         let time = ntime as Float * dt;
         let mut e = 0.0;
@@ -171,6 +169,7 @@ fn main() {
         }
         outinfo.error = Some(e);
     }
+    */

     if opt.output_json {
         println!("{}", json5::to_string(&outinfo).unwrap());

diff --git a/multigrid/src/system.rs b/multigrid/src/system.rs
index 1b778c6..8058b4b 100644
--- a/multigrid/src/system.rs
+++ b/multigrid/src/system.rs
@@ -1,5 +1,6 @@
 use crate::parsing;
 use crate::utils::Direction;
+use core::ops::Deref;
 use crossbeam_channel::{Receiver, Select, Sender};
 use euler::{
     eval::{self, Evaluator},
@@ -9,7 +10,6 @@ use ndarray::Array2;
 use sbp::grid::{Grid, Metrics};
 use sbp::operators::{InterpolationOperator, SbpOperator2d};
 use sbp::*;
-use std::sync::{Arc, Barrier};

 pub struct BaseSystem {
     pub names: Vec<String>,
@@ -18,6 +18,7 @@ pub struct BaseSystem {
     pub boundary_conditions: Vec<euler::BoundaryCharacteristics>,
     pub initial_conditions: crate::parsing::InitialConditions,
     pub operators: Vec<Box<dyn SbpOperator2d>>,
+    pub output: hdf5::File,
 }

 impl BaseSystem {
@@ -28,7 +29,15 @@ impl BaseSystem {
         operators: Vec<Box<dyn SbpOperator2d>>,
        boundary_conditions: Vec<euler::BoundaryCharacteristics>,
         initial_conditions: crate::parsing::InitialConditions,
+        output: std::path::PathBuf,
     ) -> Self {
+        let output = hdf5::File::create(output).unwrap();
+        output
+            .new_dataset::<Float>()
+            .resizable(true)
+            .chunk((1,))
+            .create("t", (0,))
+            .unwrap();
         Self {
             names,
             grids,
@@ -36,10 +45,54 @@ impl BaseSystem {
             boundary_conditions,
             initial_conditions,
             operators,
+            output,
         }
     }
     pub fn create(self) -> System {
-        let mut sys = System::new(self.grids, self.boundary_conditions, self.operators);
+        let fnow = self
+            .grids
+            .iter()
+            .map(|g| euler::Field::new(g.ny(), g.nx()))
+            .collect::<Vec<_>>();
+        let fnext = fnow.clone();
+        let wb = self
+            .grids
+            .iter()
+            .map(|g| euler::WorkBuffers::new(g.ny(), g.nx()))
+            .collect();
+        let k = self
+            .grids
+            .iter()
+            .map(|g| euler::Diff::zeros((g.ny(), g.nx())))
+            .collect::<Vec<_>>();
+        let k = [k.clone(), k.clone(), k.clone(), k];
+        let metrics = self
+            .grids
+            .iter()
+            .zip(&self.operators)
+            .map(|(g, op)| g.metrics(&**op).unwrap())
+            .collect::<Vec<_>>();
+
+        let eb = self
+            .boundary_conditions
+            .iter()
+            .zip(&self.grids)
+            .map(|(bt, grid)| euler::BoundaryStorage::new(bt, grid))
+            .collect();
+
+        let sys = SingleThreadedSystem {
+            fnow,
+            fnext,
+            k,
+            wb,
+            grids: self.grids,
+            metrics,
+            bt: self.boundary_conditions,
+            eb,
+            time: self.time,
+            operators: self.operators,
+            output: todo!(),
+        };
         match &self.initial_conditions {
             /*
             parsing::InitialConditions::File(f) => {
@@ -61,11 +114,180 @@ impl BaseSystem {
             }
         }
-        sys
+        System::SingleThreaded(sys)
+    }
+
+    /// Spreads the computation over n threads, in a thread per grid way.
+    pub fn create_distributed(self) -> System {
+        let nthreads = self.grids.len();
+        // let dt = self.max_dt();
+
+        // Build up the boundary conditions
+        let mut push_channels: Vec<Direction<Option<Sender<Array2<Float>>>>> =
+            Vec::with_capacity(nthreads);
+        let mut pull_channels: Vec<Direction<Option<Receiver<Array2<Float>>>>> =
+            vec![Direction::default(); nthreads];
+
+        // Build the set of communicators between boundaries
+        for wb in &self.boundary_conditions {
+            let mut local_push = Direction::default();
+            if let euler::BoundaryCharacteristic::Grid(i)
+            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.north()
+            {
+                let (s, r) = crossbeam_channel::bounded(1);
+                pull_channels[*i]
+                    .south_mut()
+                    .replace(r)
+                    .and_then::<(), _>(|_| panic!("channel is already present"));
+                *local_push.north_mut() = Some(s);
+            }
+            if let euler::BoundaryCharacteristic::Grid(i)
+            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.south()
+            {
+                let (s, r) = crossbeam_channel::bounded(1);
+                pull_channels[*i]
+                    .north_mut()
+                    .replace(r)
+                    .and_then::<(), _>(|_| panic!("channel is already present"));
+                *local_push.south_mut() = Some(s);
+            }
+            if let euler::BoundaryCharacteristic::Grid(i)
+            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.east()
+            {
+                let (s, r) = crossbeam_channel::bounded(1);
+                pull_channels[*i]
+                    .west_mut()
+                    .replace(r)
+                    .and_then::<(), _>(|_| panic!("channel is already present"));
+                *local_push.east_mut() = Some(s);
+            }
+            if let euler::BoundaryCharacteristic::Grid(i)
+            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.west()
+            {
+                let (s, r) = crossbeam_channel::bounded(1);
+                pull_channels[*i]
+                    .east_mut()
+                    .replace(r)
+                    .and_then::<(), _>(|_| panic!("channel is already present"));
+                *local_push.west_mut() = Some(s);
+            }
+
+            push_channels.push(local_push);
+        }
+
+        let (master_send, master_recv) = crossbeam_channel::unbounded();
+
+        let mut tids = Vec::with_capacity(nthreads);
+        let mut communicators = Vec::with_capacity(nthreads);
+
+        for (id, (((((name, grid), sbp), bt), chan), push)) in self
+            .names
+            .into_iter()
+            .zip(self.grids.into_iter())
+            .zip(self.operators.into_iter())
+            .zip(self.boundary_conditions)
+            .zip(pull_channels)
+            .zip(push_channels)
+            .enumerate()
+        {
+            let builder = std::thread::Builder::new().name(format!("eulersolver: {}", name));
+
+            let boundary_conditions = bt.zip(chan).map(|(bt, chan)| match bt {
+                euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This,
+                euler::BoundaryCharacteristic::Grid(_) => {
+                    DistributedBoundaryConditions::Channel(chan.unwrap())
+                }
+                euler::BoundaryCharacteristic::Interpolate(_, int_op) => {
+                    DistributedBoundaryConditions::Interpolate(chan.unwrap(), int_op)
+                }
+                euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(),
+                euler::BoundaryCharacteristic::Vortex(vp) => {
+                    DistributedBoundaryConditions::Vortex(vp)
+                }
+                euler::BoundaryCharacteristic::Eval(eval) => {
+                    DistributedBoundaryConditions::Eval(eval)
+                }
+            });
+
+            let master_send = master_send.clone();
+
+            let (t_send, t_recv) = crossbeam_channel::unbounded();
+            communicators.push(t_send);
+            let time = self.time;
+
+            tids.push(
+                builder
+                    .spawn(move || {
+                        let (ny, nx) = (grid.ny(), grid.nx());
+                        let current = Field::new(ny, nx);
+                        let fut = current.clone();
+                        let k = [
+                            Diff::zeros((ny, nx)),
+                            Diff::zeros((ny, nx)),
+                            Diff::zeros((ny, nx)),
+                            Diff::zeros((ny, nx)),
+                        ];
+                        let metrics = grid.metrics(sbp.deref()).unwrap();
+
+                        let wb = WorkBuffers::new(ny, nx);
+
+                        let mut sys = DistributedSystemPart {
+                            current,
+                            fut,
+                            k,
+                            boundary_conditions,
+                            grid: (grid, metrics),
+                            output: todo!(),
+                            push,
+                            sbp,
+                            t: time,
+                            dt: Float::NAN,
+
+                            name,
+                            id,
+
+                            recv: t_recv,
+                            send: master_send,
+
+                            wb,
+                            wb_ns: Array2::zeros((4, nx)),
+                            wb_ew: Array2::zeros((4, ny)),
+                        };
+
+                        // init and send maxdt
+                        // receive maxdt
+                        sys.run();
+                    })
+                    .unwrap(),
+            );
+        }
+
+        System::MultiThreaded(DistributedSystem {
+            sys: tids,
+            recv: master_recv,
+            send: communicators,
+        })
     }
 }

-pub struct System {
+pub enum System {
+    SingleThreaded(SingleThreadedSystem),
+    MultiThreaded(DistributedSystem),
+}
+
+impl System {
+    pub fn max_dt(&self) -> Float {
+        todo!()
+    }
+    pub fn advance(&self, nsteps: u64) {
+        todo!()
+    }
+    pub fn output(&self, ntime: u64) {
+        todo!()
+    }
+}
+
+pub struct SingleThreadedSystem {
     pub fnow: Vec<euler::Field>,
     pub fnext: Vec<euler::Field>,
     pub wb: Vec<euler::WorkBuffers>,
@@ -76,9 +298,10 @@ pub struct System {
     pub eb: Vec<euler::BoundaryStorage>,
     pub time: Float,
     pub operators: Vec<Box<dyn SbpOperator2d>>,
+    pub output: Vec<hdf5::Dataset>,
 }

-impl integrate::Integrable for System {
+impl integrate::Integrable for SingleThreadedSystem {
     type State = Vec<euler::Field>;
     type Diff = Vec<euler::Diff>;

@@ -89,52 +312,7 @@ impl integrate::Integrable for System {
     }
 }

-impl System {
-    pub fn new(
-        grids: Vec<grid::Grid>,
-        bt: Vec<euler::BoundaryCharacteristics>,
-        operators: Vec<Box<dyn SbpOperator2d>>,
-    ) -> Self {
-        let fnow = grids
-            .iter()
-            .map(|g| euler::Field::new(g.ny(), g.nx()))
-            .collect::<Vec<_>>();
-        let fnext = fnow.clone();
-        let wb = grids
-            .iter()
-            .map(|g| euler::WorkBuffers::new(g.ny(), g.nx()))
-            .collect();
-        let k = grids
-            .iter()
-            .map(|g| euler::Diff::zeros((g.ny(), g.nx())))
-            .collect::<Vec<_>>();
-        let k = [k.clone(), k.clone(), k.clone(), k];
-        let metrics = grids
-            .iter()
-            .zip(&operators)
-            .map(|(g, op)| g.metrics(&**op).unwrap())
-            .collect::<Vec<_>>();
-
-        let eb = bt
-            .iter()
-            .zip(&grids)
-            .map(|(bt, grid)| euler::BoundaryStorage::new(bt, grid))
-            .collect();
-
-        Self {
-            fnow,
-            fnext,
-            k,
-            wb,
-            grids,
-            metrics,
-            bt,
-            eb,
-            time: 0.0,
-            operators,
-        }
-    }
-
+impl SingleThreadedSystem {
     pub fn vortex(&mut self, t: Float, vortex_params: &euler::VortexParameters) {
         for (f, g) in self.fnow.iter_mut().zip(&self.grids) {
            f.vortex(g.x(), g.y(), t, &vortex_params);
@@ -170,7 +348,7 @@ impl System {
             }
         };

-        integrate::integrate::<integrate::Rk4, System, _>(
+        integrate::integrate::<integrate::Rk4, SingleThreadedSystem, _>(
             rhs,
             &self.fnow,
             &mut self.fnext,
@@ -231,191 +409,48 @@ impl System {
         }
         max_dt
     }
-
-    /// Spreads the computation over n threads, in a thread per grid way.
-    /// This system can only be called once for ntime calls.
-    pub fn distribute(self, ntime: u64) -> DistributedSystem {
-        let nthreads = self.grids.len();
-        let time = 0.0;
-        // alt: crossbeam::WaitGroup
-        let b = Arc::new(Barrier::new(nthreads + 1));
-        let dt = self.max_dt();
-
-        // Build up the boundary conditions
-        let mut push_channels: Vec<Direction<Option<Sender<Array2<Float>>>>> =
-            Vec::with_capacity(nthreads);
-        let mut pull_channels: Vec<Direction<Option<Receiver<Array2<Float>>>>> =
-            vec![Direction::default(); nthreads];
-
-        // Build the set of communicators between boundaries
-        for wb in &self.bt {
-            let mut local_push = Direction::default();
-            if let euler::BoundaryCharacteristic::Grid(i)
-            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.north()
-            {
-                let (s, r) = crossbeam_channel::bounded(1);
-                pull_channels[*i]
-                    .south_mut()
-                    .replace(r)
-                    .and_then::<(), _>(|_| panic!("channel is already present"));
-                *local_push.north_mut() = Some(s);
-            }
-            if let euler::BoundaryCharacteristic::Grid(i)
-            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.south()
-            {
-                let (s, r) = crossbeam_channel::bounded(1);
-                pull_channels[*i]
-                    .north_mut()
-                    .replace(r)
-                    .and_then::<(), _>(|_| panic!("channel is already present"));
-                *local_push.south_mut() = Some(s);
-            }
-            if let euler::BoundaryCharacteristic::Grid(i)
-            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.east()
-            {
-                let (s, r) = crossbeam_channel::bounded(1);
-                pull_channels[*i]
-                    .west_mut()
-                    .replace(r)
-                    .and_then::<(), _>(|_| panic!("channel is already present"));
-                *local_push.east_mut() = Some(s);
-            }
-            if let euler::BoundaryCharacteristic::Grid(i)
-            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.west()
-            {
-                let (s, r) = crossbeam_channel::bounded(1);
-                pull_channels[*i]
-                    .east_mut()
-                    .replace(r)
-                    .and_then::<(), _>(|_| panic!("channel is already present"));
-                *local_push.west_mut() = Some(s);
-            }
-
-            push_channels.push(local_push);
-        }
-
-        let mut tids = Vec::new();
-        for ((((((((current, fut), grid), metrics), sbp), wb), bt), chan), push) in self
-            .fnow
-            .into_iter()
-            .zip(self.fnext.into_iter())
-            .zip(self.grids.into_iter())
-            .zip(self.metrics.into_iter())
-            .zip(self.operators.into_iter())
-            .zip(self.wb.into_iter())
-            .zip(self.bt)
-            .zip(pull_channels)
-            .zip(push_channels)
-        {
-            let builder = std::thread::Builder::new().name(format!("eulersolver: {}", "smth"));
-            let barrier = b.clone();
-
-            let boundary_conditions = bt.zip(chan).map(|(bt, chan)| match bt {
-                euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This,
-                euler::BoundaryCharacteristic::Grid(_) => {
-                    DistributedBoundaryConditions::Channel(chan.unwrap())
-                }
-                euler::BoundaryCharacteristic::Interpolate(_, int_op) => {
-                    DistributedBoundaryConditions::Interpolate(chan.unwrap(), int_op)
-                }
-                euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(),
-                euler::BoundaryCharacteristic::Vortex(vp) => {
-                    DistributedBoundaryConditions::Vortex(vp)
-                }
-                euler::BoundaryCharacteristic::Eval(eval) => {
-                    DistributedBoundaryConditions::Eval(eval)
-                }
-            });
-
-            let (ny, nx) = (grid.ny(), grid.nx());
-
-            tids.push(
-                builder
-                    .spawn(move || {
-                        let mut sys = DistributedSystemPart {
-                            barrier,
-                            ntime,
-                            dt,
-                            current,
-                            fut,
-                            k: [
-                                Diff::zeros((ny, nx)),
-                                Diff::zeros((ny, nx)),
-                                Diff::zeros((ny, nx)),
-                                Diff::zeros((ny, nx)),
-                            ],
-                            boundary_conditions,
-                            grid: (grid, metrics),
-                            _output: (),
-                            push,
-                            sbp,
-                            t: time,
-
-                            wb,
-                            wb_ns: Array2::zeros((4, nx)),
-                            wb_ew: Array2::zeros((4, ny)),
-                        };
-                        sys.advance();
-                    })
-                    .unwrap(),
-            );
-        }
-        // Set up communicators
-        // Spawn a new communicator
-
-        DistributedSystem {
-            _ntime: ntime,
-            start: b,
-            sys: tids,
-        }
-    }
 }

-// single-threaded items: clone_from
-// sync points: scaled_add, rhs
-//
-// Could instead make every thread (of a threadpool?) carry one grid themselves,
-// and instead only wait on obtaining bc, which would make synchronisation
-// be somewhat minimal
-//
-// Difficulties/implementation notes:
-// * How to get output from each thread? Push to the multi-producer, single consumer queue
-//   with capacity of 2*N (which ensures internal synchronisation stays active), or many
-//   channels which are select!'ed. Choose instead a (name, itime, data) tuple, which is
-//   communicated. Also initialise the output file all at the start, since we know the
-//   number of time steps, and which timesteps will be used (should_output).
-// * How to balance the number of threads/jobs?
-//   Each grid will be pushed to a thread, and a thread pool created for use by all threads
-//   combined. Set affinity to limit the number of cores available (could have side effects
-//   on heat etc, although this would also be the case for multi-threaded code). A
-//   thread-per-core architecture + the output thread if we have enough cores, else let os
-//   control the scheduling for us.
-//   Use cgroups to artificially limit the number of cores.
-//   Use taskset to artificially limit the number of cores.
-// * Mechanism to wait for bc, is select available? Use a channel with 0-4 capacity, each
-//   other grid additonally pushes their direction to always use the available
-//   Use select or similar from crossbeam_channel
-// * Mechanism to push bc: try_push and then move to next bc if not available. Spawn a
-//   thread into a threadpool to handle this.
-// * Each grid is spawned onto an async threadpool with M threads allocated to itself, this should
-//   allow async waiting on the channel (or in the future from process/internet/bc processor)
-// * BC: A local thread can have a handle

 pub struct DistributedSystem {
-    /// Simple messaging system to be replaced by a more sophisticated system (e.g. run 5 steps,
-    /// collect, initialise, return something to main)
-    /// This simply waits until all threads are ready, then starts the computation on all threads
-    start: Arc<Barrier>,
-    _ntime: u64,
-    /// These should be joined to mark the end of the computation
+    recv: Receiver<(usize, MsgToHost)>,
+    send: Vec<Sender<MsgFromHost>>,
+    /// All threads should be joined to mark the end of the computation
     sys: Vec<std::thread::JoinHandle<()>>,
 }

 impl DistributedSystem {
-    pub fn run(self) {
-        // This should start as we have n thread, but barrier blocks on n+1
-        self.start.wait();
-        self.sys.into_iter().for_each(|tid| tid.join().unwrap());
+    pub fn advance(&mut self, ntime: u64) {
+        for tid in &self.send {
+            tid.send(MsgFromHost::Advance(ntime));
+        }
     }
+    pub fn output(&self) {
+        todo!()
+    }
+}
+
+impl Drop for DistributedSystem {
+    fn drop(&mut self) {
+        for tid in &self.send {
+            tid.send(MsgFromHost::Stop);
+        }
+        let handles = std::mem::take(&mut self.sys);
+        for tid in handles {
+            tid.join().unwrap()
+        }
+    }
+}
+
+enum MsgFromHost {
+    Advance(u64),
+    MaxDt(Float),
+    Output(u64),
+    Stop,
+}
+
+enum MsgToHost {
+    MaxDt(Float),
+    CurrentTimestep(u64),
 }

 // #[derive(Debug)]
@@ -441,23 +476,39 @@ struct DistributedSystemPart {
     current: Field,
     fut: Field,

-    k: [Diff; 4],
-    wb: WorkBuffers,
-    barrier: Arc<Barrier>,
-    _output: (), // hdf5::Dataset eventually,
+    t: Float,
     dt: Float,
-    ntime: u64,
-    /// Work buffer for boundaries
+
+    name: String,
+    id: usize,
+    recv: Receiver<MsgFromHost>,
+    send: Sender<(usize, MsgToHost)>,
+
+    output: hdf5::Dataset,
+
+    k: [Diff; 4],
+    wb: WorkBuffers,
+    /// Work buffer for north/south boundary
     wb_ns: Array2<Float>,
+    /// Work buffer for east/west boundary
     wb_ew: Array2<Float>,
 }

 impl DistributedSystemPart {
-    fn advance(&mut self) {
-        self.barrier.wait();
-        for _i in 0..self.ntime {
+    fn run(&mut self) {
+        loop {
+            match self.recv.recv().unwrap() {
+                MsgFromHost::MaxDt(dt) => self.dt = dt,
+                MsgFromHost::Advance(ntime) => self.advance(ntime),
+                MsgFromHost::Output(ntime) => todo!(),
+                MsgFromHost::Stop => return,
+            }
+        }
+    }
+    fn advance(&mut self, ntime: u64) {
+        for ntime in 0..ntime {
+            self.send.send((self.id, MsgToHost::CurrentTimestep(ntime)));
             let metrics = &self.grid.1;
             let wb = &mut self.wb.0;
             let sbp = &self.sbp;
@@ -482,7 +533,6 @@ impl DistributedSystemPart {
                 s.send(y.west().to_owned()).unwrap()
             }

-            use std::ops::Deref;
             // This computation does not depend on the boundaries
             euler::RHS_no_SAT(sbp.deref(), k, y, metrics, wb);
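
Aside: the host/worker handshake this patch introduces can be exercised in
isolation. The sketch below mirrors the `MsgFromHost`/`MsgToHost` enums, the
single shared `(id, msg)` reply channel, and the `Drop`-based shutdown from
system.rs, with the solver replaced by a step counter; the `Host` type and all
other names are illustrative, not part of the patch.

    use crossbeam_channel::{unbounded, Receiver, Sender};

    enum MsgFromHost {
        Advance(u64),
        Stop,
    }

    enum MsgToHost {
        CurrentTimestep(u64),
    }

    struct Host {
        send: Vec<Sender<MsgFromHost>>,
        recv: Receiver<(usize, MsgToHost)>,
        workers: Vec<std::thread::JoinHandle<()>>,
    }

    impl Host {
        fn new(nworkers: usize) -> Self {
            // One unbounded reply channel shared by all workers, as in system.rs
            let (master_send, master_recv) = unbounded();
            let mut send = Vec::with_capacity(nworkers);
            let mut workers = Vec::with_capacity(nworkers);
            for id in 0..nworkers {
                let (s, r) = unbounded::<MsgFromHost>();
                send.push(s);
                let master_send = master_send.clone();
                workers.push(std::thread::spawn(move || {
                    // Stand-in for DistributedSystemPart::run: block on the
                    // command channel and report each completed step
                    let mut t = 0;
                    loop {
                        match r.recv().unwrap() {
                            MsgFromHost::Advance(n) => {
                                for _ in 0..n {
                                    t += 1;
                                    master_send
                                        .send((id, MsgToHost::CurrentTimestep(t)))
                                        .unwrap();
                                }
                            }
                            MsgFromHost::Stop => return,
                        }
                    }
                }));
            }
            Host {
                send,
                recv: master_recv,
                workers,
            }
        }
    }

    impl Drop for Host {
        fn drop(&mut self) {
            // Same shutdown order as DistributedSystem: signal every worker,
            // then join all handles
            for s in &self.send {
                s.send(MsgFromHost::Stop).unwrap();
            }
            for h in std::mem::take(&mut self.workers) {
                h.join().unwrap();
            }
        }
    }

    fn main() {
        let host = Host::new(2);
        for s in &host.send {
            s.send(MsgFromHost::Advance(3)).unwrap();
        }
        // Two workers times three steps: six progress reports
        for _ in 0..6 {
            let (id, msg) = host.recv.recv().unwrap();
            match msg {
                MsgToHost::CurrentTimestep(t) => println!("worker {} at step {}", id, t),
            }
        }
    }

The grid-to-grid boundary exchange is a separate mechanism: each pair of
neighbouring grids is wired with crossbeam_channel::bounded(1) channels, so a
sender that races ahead blocks after one in-flight boundary array, which keeps
the per-grid threads loosely in step without the removed Barrier.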