checkpoint

This commit is contained in:
Magnus Ulimoen 2021-09-24 17:02:48 +00:00
parent d0901f5755
commit 44e0eb98f3
1 changed files with 92 additions and 63 deletions

View File

@ -1,5 +1,6 @@
use crate::parsing; use crate::parsing;
use crate::utils::Direction; use crate::utils::Direction;
use arrayvec::ArrayVec;
use core::ops::Deref; use core::ops::Deref;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
use euler::{ use euler::{
@ -163,7 +164,7 @@ impl BaseSystem {
for _ in 0..nthreads { for _ in 0..nthreads {
pull.push(Arc::new(Communicator { pull.push(Arc::new(Communicator {
cvar: Condvar::new(), cvar: Condvar::new(),
data: Mutex::new(Direction::splat(()).map(|_| arrayvec::ArrayVec::new())), data: Mutex::new(Direction::splat(()).map(|_| ArrayVec::new())),
})); }));
} }
@ -304,20 +305,44 @@ impl BaseSystem {
send: master_send, send: master_send,
wb, wb,
workbuffer_edges: ( workbuffer_edges: {
Direction { Direction {
north: Array2::zeros((4, nx)), north: (Array2::zeros((4, nx)), Array2::zeros((4, nx))),
south: Array2::zeros((4, nx)), south: (Array2::zeros((4, nx)), Array2::zeros((4, nx))),
east: Array2::zeros((4, ny)), east: (Array2::zeros((4, ny)), Array2::zeros((4, ny))),
west: Array2::zeros((4, ny)), west: (Array2::zeros((4, ny)), Array2::zeros((4, ny))),
}
},
workbuffer_free: Direction {
north: {
let mut arr = ArrayVec::new();
for _ in 0..2 {
arr.push(Array2::zeros((4, nx)))
}
arr
}, },
Direction { south: {
north: Some(Array2::zeros((4, nx))), let mut arr = ArrayVec::new();
south: Some(Array2::zeros((4, nx))), for _ in 0..2 {
east: Some(Array2::zeros((4, ny))), arr.push(Array2::zeros((4, nx)))
west: Some(Array2::zeros((4, ny))), }
arr
}, },
), east: {
let mut arr = ArrayVec::new();
for _ in 0..2 {
arr.push(Array2::zeros((4, ny)))
}
arr
},
west: {
let mut arr = ArrayVec::new();
for _ in 0..2 {
arr.push(Array2::zeros((4, ny)))
}
arr
},
},
progressbar: None, progressbar: None,
}; };
@ -712,7 +737,7 @@ pub enum DistributedBoundaryConditions {
Channel, Channel,
} }
type CommunicatorData = arrayvec::ArrayVec<Array2<Float>, 2>; type CommunicatorData = ArrayVec<Array2<Float>, 2>;
struct Communicator { struct Communicator {
/// Waker for this grid, neighbours should have a reference /// Waker for this grid, neighbours should have a reference
@ -750,12 +775,9 @@ struct DistributedSystemPart {
k: [Diff; 4], k: [Diff; 4],
wb: WorkBuffers, wb: WorkBuffers,
/// Work buffer for boundaries /// Work buffer for boundaries
/// workbuffer_edges: Direction<(Array2<Float>, Array2<Float>)>,
// Option: This can be sent from the current thread to another, /// These can be popped and pushed as we communicate data
// Will be replenished by arriving boundary conditions for workbuffer_free: Direction<CommunicatorData>,
// zero-allocation in loop (no global locks).
// Should never be None on entry to loop
workbuffer_edges: (Direction<Array2<Float>>, Direction<Option<Array2<Float>>>),
progressbar: Option<indicatif::ProgressBar>, progressbar: Option<indicatif::ProgressBar>,
} }
@ -860,11 +882,12 @@ impl DistributedSystemPart {
let boundary_conditions = &self.boundary_conditions; let boundary_conditions = &self.boundary_conditions;
let grid = &self.grid.0; let grid = &self.grid.0;
let workbuffer_edges = &mut self.workbuffer_edges; let workbuffer_edges = &mut self.workbuffer_edges;
let workbuffer_free = &mut self.workbuffer_free;
let rhs = |k: &mut euler::Diff, y: &euler::Field, time: Float| { let rhs = |k: &mut euler::Diff, y: &euler::Field, time: Float| {
// Send off the boundaries eagerly, in case neighbouring grid is ready // Send off the boundaries eagerly, in case neighbouring grid is ready
push.as_ref() push.as_ref()
.zip(workbuffer_edges.1.as_mut()) .zip(workbuffer_free.as_mut())
.zip( .zip(
Direction::<fn(&mut Direction<CommunicatorData>) -> &mut CommunicatorData> { Direction::<fn(&mut Direction<CommunicatorData>) -> &mut CommunicatorData> {
north: |x| x.south_mut(), north: |x| x.south_mut(),
@ -881,7 +904,7 @@ impl DistributedSystemPart {
}) })
.map(|(((push, wb), sel), this)| { .map(|(((push, wb), sel), this)| {
if let Some(s) = push { if let Some(s) = push {
let mut wb = wb.take().unwrap(); let mut wb = wb.pop().unwrap();
wb.assign(&this); wb.assign(&this);
{ {
let mut s = s.data.lock(); let mut s = s.data.lock();
@ -900,8 +923,7 @@ impl DistributedSystemPart {
let computed = boundary_conditions let computed = boundary_conditions
.as_ref() .as_ref()
.zip(euler::SAT_FUNCTIONS) .zip(euler::SAT_FUNCTIONS)
.zip(workbuffer_edges.0.as_mut()) .zip(workbuffer_edges.as_mut())
.zip(workbuffer_edges.1.as_mut())
.zip(Direction { .zip(Direction {
north: y.south(), north: y.south(),
south: y.north(), south: y.north(),
@ -914,18 +936,17 @@ impl DistributedSystemPart {
east: grid.east(), east: grid.east(),
west: grid.west(), west: grid.west(),
}) })
.map(|(((((bc, sat), wb0), wb1), self_edge), grid)| { .map(|((((bc, sat), wb), self_edge), grid)| {
wb0.fill(0.0); wb.0.fill(0.0);
match bc { match bc {
DistributedBoundaryConditions::Channel DistributedBoundaryConditions::Channel
| DistributedBoundaryConditions::Interpolate(_) => false, | DistributedBoundaryConditions::Interpolate(_) => false,
DistributedBoundaryConditions::This => { DistributedBoundaryConditions::This => {
sat(sbp.deref(), wb0.view_mut(), y, metrics, self_edge); sat(sbp.deref(), wb.0.view_mut(), y, metrics, self_edge);
true true
} }
DistributedBoundaryConditions::Vortex(vp) => { DistributedBoundaryConditions::Vortex(vp) => {
let wb1 = wb1.as_mut().unwrap(); let mut fiter = wb.1.outer_iter_mut();
let mut fiter = wb1.outer_iter_mut();
let (rho, rhou, rhov, e) = ( let (rho, rhou, rhov, e) = (
fiter.next().unwrap(), fiter.next().unwrap(),
fiter.next().unwrap(), fiter.next().unwrap(),
@ -935,12 +956,11 @@ impl DistributedSystemPart {
let (gx, gy) = grid; let (gx, gy) = grid;
vp.evaluate(time, gx, gy, rho, rhou, rhov, e); vp.evaluate(time, gx, gy, rho, rhou, rhov, e);
sat(sbp.deref(), wb0.view_mut(), y, metrics, wb1.view()); sat(sbp.deref(), wb.0.view_mut(), y, metrics, wb.1.view());
true true
} }
DistributedBoundaryConditions::Eval(eval) => { DistributedBoundaryConditions::Eval(eval) => {
let wb1 = wb1.as_mut().unwrap(); let mut fiter = wb.1.outer_iter_mut();
let mut fiter = wb1.outer_iter_mut();
let (rho, rhou, rhov, e) = ( let (rho, rhou, rhov, e) = (
fiter.next().unwrap(), fiter.next().unwrap(),
fiter.next().unwrap(), fiter.next().unwrap(),
@ -949,23 +969,27 @@ impl DistributedSystemPart {
); );
let (gx, gy) = grid; let (gx, gy) = grid;
eval.evaluate(time, gx, gy, rho, rhou, rhov, e); eval.evaluate(time, gx, gy, rho, rhou, rhov, e);
sat(sbp.deref(), wb0.view_mut(), y, metrics, wb1.view()); sat(sbp.deref(), wb.0.view_mut(), y, metrics, wb.1.view());
true true
} }
} }
}); });
if computed.north { if computed.north {
k.north_mut().scaled_add(1.0, &workbuffer_edges.0.north()); k.north_mut()
.scaled_add(1.0, &workbuffer_edges.north().0.view());
} }
if computed.south { if computed.south {
k.south_mut().scaled_add(1.0, &workbuffer_edges.0.south()); k.south_mut()
.scaled_add(1.0, &workbuffer_edges.south().0.view());
} }
if computed.east { if computed.east {
k.east_mut().scaled_add(1.0, &workbuffer_edges.0.east()); k.east_mut()
.scaled_add(1.0, &workbuffer_edges.east().0.view());
} }
if computed.west { if computed.west {
k.west_mut().scaled_add(1.0, &workbuffer_edges.0.west()); k.west_mut()
.scaled_add(1.0, &workbuffer_edges.west().0.view());
} }
let mut boundaries_remaining = computed.map(|b| !b); let mut boundaries_remaining = computed.map(|b| !b);
@ -994,15 +1018,17 @@ impl DistributedSystemPart {
lock_api::MutexGuard::unlocked(&mut data, || { lock_api::MutexGuard::unlocked(&mut data, || {
if let Some(boundary) = boundaries.north { if let Some(boundary) = boundaries.north {
boundaries_remaining.north = false; boundaries_remaining.north = false;
let wb0 = workbuffer_edges.0.north_mut(); let wb = workbuffer_edges.north_mut();
let wb1 = workbuffer_edges.1.north.insert(boundary); let wb_push = workbuffer_free.north_mut();
match boundary_conditions.north() { match boundary_conditions.north() {
DistributedBoundaryConditions::Channel => { DistributedBoundaryConditions::Channel => {
std::mem::swap(wb0, wb1); std::mem::swap(&mut wb.0, &mut boundary);
wb_push.push(boundary);
} }
DistributedBoundaryConditions::Interpolate(int_op) => { DistributedBoundaryConditions::Interpolate(int_op) => {
let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2]; let is_fine2coarse = boundary.shape()[1] > wb.0.shape()[2];
for (to, from) in wb0.outer_iter_mut().zip(wb1.outer_iter()) for (to, from) in
boundary.outer_iter_mut().zip(boundary.outer_iter())
{ {
if is_fine2coarse { if is_fine2coarse {
int_op.fine2coarse(from, to); int_op.fine2coarse(from, to);
@ -1011,12 +1037,11 @@ impl DistributedSystemPart {
} }
} }
// Reshape edge buffer to correct size // Reshape edge buffer to correct size
let wb = workbuffer_edges.1.north.take().unwrap(); let mut vec = boundary.into_raw_vec();
let mut vec = wb.into_raw_vec(); vec.resize(wb.0.len(), 0.0);
vec.resize(wb0.len(), 0.0); let boundary =
let wb = Array2::from_shape_vec(wb.0.raw_dim(), vec).unwrap();
Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap(); wb_push.push(boundary)
workbuffer_edges.1.north = Some(wb);
} }
_ => unreachable!(), _ => unreachable!(),
} }
@ -1025,23 +1050,25 @@ impl DistributedSystemPart {
k.north_mut(), k.north_mut(),
y, y,
metrics, metrics,
wb0.view(), wb.0.view(),
); );
}; };
if boundaries_remaining.south { if boundaries_remaining.south {
boundaries_remaining.south = false; boundaries_remaining.south = false;
if let Some(boundary) = boundaries.south { if let Some(boundary) = boundaries.south {
let wb0 = workbuffer_edges.0.south_mut(); let wb = workbuffer_edges.north_mut();
let wb1 = workbuffer_edges.1.south.insert(boundary); let wb_push = workbuffer_free.south_mut();
match boundary_conditions.south() { match boundary_conditions.south() {
DistributedBoundaryConditions::Channel => { DistributedBoundaryConditions::Channel => {
std::mem::swap(wb0, wb1); std::mem::swap(&mut wb.0, &mut boundary);
wb_push.push(boundary);
} }
DistributedBoundaryConditions::Interpolate(int_op) => { DistributedBoundaryConditions::Interpolate(int_op) => {
let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2]; let is_fine2coarse =
boundary.shape()[1] > wb.0.shape()[2];
for (to, from) in for (to, from) in
wb0.outer_iter_mut().zip(wb1.outer_iter()) wb.0.outer_iter_mut().zip(boundary.outer_iter())
{ {
if is_fine2coarse { if is_fine2coarse {
int_op.fine2coarse(from, to); int_op.fine2coarse(from, to);
@ -1050,12 +1077,12 @@ impl DistributedSystemPart {
} }
} }
// Reshape edge buffer to correct size // Reshape edge buffer to correct size
let wb = workbuffer_edges.1.south.take().unwrap(); let mut vec = boundary.into_raw_vec();
let mut vec = wb.into_raw_vec(); vec.resize(wb.0.len(), 0.0);
vec.resize(wb0.len(), 0.0); let boundary =
let wb = Array2::from_shape_vec(wb.0.raw_dim(), vec)
Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap(); .unwrap();
workbuffer_edges.1.south = Some(wb); wb_push.push(boundary);
} }
_ => unreachable!(), _ => unreachable!(),
} }
@ -1064,19 +1091,21 @@ impl DistributedSystemPart {
k.south_mut(), k.south_mut(),
y, y,
metrics, metrics,
wb0.view(), wb.0.view(),
); );
}; };
} }
if let Some(boundary) = boundaries.east { if let Some(boundary) = boundaries.east {
boundaries_remaining.east = false; boundaries_remaining.east = false;
let wb0 = workbuffer_edges.0.east_mut(); let wb = workbuffer_edges.east_mut();
let wb1 = workbuffer_edges.1.east.insert(boundary); let wb_push = workbuffer_free.east_mut();
match boundary_conditions.east() { match boundary_conditions.east() {
DistributedBoundaryConditions::Channel => { DistributedBoundaryConditions::Channel => {
std::mem::swap(wb0, wb1); std::mem::swap(&mut wb.0, &mut boundary);
wb_push.push(boundary);
} }
// TODO: From this point down
DistributedBoundaryConditions::Interpolate(int_op) => { DistributedBoundaryConditions::Interpolate(int_op) => {
let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2]; let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2];
for (to, from) in wb0.outer_iter_mut().zip(wb1.outer_iter()) for (to, from) in wb0.outer_iter_mut().zip(wb1.outer_iter())