From d2c811d3af009bb907dd20229278419c28cedf4d Mon Sep 17 00:00:00 2001 From: Magnus Ulimoen Date: Fri, 20 Aug 2021 15:57:12 +0000 Subject: [PATCH] Rework wait primitive to condvar --- euler/src/lib.rs | 53 ++-- multigrid/Cargo.toml | 2 + multigrid/src/system.rs | 685 ++++++++++++++++++++++------------------ sbp/src/utils.rs | 19 ++ 4 files changed, 421 insertions(+), 338 deletions(-) diff --git a/euler/src/lib.rs b/euler/src/lib.rs index f712fb1..e4cf538 100644 --- a/euler/src/lib.rs +++ b/euler/src/lib.rs @@ -301,22 +301,18 @@ impl Field { pub fn west(&self) -> ArrayView2 { self.slice(s![.., .., 0]) } - #[allow(unused)] - fn north_mut(&mut self) -> ArrayViewMut2 { + pub fn north_mut(&mut self) -> ArrayViewMut2 { let ny = self.ny(); self.slice_mut(s![.., ny - 1, ..]) } - #[allow(unused)] - fn south_mut(&mut self) -> ArrayViewMut2 { + pub fn south_mut(&mut self) -> ArrayViewMut2 { self.slice_mut(s![.., 0, ..]) } - #[allow(unused)] - fn east_mut(&mut self) -> ArrayViewMut2 { + pub fn east_mut(&mut self) -> ArrayViewMut2 { let nx = self.nx(); self.slice_mut(s![.., .., nx - 1]) } - #[allow(unused)] - fn west_mut(&mut self) -> ArrayViewMut2 { + pub fn west_mut(&mut self) -> ArrayViewMut2 { self.slice_mut(s![.., .., 0]) } @@ -463,18 +459,18 @@ impl Diff { pub fn zeros((ny, nx): (usize, usize)) -> Self { Self(Array3::zeros((4, ny, nx))) } - fn north_mut(&mut self) -> ArrayViewMut2 { + pub fn north_mut(&mut self) -> ArrayViewMut2 { let ny = self.shape()[1]; self.0.slice_mut(s![.., ny - 1, ..]) } - fn south_mut(&mut self) -> ArrayViewMut2 { + pub fn south_mut(&mut self) -> ArrayViewMut2 { self.0.slice_mut(s![.., 0, ..]) } - fn east_mut(&mut self) -> ArrayViewMut2 { + pub fn east_mut(&mut self) -> ArrayViewMut2 { let nx = self.shape()[2]; self.0.slice_mut(s![.., .., nx - 1]) } - fn west_mut(&mut self) -> ArrayViewMut2 { + pub fn west_mut(&mut self) -> ArrayViewMut2 { self.0.slice_mut(s![.., .., 0]) } } @@ -1010,16 +1006,25 @@ pub fn SAT_characteristics( metrics: &Metrics, boundaries: &BoundaryTerms, ) { - SAT_north(op, k, y, metrics, boundaries.north); - SAT_south(op, k, y, metrics, boundaries.south); - SAT_east(op, k, y, metrics, boundaries.east); - SAT_west(op, k, y, metrics, boundaries.west); + SAT_north(op, k.north_mut(), y, metrics, boundaries.north); + SAT_south(op, k.south_mut(), y, metrics, boundaries.south); + SAT_east(op, k.east_mut(), y, metrics, boundaries.east); + SAT_west(op, k.west_mut(), y, metrics, boundaries.west); } +pub const SAT_FUNCTIONS: Direction< + fn(&dyn SbpOperator2d, ArrayViewMut2, &Field, &Metrics, ArrayView2), +> = Direction { + north: SAT_north, + south: SAT_south, + west: SAT_west, + east: SAT_east, +}; + #[allow(non_snake_case)] pub fn SAT_north( op: &dyn SbpOperator2d, - k: &mut Diff, + k: ArrayViewMut2, y: &Field, metrics: &Metrics, boundary: ArrayView2, @@ -1035,7 +1040,7 @@ pub fn SAT_north( let tau = 1.0; let slice = s![y.ny() - 1, ..]; SAT_characteristic( - k.north_mut(), + k, y.north(), boundary, hi, @@ -1050,7 +1055,7 @@ pub fn SAT_north( #[allow(non_snake_case)] pub fn SAT_south( op: &dyn SbpOperator2d, - k: &mut Diff, + k: ArrayViewMut2, y: &Field, metrics: &Metrics, boundary: ArrayView2, @@ -1065,7 +1070,7 @@ pub fn SAT_south( let tau = -1.0; let slice = s![0, ..]; SAT_characteristic( - k.south_mut(), + k, y.south(), boundary, hi, @@ -1080,7 +1085,7 @@ pub fn SAT_south( #[allow(non_snake_case)] pub fn SAT_west( op: &dyn SbpOperator2d, - k: &mut Diff, + k: ArrayViewMut2, y: &Field, metrics: &Metrics, boundary: ArrayView2, @@ -1096,7 +1101,7 @@ pub fn SAT_west( let tau = -1.0; let slice = s![.., 0]; SAT_characteristic( - k.west_mut(), + k, y.west(), boundary, hi, @@ -1111,7 +1116,7 @@ pub fn SAT_west( #[allow(non_snake_case)] pub fn SAT_east( op: &dyn SbpOperator2d, - k: &mut Diff, + k: ArrayViewMut2, y: &Field, metrics: &Metrics, boundary: ArrayView2, @@ -1126,7 +1131,7 @@ pub fn SAT_east( let tau = 1.0; let slice = s![.., y.nx() - 1]; SAT_characteristic( - k.east_mut(), + k, y.east(), boundary, hi, diff --git a/multigrid/Cargo.toml b/multigrid/Cargo.toml index 112eaa6..66b7177 100644 --- a/multigrid/Cargo.toml +++ b/multigrid/Cargo.toml @@ -20,3 +20,5 @@ argh = "0.1.4" evalexpr = "6.3.0" crossbeam-channel = "0.5.0" crossbeam-utils = "0.8.5" +parking_lot = "0.11.1" +lock_api = "0.4.4" diff --git a/multigrid/src/system.rs b/multigrid/src/system.rs index 5bb2d8b..fdb9e81 100644 --- a/multigrid/src/system.rs +++ b/multigrid/src/system.rs @@ -1,15 +1,17 @@ use crate::parsing; use crate::utils::Direction; use core::ops::Deref; -use crossbeam_channel::{Receiver, Select, Sender}; +use crossbeam_channel::{Receiver, Sender}; use euler::{ eval::{self, Evaluator}, Diff, Field, VortexParameters, WorkBuffers, }; use ndarray::Array2; +use parking_lot::{Condvar, Mutex}; use sbp::grid::{Grid, Metrics}; use sbp::operators::{InterpolationOperator, SbpOperator2d}; use sbp::*; +use std::sync::Arc; pub struct BaseSystem { pub names: Vec, @@ -157,56 +159,37 @@ impl BaseSystem { let nthreads = self.grids.len(); // Build up the boundary conditions - let mut push_channels: Vec>>>> = - Vec::with_capacity(nthreads); - let mut pull_channels: Vec>>>> = - vec![Direction::default(); nthreads]; + let mut pull = Vec::>::with_capacity(nthreads); + for wb in &self.boundary_conditions { + let data = wb.as_ref().map(|bc| { + if let euler::BoundaryCharacteristic::Grid(_) + | euler::BoundaryCharacteristic::Interpolate(_, _) = bc + { + CommunicatorData::NotAvailable + } else { + CommunicatorData::NotApplicable + } + }); + pull.push(Arc::new(Communicator { + cvar: Condvar::new(), + data: Mutex::new(data), + })); + } // Build the set of communicators between boundaries + let mut push = Vec::>>>::with_capacity(nthreads); for wb in &self.boundary_conditions { - let mut local_push = Direction::default(); - if let euler::BoundaryCharacteristic::Grid(i) - | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.north() - { - let (s, r) = crossbeam_channel::bounded(1); - pull_channels[*i] - .south_mut() - .replace(r) - .and_then::<(), _>(|_| panic!("channel is already present")); - *local_push.north_mut() = Some(s); - } - if let euler::BoundaryCharacteristic::Grid(i) - | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.south() - { - let (s, r) = crossbeam_channel::bounded(1); - pull_channels[*i] - .north_mut() - .replace(r) - .and_then::<(), _>(|_| panic!("channel is already present")); - *local_push.south_mut() = Some(s); - } - if let euler::BoundaryCharacteristic::Grid(i) - | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.east() - { - let (s, r) = crossbeam_channel::bounded(1); - pull_channels[*i] - .west_mut() - .replace(r) - .and_then::<(), _>(|_| panic!("channel is already present")); - *local_push.east_mut() = Some(s); - } - if let euler::BoundaryCharacteristic::Grid(i) - | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.west() - { - let (s, r) = crossbeam_channel::bounded(1); - pull_channels[*i] - .east_mut() - .replace(r) - .and_then::<(), _>(|_| panic!("channel is already present")); - *local_push.west_mut() = Some(s); - } + let local_push = wb.as_ref().map(|bc| { + if let euler::BoundaryCharacteristic::Grid(i) + | euler::BoundaryCharacteristic::Interpolate(i, _) = bc + { + Some(pull[*i].clone()) + } else { + None + } + }); - push_channels.push(local_push); + push.push(local_push); } let (master_send, master_recv) = crossbeam_channel::unbounded(); @@ -214,25 +197,23 @@ impl BaseSystem { let mut tids = Vec::with_capacity(nthreads); let mut communicators = Vec::with_capacity(nthreads); - for (id, (((((name, grid), sbp), bt), chan), push)) in self + for (id, (((((name, grid), sbp), bt), pull), push)) in self .names .into_iter() .zip(self.grids.into_iter()) .zip(self.operators.into_iter()) .zip(self.boundary_conditions) - .zip(pull_channels) - .zip(push_channels) + .zip(pull) + .zip(push) .enumerate() { let builder = std::thread::Builder::new().name(format!("mg: {}", name)); - let boundary_conditions = bt.zip(chan).map(|(bt, chan)| match bt { + let boundary_conditions = bt.map(|bt| match bt { euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This, - euler::BoundaryCharacteristic::Grid(_) => { - DistributedBoundaryConditions::Channel(chan.unwrap()) - } + euler::BoundaryCharacteristic::Grid(_) => DistributedBoundaryConditions::Channel, euler::BoundaryCharacteristic::Interpolate(_, int_op) => { - DistributedBoundaryConditions::Interpolate(chan.unwrap(), int_op) + DistributedBoundaryConditions::Interpolate(int_op) } euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(), euler::BoundaryCharacteristic::Vortex(vp) => { @@ -319,6 +300,7 @@ impl BaseSystem { grid: (grid, metrics), output: g, push, + pull, sbp, t: time, dt: Float::NAN, @@ -331,8 +313,20 @@ impl BaseSystem { send: master_send, wb, - wb_ns: Array2::zeros((4, nx)), - wb_ew: Array2::zeros((4, ny)), + workbuffer_edges: ( + Direction { + north: Array2::zeros((4, nx)), + south: Array2::zeros((4, nx)), + east: Array2::zeros((4, ny)), + west: Array2::zeros((4, ny)), + }, + Direction { + north: Some(Array2::zeros((4, nx))), + south: Some(Array2::zeros((4, nx))), + east: Some(Array2::zeros((4, ny))), + west: Some(Array2::zeros((4, ny))), + }, + ), progressbar: None, }; @@ -723,19 +717,34 @@ pub enum DistributedBoundaryConditions { Vortex(VortexParameters), Eval(std::sync::Arc>), - Interpolate(Receiver>, Box), - Channel(Receiver>), + Interpolate(Box), + Channel, } -type PushCommunicator = Option>>; +enum CommunicatorData { + NotApplicable, + NotAvailable, + Some(Array2), +} + +struct Communicator { + /// Waker for this grid, neighbours should have a reference + /// and notify when a boundary has been put + cvar: Condvar, + /// Internal data exchange, is None on missing data, inner type + /// can be set to None when grabbing the boundary + data: Mutex>, +} struct DistributedSystemPart { grid: (Grid, Metrics), sbp: Box, boundary_conditions: Direction, + /// Channel pullers + pull: Arc, /// Subscribers to the boundaries of self - push: Direction, + push: Direction>>, current: Field, fut: Field, @@ -753,10 +762,13 @@ struct DistributedSystemPart { k: [Diff; 4], wb: WorkBuffers, - /// Work buffer for north/south boundary - wb_ns: Array2, - /// Work buffer for east/west boundary - wb_ew: Array2, + /// Work buffer for boundaries + /// + // Option: This can be sent from the current thread to another, + // Will be replenished by arriving boundary conditions for + // zero-allocation in loop (no global locks). + // Should never be None on entry to loop + workbuffer_edges: (Direction>, Direction>>), progressbar: Option, } @@ -857,25 +869,42 @@ impl DistributedSystemPart { let wb = &mut self.wb.0; let sbp = &self.sbp; let push = &self.push; + let pull = &self.pull; let boundary_conditions = &self.boundary_conditions; let grid = &self.grid.0; - let wb_ns = &mut self.wb_ns; - let wb_ew = &mut self.wb_ew; + let workbuffer_edges = &mut self.workbuffer_edges; let rhs = |k: &mut euler::Diff, y: &euler::Field, time: Float| { - // Send off the boundaries optimistically, in case some grid is ready - if let Some(s) = &push.north { - s.send(y.north().to_owned()).unwrap() - } - if let Some(s) = &push.south { - s.send(y.south().to_owned()).unwrap() - } - if let Some(s) = &push.east { - s.send(y.east().to_owned()).unwrap() - } - if let Some(s) = &push.west { - s.send(y.west().to_owned()).unwrap() - } + // Send off the boundaries eagerly, in case neighbouring grid is ready + push.as_ref() + .zip(workbuffer_edges.1.as_mut()) + .zip( + Direction::) -> &mut CommunicatorData> { + north: |x| x.south_mut(), + south: |x| x.north_mut(), + east: |x| x.west_mut(), + west: |x| x.east_mut(), + }, + ) + .zip(Direction { + north: y.north(), + south: y.south(), + west: y.west(), + east: y.east(), + }) + .map(|(((push, wb), sel), this)| { + if let Some(s) = push { + let mut wb = wb.take().unwrap(); + wb.assign(&this); + { + let mut s = s.data.lock(); + let none = + std::mem::replace(sel(&mut s), CommunicatorData::Some(wb)); + assert!(matches!(none, CommunicatorData::NotAvailable)); + } + s.cvar.notify_one(); + } + }); // This computation does not depend on the boundaries euler::RHS_no_SAT(sbp.deref(), k, y, metrics, wb); @@ -883,249 +912,277 @@ impl DistributedSystemPart { // Get boundaries, but be careful and maximise the amount of work which can be // performed before we have all of them, whilst ensuring threads can sleep for as // long as possible - let mut select = Select::new(); - let mut selectable = 0; - let recv_north = match boundary_conditions.north() { - DistributedBoundaryConditions::Channel(r) - | DistributedBoundaryConditions::Interpolate(r, _) => { - selectable += 1; - Some(select.recv(r)) - } - DistributedBoundaryConditions::This => { - euler::SAT_north(sbp.deref(), k, y, metrics, y.south()); - None - } - DistributedBoundaryConditions::Vortex(vp) => { - let mut fiter = wb_ns.outer_iter_mut(); - let (rho, rhou, rhov, e) = ( - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - ); - let (gx, gy) = grid.north(); - vp.evaluate(time, gx, gy, rho, rhou, rhov, e); - - euler::SAT_north(sbp.deref(), k, y, metrics, wb_ns.view()); - None - } - DistributedBoundaryConditions::Eval(eval) => { - let mut fiter = wb_ns.outer_iter_mut(); - let (rho, rhou, rhov, e) = ( - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - ); - let (gx, gy) = grid.north(); - eval.evaluate(time, gx, gy, rho, rhou, rhov, e); - euler::SAT_north(sbp.deref(), k, y, metrics, wb_ns.view()); - None - } - }; - let recv_south = match boundary_conditions.south() { - DistributedBoundaryConditions::Channel(r) - | DistributedBoundaryConditions::Interpolate(r, _) => { - selectable += 1; - Some(select.recv(r)) - } - DistributedBoundaryConditions::This => { - euler::SAT_south(sbp.deref(), k, y, metrics, y.north()); - None - } - DistributedBoundaryConditions::Vortex(vp) => { - let mut fiter = wb_ns.outer_iter_mut(); - let (rho, rhou, rhov, e) = ( - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - ); - let (gx, gy) = grid.south(); - vp.evaluate(time, gx, gy, rho, rhou, rhov, e); - - euler::SAT_south(sbp.deref(), k, y, metrics, wb_ns.view()); - None - } - DistributedBoundaryConditions::Eval(eval) => { - let mut fiter = wb_ns.outer_iter_mut(); - let (rho, rhou, rhov, e) = ( - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - ); - let (gx, gy) = grid.south(); - eval.evaluate(time, gx, gy, rho, rhou, rhov, e); - euler::SAT_south(sbp.deref(), k, y, metrics, wb_ns.view()); - None - } - }; - let recv_east = match boundary_conditions.east() { - DistributedBoundaryConditions::Channel(r) - | DistributedBoundaryConditions::Interpolate(r, _) => { - selectable += 1; - Some(select.recv(r)) - } - DistributedBoundaryConditions::This => { - euler::SAT_east(sbp.deref(), k, y, metrics, y.west()); - None - } - DistributedBoundaryConditions::Vortex(vp) => { - let mut fiter = wb_ew.outer_iter_mut(); - let (rho, rhou, rhov, e) = ( - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - ); - let (gx, gy) = grid.east(); - vp.evaluate(time, gx, gy, rho, rhou, rhov, e); - - euler::SAT_east(sbp.deref(), k, y, metrics, wb_ew.view()); - None - } - DistributedBoundaryConditions::Eval(eval) => { - let mut fiter = wb_ew.outer_iter_mut(); - let (rho, rhou, rhov, e) = ( - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - ); - let (gx, gy) = grid.east(); - eval.evaluate(time, gx, gy, rho, rhou, rhov, e); - euler::SAT_east(sbp.deref(), k, y, metrics, wb_ew.view()); - None - } - }; - let recv_west = match boundary_conditions.west() { - DistributedBoundaryConditions::Channel(r) - | DistributedBoundaryConditions::Interpolate(r, _) => { - selectable += 1; - Some(select.recv(r)) - } - DistributedBoundaryConditions::This => { - euler::SAT_west(sbp.deref(), k, y, metrics, y.east()); - None - } - DistributedBoundaryConditions::Vortex(vp) => { - let mut fiter = wb_ew.outer_iter_mut(); - let (rho, rhou, rhov, e) = ( - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - ); - let (gx, gy) = grid.west(); - vp.evaluate(time, gx, gy, rho, rhou, rhov, e); - - euler::SAT_west(sbp.deref(), k, y, metrics, wb_ew.view()); - None - } - DistributedBoundaryConditions::Eval(eval) => { - let mut fiter = wb_ew.outer_iter_mut(); - let (rho, rhou, rhov, e) = ( - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - fiter.next().unwrap(), - ); - let (gx, gy) = grid.west(); - eval.evaluate(time, gx, gy, rho, rhou, rhov, e); - euler::SAT_west(sbp.deref(), k, y, metrics, wb_ew.view()); - None - } - }; - - // Get an item off each channel, waiting minimally before processing that boundary. - // The waiting ensures other grids can be processed by the core in case of - // oversubscription (in case of a more grids than core scenario) - // This minimises the amount of time waiting on boundary conditions - while selectable != 0 { - let s = select.select(); - let sindex = s.index(); - match Some(sindex) { - x if x == recv_north => match boundary_conditions.north() { - DistributedBoundaryConditions::Channel(r) => { - let r = s.recv(r).unwrap(); - euler::SAT_north(sbp.deref(), k, y, metrics, r.view()); + let computed = boundary_conditions + .as_ref() + .zip(euler::SAT_FUNCTIONS) + .zip(workbuffer_edges.0.as_mut()) + .zip(workbuffer_edges.1.as_mut()) + .zip(Direction { + north: y.south(), + south: y.north(), + east: y.west(), + west: y.east(), + }) + .zip(Direction { + north: grid.north(), + south: grid.south(), + east: grid.east(), + west: grid.west(), + }) + .map(|(((((bc, sat), wb0), wb1), self_edge), grid)| { + wb0.fill(0.0); + match bc { + DistributedBoundaryConditions::Channel + | DistributedBoundaryConditions::Interpolate(_) => false, + DistributedBoundaryConditions::This => { + sat(sbp.deref(), wb0.view_mut(), y, metrics, self_edge); + true } - DistributedBoundaryConditions::Interpolate(r, int_op) => { - let r = s.recv(r).unwrap(); - let is_fine2coarse = r.shape()[1] > wb_ns.shape()[1]; - for (mut to, from) in wb_ns.outer_iter_mut().zip(r.outer_iter()) { - if is_fine2coarse { - int_op.fine2coarse(from.view(), to.view_mut()); - } else { - int_op.coarse2fine(from.view(), to.view_mut()); + DistributedBoundaryConditions::Vortex(vp) => { + let wb1 = wb1.as_mut().unwrap(); + let mut fiter = wb1.outer_iter_mut(); + let (rho, rhou, rhov, e) = ( + fiter.next().unwrap(), + fiter.next().unwrap(), + fiter.next().unwrap(), + fiter.next().unwrap(), + ); + let (gx, gy) = grid; + vp.evaluate(time, gx, gy, rho, rhou, rhov, e); + + sat(sbp.deref(), wb0.view_mut(), y, metrics, wb1.view()); + true + } + DistributedBoundaryConditions::Eval(eval) => { + let wb1 = wb1.as_mut().unwrap(); + let mut fiter = wb1.outer_iter_mut(); + let (rho, rhou, rhov, e) = ( + fiter.next().unwrap(), + fiter.next().unwrap(), + fiter.next().unwrap(), + fiter.next().unwrap(), + ); + let (gx, gy) = grid; + eval.evaluate(time, gx, gy, rho, rhou, rhov, e); + sat(sbp.deref(), wb0.view_mut(), y, metrics, wb1.view()); + true + } + } + }); + + if computed.north { + k.north_mut().scaled_add(1.0, &workbuffer_edges.0.north()); + } + if computed.south { + k.south_mut().scaled_add(1.0, &workbuffer_edges.0.south()); + } + if computed.east { + k.east_mut().scaled_add(1.0, &workbuffer_edges.0.east()); + } + if computed.west { + k.west_mut().scaled_add(1.0, &workbuffer_edges.0.west()); + } + + let mut boundaries_remaining = computed.map(|b| !b); + + { + let mut data = pull.data.lock(); + 'check_boundaries: loop { + if boundaries_remaining.north { + if let CommunicatorData::Some(boundary) = std::mem::replace( + &mut (*data).north, + CommunicatorData::NotAvailable, + ) { + lock_api::MutexGuard::unlocked(&mut data, || { + let wb0 = workbuffer_edges.0.north_mut(); + let wb1 = workbuffer_edges.1.north.insert(boundary); + match boundary_conditions.north() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(wb0, wb1); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2]; + for (to, from) in + wb0.outer_iter_mut().zip(wb1.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); + } + } + // Reshape edge buffer to correct size + let wb = workbuffer_edges.1.north.take().unwrap(); + let mut vec = wb.into_raw_vec(); + vec.resize(wb0.len(), 0.0); + let wb = + Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap(); + workbuffer_edges.1.north = Some(wb); + } + _ => unreachable!(), } + euler::SAT_north( + sbp.deref(), + k.north_mut(), + y, + metrics, + wb0.view(), + ); + boundaries_remaining.north = false; + }); + continue 'check_boundaries; + } + + if boundaries_remaining.south { + if let CommunicatorData::Some(boundary) = std::mem::replace( + &mut (*data).south, + CommunicatorData::NotAvailable, + ) { + lock_api::MutexGuard::unlocked(&mut data, || { + let wb0 = workbuffer_edges.0.south_mut(); + let wb1 = workbuffer_edges.1.south.insert(boundary); + match boundary_conditions.south() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(wb0, wb1); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = + wb1.shape()[1] > wb0.shape()[2]; + for (to, from) in + wb0.outer_iter_mut().zip(wb1.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); + } + } + // Reshape edge buffer to correct size + let wb = workbuffer_edges.1.south.take().unwrap(); + let mut vec = wb.into_raw_vec(); + vec.resize(wb0.len(), 0.0); + let wb = Array2::from_shape_vec(wb0.raw_dim(), vec) + .unwrap(); + workbuffer_edges.1.south = Some(wb); + } + _ => unreachable!(), + } + euler::SAT_south( + sbp.deref(), + k.south_mut(), + y, + metrics, + wb0.view(), + ); + boundaries_remaining.south = false; + }); + continue 'check_boundaries; } - euler::SAT_north(sbp.deref(), k, y, metrics, wb_ns.view()); } - _ => unreachable!(), - }, - x if x == recv_south => match boundary_conditions.south() { - DistributedBoundaryConditions::Channel(r) => { - let r = s.recv(r).unwrap(); - euler::SAT_south(sbp.deref(), k, y, metrics, r.view()); - } - DistributedBoundaryConditions::Interpolate(r, int_op) => { - let r = s.recv(r).unwrap(); - let is_fine2coarse = r.shape()[1] > wb_ns.shape()[1]; - for (mut to, from) in wb_ns.outer_iter_mut().zip(r.outer_iter()) { - if is_fine2coarse { - int_op.fine2coarse(from.view(), to.view_mut()); - } else { - int_op.coarse2fine(from.view(), to.view_mut()); - } + + if boundaries_remaining.east { + if let CommunicatorData::Some(boundary) = std::mem::replace( + &mut (*data).east, + CommunicatorData::NotAvailable, + ) { + lock_api::MutexGuard::unlocked(&mut data, || { + let wb0 = workbuffer_edges.0.east_mut(); + let wb1 = workbuffer_edges.1.east.insert(boundary); + match boundary_conditions.east() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(wb0, wb1); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = + wb1.shape()[1] > wb0.shape()[2]; + for (to, from) in + wb0.outer_iter_mut().zip(wb1.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); + } + } + // Reshape edge buffer to correct size + let wb = workbuffer_edges.1.east.take().unwrap(); + let mut vec = wb.into_raw_vec(); + vec.resize(wb0.len(), 0.0); + let wb = Array2::from_shape_vec(wb0.raw_dim(), vec) + .unwrap(); + workbuffer_edges.1.east = Some(wb); + } + _ => unreachable!(), + } + euler::SAT_east( + sbp.deref(), + k.east_mut(), + y, + metrics, + wb0.view(), + ); + boundaries_remaining.east = false; + }); + continue 'check_boundaries; } - euler::SAT_south(sbp.deref(), k, y, metrics, wb_ns.view()); } - _ => unreachable!(), - }, - x if x == recv_west => match boundary_conditions.west() { - DistributedBoundaryConditions::Channel(r) => { - let r = s.recv(r).unwrap(); - euler::SAT_west(sbp.deref(), k, y, metrics, r.view()); - } - DistributedBoundaryConditions::Interpolate(r, int_op) => { - let r = s.recv(r).unwrap(); - let is_fine2coarse = r.shape()[1] > wb_ew.shape()[1]; - for (mut to, from) in wb_ew.outer_iter_mut().zip(r.outer_iter()) { - if is_fine2coarse { - int_op.fine2coarse(from.view(), to.view_mut()); - } else { - int_op.coarse2fine(from.view(), to.view_mut()); - } + + if boundaries_remaining.west { + if let CommunicatorData::Some(boundary) = std::mem::replace( + &mut (*data).west, + CommunicatorData::NotAvailable, + ) { + lock_api::MutexGuard::unlocked(&mut data, || { + let wb0 = workbuffer_edges.0.west_mut(); + let wb1 = workbuffer_edges.1.west.insert(boundary); + match boundary_conditions.west() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(wb0, wb1); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = + wb1.shape()[1] > wb0.shape()[2]; + for (to, from) in + wb0.outer_iter_mut().zip(wb1.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); + } + } + // Reshape edge buffer to correct size + let wb = workbuffer_edges.1.west.take().unwrap(); + let mut vec = wb.into_raw_vec(); + vec.resize(wb0.len(), 0.0); + let wb = Array2::from_shape_vec(wb0.raw_dim(), vec) + .unwrap(); + workbuffer_edges.1.west = Some(wb); + } + _ => unreachable!(), + } + euler::SAT_west( + sbp.deref(), + k.west_mut(), + y, + metrics, + wb0.view(), + ); + boundaries_remaining.west = false; + }); + continue 'check_boundaries; } - euler::SAT_west(sbp.deref(), k, y, metrics, wb_ew.view()); } - _ => unreachable!(), - }, - x if x == recv_east => match boundary_conditions.east() { - DistributedBoundaryConditions::Channel(r) => { - let r = s.recv(r).unwrap(); - euler::SAT_east(sbp.deref(), k, y, metrics, r.view()); - } - DistributedBoundaryConditions::Interpolate(r, int_op) => { - let r = s.recv(r).unwrap(); - let is_fine2coarse = r.shape()[1] > wb_ew.shape()[1]; - for (mut to, from) in wb_ew.outer_iter_mut().zip(r.outer_iter()) { - if is_fine2coarse { - int_op.fine2coarse(from.view(), to.view_mut()); - } else { - int_op.coarse2fine(from.view(), to.view_mut()); - } - } - euler::SAT_east(sbp.deref(), k, y, metrics, wb_ew.view()); - } - _ => unreachable!(), - }, - _ => unreachable!(), + } + if !boundaries_remaining.any() { + break 'check_boundaries; + } + println!("{:?}", boundaries_remaining); + // We have no available boundaries yet, can wait until + // notified by other threads. Early continues + // ensures the boundary has not been notified earlier + pull.cvar.wait(&mut data); } - select.remove(sindex); - selectable -= 1; } }; integrate::integrate::( diff --git a/sbp/src/utils.rs b/sbp/src/utils.rs index 8d1f025..168a9ba 100644 --- a/sbp/src/utils.rs +++ b/sbp/src/utils.rs @@ -53,6 +53,16 @@ impl Direction { east: (self.east, other.east), } } + + /// Flips all direction through origo + pub fn opposite(self) -> Self { + Self { + north: self.south, + south: self.north, + east: self.west, + west: self.east, + } + } } impl Direction> { @@ -96,6 +106,15 @@ impl Direction { } } +impl Direction { + pub fn all(&self) -> bool { + self.north && self.south && self.east && self.west + } + pub fn any(&self) -> bool { + self.north || self.south || self.east || self.west + } +} + /// Linearly spaced parameters, apart from the boundaries which /// only have a distance of `h/2` from the boundary pub fn h2linspace(start: Float, end: Float, n: usize) -> ndarray::Array1 {