SAT boundaries for multi-thread fixing

This commit is contained in:
Magnus Ulimoen 2021-08-21 09:29:45 +00:00
parent d2c811d3af
commit d0901f5755
3 changed files with 140 additions and 178 deletions

View File

@ -22,3 +22,4 @@ crossbeam-channel = "0.5.0"
crossbeam-utils = "0.8.5" crossbeam-utils = "0.8.5"
parking_lot = "0.11.1" parking_lot = "0.11.1"
lock_api = "0.4.4" lock_api = "0.4.4"
arrayvec = "0.7.1"

View File

@ -160,19 +160,10 @@ impl BaseSystem {
// Build up the boundary conditions // Build up the boundary conditions
let mut pull = Vec::<Arc<Communicator>>::with_capacity(nthreads); let mut pull = Vec::<Arc<Communicator>>::with_capacity(nthreads);
for wb in &self.boundary_conditions { for _ in 0..nthreads {
let data = wb.as_ref().map(|bc| {
if let euler::BoundaryCharacteristic::Grid(_)
| euler::BoundaryCharacteristic::Interpolate(_, _) = bc
{
CommunicatorData::NotAvailable
} else {
CommunicatorData::NotApplicable
}
});
pull.push(Arc::new(Communicator { pull.push(Arc::new(Communicator {
cvar: Condvar::new(), cvar: Condvar::new(),
data: Mutex::new(data), data: Mutex::new(Direction::splat(()).map(|_| arrayvec::ArrayVec::new())),
})); }));
} }
@ -721,11 +712,7 @@ pub enum DistributedBoundaryConditions {
Channel, Channel,
} }
enum CommunicatorData { type CommunicatorData = arrayvec::ArrayVec<Array2<Float>, 2>;
NotApplicable,
NotAvailable,
Some(Array2<Float>),
}
struct Communicator { struct Communicator {
/// Waker for this grid, neighbours should have a reference /// Waker for this grid, neighbours should have a reference
@ -898,9 +885,7 @@ impl DistributedSystemPart {
wb.assign(&this); wb.assign(&this);
{ {
let mut s = s.data.lock(); let mut s = s.data.lock();
let none = sel(&mut s).push(wb);
std::mem::replace(sel(&mut s), CommunicatorData::Some(wb));
assert!(matches!(none, CommunicatorData::NotAvailable));
} }
s.cvar.notify_one(); s.cvar.notify_one();
} }
@ -987,16 +972,69 @@ impl DistributedSystemPart {
{ {
let mut data = pull.data.lock(); let mut data = pull.data.lock();
'check_boundaries: loop { 'check_boundaries: while boundaries_remaining.any() {
if boundaries_remaining.north { let boundaries =
if let CommunicatorData::Some(boundary) = std::mem::replace( boundaries_remaining
&mut (*data).north, .zip(data.as_mut())
CommunicatorData::NotAvailable, .map(
) { |(remains, data)| {
lock_api::MutexGuard::unlocked(&mut data, || { if remains {
let wb0 = workbuffer_edges.0.north_mut(); data.pop_at(0)
let wb1 = workbuffer_edges.1.north.insert(boundary); } else {
match boundary_conditions.north() { None
}
},
);
if boundaries.as_ref().map(Option::is_none).all() {
// Park thread while waiting for boundaries
pull.cvar.wait(&mut data);
continue 'check_boundaries;
}
// While we are waiting we can unlock mutex
lock_api::MutexGuard::unlocked(&mut data, || {
if let Some(boundary) = boundaries.north {
boundaries_remaining.north = false;
let wb0 = workbuffer_edges.0.north_mut();
let wb1 = workbuffer_edges.1.north.insert(boundary);
match boundary_conditions.north() {
DistributedBoundaryConditions::Channel => {
std::mem::swap(wb0, wb1);
}
DistributedBoundaryConditions::Interpolate(int_op) => {
let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2];
for (to, from) in wb0.outer_iter_mut().zip(wb1.outer_iter())
{
if is_fine2coarse {
int_op.fine2coarse(from, to);
} else {
int_op.coarse2fine(from, to);
}
}
// Reshape edge buffer to correct size
let wb = workbuffer_edges.1.north.take().unwrap();
let mut vec = wb.into_raw_vec();
vec.resize(wb0.len(), 0.0);
let wb =
Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap();
workbuffer_edges.1.north = Some(wb);
}
_ => unreachable!(),
}
euler::SAT_north(
sbp.deref(),
k.north_mut(),
y,
metrics,
wb0.view(),
);
};
if boundaries_remaining.south {
boundaries_remaining.south = false;
if let Some(boundary) = boundaries.south {
let wb0 = workbuffer_edges.0.south_mut();
let wb1 = workbuffer_edges.1.south.insert(boundary);
match boundary_conditions.south() {
DistributedBoundaryConditions::Channel => { DistributedBoundaryConditions::Channel => {
std::mem::swap(wb0, wb1); std::mem::swap(wb0, wb1);
} }
@ -1012,176 +1050,87 @@ impl DistributedSystemPart {
} }
} }
// Reshape edge buffer to correct size // Reshape edge buffer to correct size
let wb = workbuffer_edges.1.north.take().unwrap(); let wb = workbuffer_edges.1.south.take().unwrap();
let mut vec = wb.into_raw_vec(); let mut vec = wb.into_raw_vec();
vec.resize(wb0.len(), 0.0); vec.resize(wb0.len(), 0.0);
let wb = let wb =
Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap(); Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap();
workbuffer_edges.1.north = Some(wb); workbuffer_edges.1.south = Some(wb);
} }
_ => unreachable!(), _ => unreachable!(),
} }
euler::SAT_north( euler::SAT_south(
sbp.deref(), sbp.deref(),
k.north_mut(), k.south_mut(),
y, y,
metrics, metrics,
wb0.view(), wb0.view(),
); );
boundaries_remaining.north = false; };
});
continue 'check_boundaries;
} }
if boundaries_remaining.south { if let Some(boundary) = boundaries.east {
if let CommunicatorData::Some(boundary) = std::mem::replace( boundaries_remaining.east = false;
&mut (*data).south, let wb0 = workbuffer_edges.0.east_mut();
CommunicatorData::NotAvailable, let wb1 = workbuffer_edges.1.east.insert(boundary);
) { match boundary_conditions.east() {
lock_api::MutexGuard::unlocked(&mut data, || { DistributedBoundaryConditions::Channel => {
let wb0 = workbuffer_edges.0.south_mut(); std::mem::swap(wb0, wb1);
let wb1 = workbuffer_edges.1.south.insert(boundary); }
match boundary_conditions.south() { DistributedBoundaryConditions::Interpolate(int_op) => {
DistributedBoundaryConditions::Channel => { let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2];
std::mem::swap(wb0, wb1); for (to, from) in wb0.outer_iter_mut().zip(wb1.outer_iter())
{
if is_fine2coarse {
int_op.fine2coarse(from, to);
} else {
int_op.coarse2fine(from, to);
} }
DistributedBoundaryConditions::Interpolate(int_op) => {
let is_fine2coarse =
wb1.shape()[1] > wb0.shape()[2];
for (to, from) in
wb0.outer_iter_mut().zip(wb1.outer_iter())
{
if is_fine2coarse {
int_op.fine2coarse(from, to);
} else {
int_op.coarse2fine(from, to);
}
}
// Reshape edge buffer to correct size
let wb = workbuffer_edges.1.south.take().unwrap();
let mut vec = wb.into_raw_vec();
vec.resize(wb0.len(), 0.0);
let wb = Array2::from_shape_vec(wb0.raw_dim(), vec)
.unwrap();
workbuffer_edges.1.south = Some(wb);
}
_ => unreachable!(),
} }
euler::SAT_south( // Reshape edge buffer to correct size
sbp.deref(), let wb = workbuffer_edges.1.east.take().unwrap();
k.south_mut(), let mut vec = wb.into_raw_vec();
y, vec.resize(wb0.len(), 0.0);
metrics, let wb =
wb0.view(), Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap();
); workbuffer_edges.1.east = Some(wb);
boundaries_remaining.south = false; }
}); _ => unreachable!(),
continue 'check_boundaries;
} }
} euler::SAT_east(sbp.deref(), k.east_mut(), y, metrics, wb0.view());
};
if boundaries_remaining.east { if let Some(boundary) = boundaries.west {
if let CommunicatorData::Some(boundary) = std::mem::replace( boundaries_remaining.west = false;
&mut (*data).east, let wb0 = workbuffer_edges.0.west_mut();
CommunicatorData::NotAvailable, let wb1 = workbuffer_edges.1.west.insert(boundary);
) { match boundary_conditions.west() {
lock_api::MutexGuard::unlocked(&mut data, || { DistributedBoundaryConditions::Channel => {
let wb0 = workbuffer_edges.0.east_mut(); std::mem::swap(wb0, wb1);
let wb1 = workbuffer_edges.1.east.insert(boundary); }
match boundary_conditions.east() { DistributedBoundaryConditions::Interpolate(int_op) => {
DistributedBoundaryConditions::Channel => { let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2];
std::mem::swap(wb0, wb1); for (to, from) in wb0.outer_iter_mut().zip(wb1.outer_iter())
{
if is_fine2coarse {
int_op.fine2coarse(from, to);
} else {
int_op.coarse2fine(from, to);
} }
DistributedBoundaryConditions::Interpolate(int_op) => {
let is_fine2coarse =
wb1.shape()[1] > wb0.shape()[2];
for (to, from) in
wb0.outer_iter_mut().zip(wb1.outer_iter())
{
if is_fine2coarse {
int_op.fine2coarse(from, to);
} else {
int_op.coarse2fine(from, to);
}
}
// Reshape edge buffer to correct size
let wb = workbuffer_edges.1.east.take().unwrap();
let mut vec = wb.into_raw_vec();
vec.resize(wb0.len(), 0.0);
let wb = Array2::from_shape_vec(wb0.raw_dim(), vec)
.unwrap();
workbuffer_edges.1.east = Some(wb);
}
_ => unreachable!(),
} }
euler::SAT_east( // Reshape edge buffer to correct size
sbp.deref(), let wb = workbuffer_edges.1.west.take().unwrap();
k.east_mut(), let mut vec = wb.into_raw_vec();
y, vec.resize(wb0.len(), 0.0);
metrics, let wb =
wb0.view(), Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap();
); workbuffer_edges.1.west = Some(wb);
boundaries_remaining.east = false; }
}); _ => unreachable!(),
continue 'check_boundaries;
} }
} euler::SAT_west(sbp.deref(), k.west_mut(), y, metrics, wb0.view());
};
if boundaries_remaining.west { });
if let CommunicatorData::Some(boundary) = std::mem::replace(
&mut (*data).west,
CommunicatorData::NotAvailable,
) {
lock_api::MutexGuard::unlocked(&mut data, || {
let wb0 = workbuffer_edges.0.west_mut();
let wb1 = workbuffer_edges.1.west.insert(boundary);
match boundary_conditions.west() {
DistributedBoundaryConditions::Channel => {
std::mem::swap(wb0, wb1);
}
DistributedBoundaryConditions::Interpolate(int_op) => {
let is_fine2coarse =
wb1.shape()[1] > wb0.shape()[2];
for (to, from) in
wb0.outer_iter_mut().zip(wb1.outer_iter())
{
if is_fine2coarse {
int_op.fine2coarse(from, to);
} else {
int_op.coarse2fine(from, to);
}
}
// Reshape edge buffer to correct size
let wb = workbuffer_edges.1.west.take().unwrap();
let mut vec = wb.into_raw_vec();
vec.resize(wb0.len(), 0.0);
let wb = Array2::from_shape_vec(wb0.raw_dim(), vec)
.unwrap();
workbuffer_edges.1.west = Some(wb);
}
_ => unreachable!(),
}
euler::SAT_west(
sbp.deref(),
k.west_mut(),
y,
metrics,
wb0.view(),
);
boundaries_remaining.west = false;
});
continue 'check_boundaries;
}
}
}
if !boundaries_remaining.any() {
break 'check_boundaries;
}
println!("{:?}", boundaries_remaining);
// We have no available boundaries yet, can wait until
// notified by other threads. Early continues
// ensures the boundary has not been notified earlier
pull.cvar.wait(&mut data);
} }
} }
}; };

View File

@ -63,6 +63,18 @@ impl<T> Direction<T> {
west: self.east, west: self.east,
} }
} }
pub fn splat(t: T) -> Self
where
T: Copy,
{
Self {
north: t,
south: t,
east: t,
west: t,
}
}
} }
impl<T> Direction<Option<T>> { impl<T> Direction<Option<T>> {