From d0901f5755ed6e4f4f3c09c869d76b3da1fccfe8 Mon Sep 17 00:00:00 2001 From: Magnus Ulimoen Date: Sat, 21 Aug 2021 09:29:45 +0000 Subject: [PATCH] SAT boundaries for multi-thread fixing --- multigrid/Cargo.toml | 1 + multigrid/src/system.rs | 305 +++++++++++++++++----------------------- sbp/src/utils.rs | 12 ++ 3 files changed, 140 insertions(+), 178 deletions(-) diff --git a/multigrid/Cargo.toml b/multigrid/Cargo.toml index 66b7177..01dc0df 100644 --- a/multigrid/Cargo.toml +++ b/multigrid/Cargo.toml @@ -22,3 +22,4 @@ crossbeam-channel = "0.5.0" crossbeam-utils = "0.8.5" parking_lot = "0.11.1" lock_api = "0.4.4" +arrayvec = "0.7.1" diff --git a/multigrid/src/system.rs b/multigrid/src/system.rs index fdb9e81..c4c96cc 100644 --- a/multigrid/src/system.rs +++ b/multigrid/src/system.rs @@ -160,19 +160,10 @@ impl BaseSystem { // Build up the boundary conditions let mut pull = Vec::>::with_capacity(nthreads); - for wb in &self.boundary_conditions { - let data = wb.as_ref().map(|bc| { - if let euler::BoundaryCharacteristic::Grid(_) - | euler::BoundaryCharacteristic::Interpolate(_, _) = bc - { - CommunicatorData::NotAvailable - } else { - CommunicatorData::NotApplicable - } - }); + for _ in 0..nthreads { pull.push(Arc::new(Communicator { cvar: Condvar::new(), - data: Mutex::new(data), + data: Mutex::new(Direction::splat(()).map(|_| arrayvec::ArrayVec::new())), })); } @@ -721,11 +712,7 @@ pub enum DistributedBoundaryConditions { Channel, } -enum CommunicatorData { - NotApplicable, - NotAvailable, - Some(Array2), -} +type CommunicatorData = arrayvec::ArrayVec, 2>; struct Communicator { /// Waker for this grid, neighbours should have a reference @@ -898,9 +885,7 @@ impl DistributedSystemPart { wb.assign(&this); { let mut s = s.data.lock(); - let none = - std::mem::replace(sel(&mut s), CommunicatorData::Some(wb)); - assert!(matches!(none, CommunicatorData::NotAvailable)); + sel(&mut s).push(wb); } s.cvar.notify_one(); } @@ -987,16 +972,69 @@ impl DistributedSystemPart { { let mut data = pull.data.lock(); - 'check_boundaries: loop { - if boundaries_remaining.north { - if let CommunicatorData::Some(boundary) = std::mem::replace( - &mut (*data).north, - CommunicatorData::NotAvailable, - ) { - lock_api::MutexGuard::unlocked(&mut data, || { - let wb0 = workbuffer_edges.0.north_mut(); - let wb1 = workbuffer_edges.1.north.insert(boundary); - match boundary_conditions.north() { + 'check_boundaries: while boundaries_remaining.any() { + let boundaries = + boundaries_remaining + .zip(data.as_mut()) + .map( + |(remains, data)| { + if remains { + data.pop_at(0) + } else { + None + } + }, + ); + if boundaries.as_ref().map(Option::is_none).all() { + // Park thread while waiting for boundaries + pull.cvar.wait(&mut data); + continue 'check_boundaries; + } + // While we are waiting we can unlock mutex + lock_api::MutexGuard::unlocked(&mut data, || { + if let Some(boundary) = boundaries.north { + boundaries_remaining.north = false; + let wb0 = workbuffer_edges.0.north_mut(); + let wb1 = workbuffer_edges.1.north.insert(boundary); + match boundary_conditions.north() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(wb0, wb1); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2]; + for (to, from) in wb0.outer_iter_mut().zip(wb1.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); + } + } + // Reshape edge buffer to correct size + let wb = workbuffer_edges.1.north.take().unwrap(); + let mut vec = wb.into_raw_vec(); + vec.resize(wb0.len(), 0.0); + let wb = + Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap(); + workbuffer_edges.1.north = Some(wb); + } + _ => unreachable!(), + } + euler::SAT_north( + sbp.deref(), + k.north_mut(), + y, + metrics, + wb0.view(), + ); + }; + + if boundaries_remaining.south { + boundaries_remaining.south = false; + if let Some(boundary) = boundaries.south { + let wb0 = workbuffer_edges.0.south_mut(); + let wb1 = workbuffer_edges.1.south.insert(boundary); + match boundary_conditions.south() { DistributedBoundaryConditions::Channel => { std::mem::swap(wb0, wb1); } @@ -1012,176 +1050,87 @@ impl DistributedSystemPart { } } // Reshape edge buffer to correct size - let wb = workbuffer_edges.1.north.take().unwrap(); + let wb = workbuffer_edges.1.south.take().unwrap(); let mut vec = wb.into_raw_vec(); vec.resize(wb0.len(), 0.0); let wb = Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap(); - workbuffer_edges.1.north = Some(wb); + workbuffer_edges.1.south = Some(wb); } _ => unreachable!(), } - euler::SAT_north( + euler::SAT_south( sbp.deref(), - k.north_mut(), + k.south_mut(), y, metrics, wb0.view(), ); - boundaries_remaining.north = false; - }); - continue 'check_boundaries; + }; } - if boundaries_remaining.south { - if let CommunicatorData::Some(boundary) = std::mem::replace( - &mut (*data).south, - CommunicatorData::NotAvailable, - ) { - lock_api::MutexGuard::unlocked(&mut data, || { - let wb0 = workbuffer_edges.0.south_mut(); - let wb1 = workbuffer_edges.1.south.insert(boundary); - match boundary_conditions.south() { - DistributedBoundaryConditions::Channel => { - std::mem::swap(wb0, wb1); + if let Some(boundary) = boundaries.east { + boundaries_remaining.east = false; + let wb0 = workbuffer_edges.0.east_mut(); + let wb1 = workbuffer_edges.1.east.insert(boundary); + match boundary_conditions.east() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(wb0, wb1); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2]; + for (to, from) in wb0.outer_iter_mut().zip(wb1.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); } - DistributedBoundaryConditions::Interpolate(int_op) => { - let is_fine2coarse = - wb1.shape()[1] > wb0.shape()[2]; - for (to, from) in - wb0.outer_iter_mut().zip(wb1.outer_iter()) - { - if is_fine2coarse { - int_op.fine2coarse(from, to); - } else { - int_op.coarse2fine(from, to); - } - } - // Reshape edge buffer to correct size - let wb = workbuffer_edges.1.south.take().unwrap(); - let mut vec = wb.into_raw_vec(); - vec.resize(wb0.len(), 0.0); - let wb = Array2::from_shape_vec(wb0.raw_dim(), vec) - .unwrap(); - workbuffer_edges.1.south = Some(wb); - } - _ => unreachable!(), } - euler::SAT_south( - sbp.deref(), - k.south_mut(), - y, - metrics, - wb0.view(), - ); - boundaries_remaining.south = false; - }); - continue 'check_boundaries; + // Reshape edge buffer to correct size + let wb = workbuffer_edges.1.east.take().unwrap(); + let mut vec = wb.into_raw_vec(); + vec.resize(wb0.len(), 0.0); + let wb = + Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap(); + workbuffer_edges.1.east = Some(wb); + } + _ => unreachable!(), } - } + euler::SAT_east(sbp.deref(), k.east_mut(), y, metrics, wb0.view()); + }; - if boundaries_remaining.east { - if let CommunicatorData::Some(boundary) = std::mem::replace( - &mut (*data).east, - CommunicatorData::NotAvailable, - ) { - lock_api::MutexGuard::unlocked(&mut data, || { - let wb0 = workbuffer_edges.0.east_mut(); - let wb1 = workbuffer_edges.1.east.insert(boundary); - match boundary_conditions.east() { - DistributedBoundaryConditions::Channel => { - std::mem::swap(wb0, wb1); + if let Some(boundary) = boundaries.west { + boundaries_remaining.west = false; + let wb0 = workbuffer_edges.0.west_mut(); + let wb1 = workbuffer_edges.1.west.insert(boundary); + match boundary_conditions.west() { + DistributedBoundaryConditions::Channel => { + std::mem::swap(wb0, wb1); + } + DistributedBoundaryConditions::Interpolate(int_op) => { + let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2]; + for (to, from) in wb0.outer_iter_mut().zip(wb1.outer_iter()) + { + if is_fine2coarse { + int_op.fine2coarse(from, to); + } else { + int_op.coarse2fine(from, to); } - DistributedBoundaryConditions::Interpolate(int_op) => { - let is_fine2coarse = - wb1.shape()[1] > wb0.shape()[2]; - for (to, from) in - wb0.outer_iter_mut().zip(wb1.outer_iter()) - { - if is_fine2coarse { - int_op.fine2coarse(from, to); - } else { - int_op.coarse2fine(from, to); - } - } - // Reshape edge buffer to correct size - let wb = workbuffer_edges.1.east.take().unwrap(); - let mut vec = wb.into_raw_vec(); - vec.resize(wb0.len(), 0.0); - let wb = Array2::from_shape_vec(wb0.raw_dim(), vec) - .unwrap(); - workbuffer_edges.1.east = Some(wb); - } - _ => unreachable!(), } - euler::SAT_east( - sbp.deref(), - k.east_mut(), - y, - metrics, - wb0.view(), - ); - boundaries_remaining.east = false; - }); - continue 'check_boundaries; + // Reshape edge buffer to correct size + let wb = workbuffer_edges.1.west.take().unwrap(); + let mut vec = wb.into_raw_vec(); + vec.resize(wb0.len(), 0.0); + let wb = + Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap(); + workbuffer_edges.1.west = Some(wb); + } + _ => unreachable!(), } - } - - if boundaries_remaining.west { - if let CommunicatorData::Some(boundary) = std::mem::replace( - &mut (*data).west, - CommunicatorData::NotAvailable, - ) { - lock_api::MutexGuard::unlocked(&mut data, || { - let wb0 = workbuffer_edges.0.west_mut(); - let wb1 = workbuffer_edges.1.west.insert(boundary); - match boundary_conditions.west() { - DistributedBoundaryConditions::Channel => { - std::mem::swap(wb0, wb1); - } - DistributedBoundaryConditions::Interpolate(int_op) => { - let is_fine2coarse = - wb1.shape()[1] > wb0.shape()[2]; - for (to, from) in - wb0.outer_iter_mut().zip(wb1.outer_iter()) - { - if is_fine2coarse { - int_op.fine2coarse(from, to); - } else { - int_op.coarse2fine(from, to); - } - } - // Reshape edge buffer to correct size - let wb = workbuffer_edges.1.west.take().unwrap(); - let mut vec = wb.into_raw_vec(); - vec.resize(wb0.len(), 0.0); - let wb = Array2::from_shape_vec(wb0.raw_dim(), vec) - .unwrap(); - workbuffer_edges.1.west = Some(wb); - } - _ => unreachable!(), - } - euler::SAT_west( - sbp.deref(), - k.west_mut(), - y, - metrics, - wb0.view(), - ); - boundaries_remaining.west = false; - }); - continue 'check_boundaries; - } - } - } - if !boundaries_remaining.any() { - break 'check_boundaries; - } - println!("{:?}", boundaries_remaining); - // We have no available boundaries yet, can wait until - // notified by other threads. Early continues - // ensures the boundary has not been notified earlier - pull.cvar.wait(&mut data); + euler::SAT_west(sbp.deref(), k.west_mut(), y, metrics, wb0.view()); + }; + }); } } }; diff --git a/sbp/src/utils.rs b/sbp/src/utils.rs index 168a9ba..bd87a7b 100644 --- a/sbp/src/utils.rs +++ b/sbp/src/utils.rs @@ -63,6 +63,18 @@ impl Direction { west: self.east, } } + + pub fn splat(t: T) -> Self + where + T: Copy, + { + Self { + north: t, + south: t, + east: t, + west: t, + } + } } impl Direction> {