Rework wait primitive to condvar

This commit is contained in:
Magnus Ulimoen 2021-08-20 15:57:12 +00:00
parent e7222a99b5
commit d2c811d3af
4 changed files with 421 additions and 338 deletions

View File

@ -301,22 +301,18 @@ impl Field {
pub fn west(&self) -> ArrayView2<Float> { pub fn west(&self) -> ArrayView2<Float> {
self.slice(s![.., .., 0]) self.slice(s![.., .., 0])
} }
#[allow(unused)] pub fn north_mut(&mut self) -> ArrayViewMut2<Float> {
fn north_mut(&mut self) -> ArrayViewMut2<Float> {
let ny = self.ny(); let ny = self.ny();
self.slice_mut(s![.., ny - 1, ..]) self.slice_mut(s![.., ny - 1, ..])
} }
#[allow(unused)] pub fn south_mut(&mut self) -> ArrayViewMut2<Float> {
fn south_mut(&mut self) -> ArrayViewMut2<Float> {
self.slice_mut(s![.., 0, ..]) self.slice_mut(s![.., 0, ..])
} }
#[allow(unused)] pub fn east_mut(&mut self) -> ArrayViewMut2<Float> {
fn east_mut(&mut self) -> ArrayViewMut2<Float> {
let nx = self.nx(); let nx = self.nx();
self.slice_mut(s![.., .., nx - 1]) self.slice_mut(s![.., .., nx - 1])
} }
#[allow(unused)] pub fn west_mut(&mut self) -> ArrayViewMut2<Float> {
fn west_mut(&mut self) -> ArrayViewMut2<Float> {
self.slice_mut(s![.., .., 0]) self.slice_mut(s![.., .., 0])
} }
@ -463,18 +459,18 @@ impl Diff {
pub fn zeros((ny, nx): (usize, usize)) -> Self { pub fn zeros((ny, nx): (usize, usize)) -> Self {
Self(Array3::zeros((4, ny, nx))) Self(Array3::zeros((4, ny, nx)))
} }
fn north_mut(&mut self) -> ArrayViewMut2<Float> { pub fn north_mut(&mut self) -> ArrayViewMut2<Float> {
let ny = self.shape()[1]; let ny = self.shape()[1];
self.0.slice_mut(s![.., ny - 1, ..]) self.0.slice_mut(s![.., ny - 1, ..])
} }
fn south_mut(&mut self) -> ArrayViewMut2<Float> { pub fn south_mut(&mut self) -> ArrayViewMut2<Float> {
self.0.slice_mut(s![.., 0, ..]) self.0.slice_mut(s![.., 0, ..])
} }
fn east_mut(&mut self) -> ArrayViewMut2<Float> { pub fn east_mut(&mut self) -> ArrayViewMut2<Float> {
let nx = self.shape()[2]; let nx = self.shape()[2];
self.0.slice_mut(s![.., .., nx - 1]) self.0.slice_mut(s![.., .., nx - 1])
} }
fn west_mut(&mut self) -> ArrayViewMut2<Float> { pub fn west_mut(&mut self) -> ArrayViewMut2<Float> {
self.0.slice_mut(s![.., .., 0]) self.0.slice_mut(s![.., .., 0])
} }
} }
@ -1010,16 +1006,25 @@ pub fn SAT_characteristics(
metrics: &Metrics, metrics: &Metrics,
boundaries: &BoundaryTerms, boundaries: &BoundaryTerms,
) { ) {
SAT_north(op, k, y, metrics, boundaries.north); SAT_north(op, k.north_mut(), y, metrics, boundaries.north);
SAT_south(op, k, y, metrics, boundaries.south); SAT_south(op, k.south_mut(), y, metrics, boundaries.south);
SAT_east(op, k, y, metrics, boundaries.east); SAT_east(op, k.east_mut(), y, metrics, boundaries.east);
SAT_west(op, k, y, metrics, boundaries.west); SAT_west(op, k.west_mut(), y, metrics, boundaries.west);
} }
pub const SAT_FUNCTIONS: Direction<
fn(&dyn SbpOperator2d, ArrayViewMut2<Float>, &Field, &Metrics, ArrayView2<Float>),
> = Direction {
north: SAT_north,
south: SAT_south,
west: SAT_west,
east: SAT_east,
};
#[allow(non_snake_case)] #[allow(non_snake_case)]
pub fn SAT_north( pub fn SAT_north(
op: &dyn SbpOperator2d, op: &dyn SbpOperator2d,
k: &mut Diff, k: ArrayViewMut2<Float>,
y: &Field, y: &Field,
metrics: &Metrics, metrics: &Metrics,
boundary: ArrayView2<Float>, boundary: ArrayView2<Float>,
@ -1035,7 +1040,7 @@ pub fn SAT_north(
let tau = 1.0; let tau = 1.0;
let slice = s![y.ny() - 1, ..]; let slice = s![y.ny() - 1, ..];
SAT_characteristic( SAT_characteristic(
k.north_mut(), k,
y.north(), y.north(),
boundary, boundary,
hi, hi,
@ -1050,7 +1055,7 @@ pub fn SAT_north(
#[allow(non_snake_case)] #[allow(non_snake_case)]
pub fn SAT_south( pub fn SAT_south(
op: &dyn SbpOperator2d, op: &dyn SbpOperator2d,
k: &mut Diff, k: ArrayViewMut2<Float>,
y: &Field, y: &Field,
metrics: &Metrics, metrics: &Metrics,
boundary: ArrayView2<Float>, boundary: ArrayView2<Float>,
@ -1065,7 +1070,7 @@ pub fn SAT_south(
let tau = -1.0; let tau = -1.0;
let slice = s![0, ..]; let slice = s![0, ..];
SAT_characteristic( SAT_characteristic(
k.south_mut(), k,
y.south(), y.south(),
boundary, boundary,
hi, hi,
@ -1080,7 +1085,7 @@ pub fn SAT_south(
#[allow(non_snake_case)] #[allow(non_snake_case)]
pub fn SAT_west( pub fn SAT_west(
op: &dyn SbpOperator2d, op: &dyn SbpOperator2d,
k: &mut Diff, k: ArrayViewMut2<Float>,
y: &Field, y: &Field,
metrics: &Metrics, metrics: &Metrics,
boundary: ArrayView2<Float>, boundary: ArrayView2<Float>,
@ -1096,7 +1101,7 @@ pub fn SAT_west(
let tau = -1.0; let tau = -1.0;
let slice = s![.., 0]; let slice = s![.., 0];
SAT_characteristic( SAT_characteristic(
k.west_mut(), k,
y.west(), y.west(),
boundary, boundary,
hi, hi,
@ -1111,7 +1116,7 @@ pub fn SAT_west(
#[allow(non_snake_case)] #[allow(non_snake_case)]
pub fn SAT_east( pub fn SAT_east(
op: &dyn SbpOperator2d, op: &dyn SbpOperator2d,
k: &mut Diff, k: ArrayViewMut2<Float>,
y: &Field, y: &Field,
metrics: &Metrics, metrics: &Metrics,
boundary: ArrayView2<Float>, boundary: ArrayView2<Float>,
@ -1126,7 +1131,7 @@ pub fn SAT_east(
let tau = 1.0; let tau = 1.0;
let slice = s![.., y.nx() - 1]; let slice = s![.., y.nx() - 1];
SAT_characteristic( SAT_characteristic(
k.east_mut(), k,
y.east(), y.east(),
boundary, boundary,
hi, hi,

View File

@ -20,3 +20,5 @@ argh = "0.1.4"
evalexpr = "6.3.0" evalexpr = "6.3.0"
crossbeam-channel = "0.5.0" crossbeam-channel = "0.5.0"
crossbeam-utils = "0.8.5" crossbeam-utils = "0.8.5"
parking_lot = "0.11.1"
lock_api = "0.4.4"

View File

@ -1,15 +1,17 @@
use crate::parsing; use crate::parsing;
use crate::utils::Direction; use crate::utils::Direction;
use core::ops::Deref; use core::ops::Deref;
use crossbeam_channel::{Receiver, Select, Sender}; use crossbeam_channel::{Receiver, Sender};
use euler::{ use euler::{
eval::{self, Evaluator}, eval::{self, Evaluator},
Diff, Field, VortexParameters, WorkBuffers, Diff, Field, VortexParameters, WorkBuffers,
}; };
use ndarray::Array2; use ndarray::Array2;
use parking_lot::{Condvar, Mutex};
use sbp::grid::{Grid, Metrics}; use sbp::grid::{Grid, Metrics};
use sbp::operators::{InterpolationOperator, SbpOperator2d}; use sbp::operators::{InterpolationOperator, SbpOperator2d};
use sbp::*; use sbp::*;
use std::sync::Arc;
pub struct BaseSystem { pub struct BaseSystem {
pub names: Vec<String>, pub names: Vec<String>,
@ -157,56 +159,37 @@ impl BaseSystem {
let nthreads = self.grids.len(); let nthreads = self.grids.len();
// Build up the boundary conditions // Build up the boundary conditions
let mut push_channels: Vec<Direction<Option<Sender<Array2<Float>>>>> = let mut pull = Vec::<Arc<Communicator>>::with_capacity(nthreads);
Vec::with_capacity(nthreads); for wb in &self.boundary_conditions {
let mut pull_channels: Vec<Direction<Option<Receiver<Array2<Float>>>>> = let data = wb.as_ref().map(|bc| {
vec![Direction::default(); nthreads]; if let euler::BoundaryCharacteristic::Grid(_)
| euler::BoundaryCharacteristic::Interpolate(_, _) = bc
{
CommunicatorData::NotAvailable
} else {
CommunicatorData::NotApplicable
}
});
pull.push(Arc::new(Communicator {
cvar: Condvar::new(),
data: Mutex::new(data),
}));
}
// Build the set of communicators between boundaries // Build the set of communicators between boundaries
let mut push = Vec::<Direction<Option<Arc<Communicator>>>>::with_capacity(nthreads);
for wb in &self.boundary_conditions { for wb in &self.boundary_conditions {
let mut local_push = Direction::default(); let local_push = wb.as_ref().map(|bc| {
if let euler::BoundaryCharacteristic::Grid(i) if let euler::BoundaryCharacteristic::Grid(i)
| euler::BoundaryCharacteristic::Interpolate(i, _) = wb.north() | euler::BoundaryCharacteristic::Interpolate(i, _) = bc
{ {
let (s, r) = crossbeam_channel::bounded(1); Some(pull[*i].clone())
pull_channels[*i] } else {
.south_mut() None
.replace(r)
.and_then::<(), _>(|_| panic!("channel is already present"));
*local_push.north_mut() = Some(s);
}
if let euler::BoundaryCharacteristic::Grid(i)
| euler::BoundaryCharacteristic::Interpolate(i, _) = wb.south()
{
let (s, r) = crossbeam_channel::bounded(1);
pull_channels[*i]
.north_mut()
.replace(r)
.and_then::<(), _>(|_| panic!("channel is already present"));
*local_push.south_mut() = Some(s);
}
if let euler::BoundaryCharacteristic::Grid(i)
| euler::BoundaryCharacteristic::Interpolate(i, _) = wb.east()
{
let (s, r) = crossbeam_channel::bounded(1);
pull_channels[*i]
.west_mut()
.replace(r)
.and_then::<(), _>(|_| panic!("channel is already present"));
*local_push.east_mut() = Some(s);
}
if let euler::BoundaryCharacteristic::Grid(i)
| euler::BoundaryCharacteristic::Interpolate(i, _) = wb.west()
{
let (s, r) = crossbeam_channel::bounded(1);
pull_channels[*i]
.east_mut()
.replace(r)
.and_then::<(), _>(|_| panic!("channel is already present"));
*local_push.west_mut() = Some(s);
} }
});
push_channels.push(local_push); push.push(local_push);
} }
let (master_send, master_recv) = crossbeam_channel::unbounded(); let (master_send, master_recv) = crossbeam_channel::unbounded();
@ -214,25 +197,23 @@ impl BaseSystem {
let mut tids = Vec::with_capacity(nthreads); let mut tids = Vec::with_capacity(nthreads);
let mut communicators = Vec::with_capacity(nthreads); let mut communicators = Vec::with_capacity(nthreads);
for (id, (((((name, grid), sbp), bt), chan), push)) in self for (id, (((((name, grid), sbp), bt), pull), push)) in self
.names .names
.into_iter() .into_iter()
.zip(self.grids.into_iter()) .zip(self.grids.into_iter())
.zip(self.operators.into_iter()) .zip(self.operators.into_iter())
.zip(self.boundary_conditions) .zip(self.boundary_conditions)
.zip(pull_channels) .zip(pull)
.zip(push_channels) .zip(push)
.enumerate() .enumerate()
{ {
let builder = std::thread::Builder::new().name(format!("mg: {}", name)); let builder = std::thread::Builder::new().name(format!("mg: {}", name));
let boundary_conditions = bt.zip(chan).map(|(bt, chan)| match bt { let boundary_conditions = bt.map(|bt| match bt {
euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This, euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This,
euler::BoundaryCharacteristic::Grid(_) => { euler::BoundaryCharacteristic::Grid(_) => DistributedBoundaryConditions::Channel,
DistributedBoundaryConditions::Channel(chan.unwrap())
}
euler::BoundaryCharacteristic::Interpolate(_, int_op) => { euler::BoundaryCharacteristic::Interpolate(_, int_op) => {
DistributedBoundaryConditions::Interpolate(chan.unwrap(), int_op) DistributedBoundaryConditions::Interpolate(int_op)
} }
euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(), euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(),
euler::BoundaryCharacteristic::Vortex(vp) => { euler::BoundaryCharacteristic::Vortex(vp) => {
@ -319,6 +300,7 @@ impl BaseSystem {
grid: (grid, metrics), grid: (grid, metrics),
output: g, output: g,
push, push,
pull,
sbp, sbp,
t: time, t: time,
dt: Float::NAN, dt: Float::NAN,
@ -331,8 +313,20 @@ impl BaseSystem {
send: master_send, send: master_send,
wb, wb,
wb_ns: Array2::zeros((4, nx)), workbuffer_edges: (
wb_ew: Array2::zeros((4, ny)), Direction {
north: Array2::zeros((4, nx)),
south: Array2::zeros((4, nx)),
east: Array2::zeros((4, ny)),
west: Array2::zeros((4, ny)),
},
Direction {
north: Some(Array2::zeros((4, nx))),
south: Some(Array2::zeros((4, nx))),
east: Some(Array2::zeros((4, ny))),
west: Some(Array2::zeros((4, ny))),
},
),
progressbar: None, progressbar: None,
}; };
@ -723,19 +717,34 @@ pub enum DistributedBoundaryConditions {
Vortex(VortexParameters), Vortex(VortexParameters),
Eval(std::sync::Arc<dyn eval::Evaluator<ndarray::Ix1>>), Eval(std::sync::Arc<dyn eval::Evaluator<ndarray::Ix1>>),
Interpolate(Receiver<Array2<Float>>, Box<dyn InterpolationOperator>), Interpolate(Box<dyn InterpolationOperator>),
Channel(Receiver<Array2<Float>>), Channel,
} }
type PushCommunicator = Option<Sender<Array2<Float>>>; enum CommunicatorData {
NotApplicable,
NotAvailable,
Some(Array2<Float>),
}
struct Communicator {
/// Waker for this grid, neighbours should have a reference
/// and notify when a boundary has been put
cvar: Condvar,
/// Internal data exchange, is None on missing data, inner type
/// can be set to None when grabbing the boundary
data: Mutex<Direction<CommunicatorData>>,
}
struct DistributedSystemPart { struct DistributedSystemPart {
grid: (Grid, Metrics), grid: (Grid, Metrics),
sbp: Box<dyn SbpOperator2d + 'static>, sbp: Box<dyn SbpOperator2d + 'static>,
boundary_conditions: Direction<DistributedBoundaryConditions>, boundary_conditions: Direction<DistributedBoundaryConditions>,
/// Channel pullers
pull: Arc<Communicator>,
/// Subscribers to the boundaries of self /// Subscribers to the boundaries of self
push: Direction<PushCommunicator>, push: Direction<Option<Arc<Communicator>>>,
current: Field, current: Field,
fut: Field, fut: Field,
@ -753,10 +762,13 @@ struct DistributedSystemPart {
k: [Diff; 4], k: [Diff; 4],
wb: WorkBuffers, wb: WorkBuffers,
/// Work buffer for north/south boundary /// Work buffer for boundaries
wb_ns: Array2<Float>, ///
/// Work buffer for east/west boundary // Option: This can be sent from the current thread to another,
wb_ew: Array2<Float>, // Will be replenished by arriving boundary conditions for
// zero-allocation in loop (no global locks).
// Should never be None on entry to loop
workbuffer_edges: (Direction<Array2<Float>>, Direction<Option<Array2<Float>>>),
progressbar: Option<indicatif::ProgressBar>, progressbar: Option<indicatif::ProgressBar>,
} }
@ -857,25 +869,42 @@ impl DistributedSystemPart {
let wb = &mut self.wb.0; let wb = &mut self.wb.0;
let sbp = &self.sbp; let sbp = &self.sbp;
let push = &self.push; let push = &self.push;
let pull = &self.pull;
let boundary_conditions = &self.boundary_conditions; let boundary_conditions = &self.boundary_conditions;
let grid = &self.grid.0; let grid = &self.grid.0;
let wb_ns = &mut self.wb_ns; let workbuffer_edges = &mut self.workbuffer_edges;
let wb_ew = &mut self.wb_ew;
let rhs = |k: &mut euler::Diff, y: &euler::Field, time: Float| { let rhs = |k: &mut euler::Diff, y: &euler::Field, time: Float| {
// Send off the boundaries optimistically, in case some grid is ready // Send off the boundaries eagerly, in case neighbouring grid is ready
if let Some(s) = &push.north { push.as_ref()
s.send(y.north().to_owned()).unwrap() .zip(workbuffer_edges.1.as_mut())
.zip(
Direction::<fn(&mut Direction<CommunicatorData>) -> &mut CommunicatorData> {
north: |x| x.south_mut(),
south: |x| x.north_mut(),
east: |x| x.west_mut(),
west: |x| x.east_mut(),
},
)
.zip(Direction {
north: y.north(),
south: y.south(),
west: y.west(),
east: y.east(),
})
.map(|(((push, wb), sel), this)| {
if let Some(s) = push {
let mut wb = wb.take().unwrap();
wb.assign(&this);
{
let mut s = s.data.lock();
let none =
std::mem::replace(sel(&mut s), CommunicatorData::Some(wb));
assert!(matches!(none, CommunicatorData::NotAvailable));
} }
if let Some(s) = &push.south { s.cvar.notify_one();
s.send(y.south().to_owned()).unwrap()
}
if let Some(s) = &push.east {
s.send(y.east().to_owned()).unwrap()
}
if let Some(s) = &push.west {
s.send(y.west().to_owned()).unwrap()
} }
});
// This computation does not depend on the boundaries // This computation does not depend on the boundaries
euler::RHS_no_SAT(sbp.deref(), k, y, metrics, wb); euler::RHS_no_SAT(sbp.deref(), k, y, metrics, wb);
@ -883,249 +912,277 @@ impl DistributedSystemPart {
// Get boundaries, but be careful and maximise the amount of work which can be // Get boundaries, but be careful and maximise the amount of work which can be
// performed before we have all of them, whilst ensuring threads can sleep for as // performed before we have all of them, whilst ensuring threads can sleep for as
// long as possible // long as possible
let mut select = Select::new(); let computed = boundary_conditions
let mut selectable = 0; .as_ref()
let recv_north = match boundary_conditions.north() { .zip(euler::SAT_FUNCTIONS)
DistributedBoundaryConditions::Channel(r) .zip(workbuffer_edges.0.as_mut())
| DistributedBoundaryConditions::Interpolate(r, _) => { .zip(workbuffer_edges.1.as_mut())
selectable += 1; .zip(Direction {
Some(select.recv(r)) north: y.south(),
} south: y.north(),
east: y.west(),
west: y.east(),
})
.zip(Direction {
north: grid.north(),
south: grid.south(),
east: grid.east(),
west: grid.west(),
})
.map(|(((((bc, sat), wb0), wb1), self_edge), grid)| {
wb0.fill(0.0);
match bc {
DistributedBoundaryConditions::Channel
| DistributedBoundaryConditions::Interpolate(_) => false,
DistributedBoundaryConditions::This => { DistributedBoundaryConditions::This => {
euler::SAT_north(sbp.deref(), k, y, metrics, y.south()); sat(sbp.deref(), wb0.view_mut(), y, metrics, self_edge);
None true
} }
DistributedBoundaryConditions::Vortex(vp) => { DistributedBoundaryConditions::Vortex(vp) => {
let mut fiter = wb_ns.outer_iter_mut(); let wb1 = wb1.as_mut().unwrap();
let mut fiter = wb1.outer_iter_mut();
let (rho, rhou, rhov, e) = ( let (rho, rhou, rhov, e) = (
fiter.next().unwrap(), fiter.next().unwrap(),
fiter.next().unwrap(), fiter.next().unwrap(),
fiter.next().unwrap(), fiter.next().unwrap(),
fiter.next().unwrap(), fiter.next().unwrap(),
); );
let (gx, gy) = grid.north(); let (gx, gy) = grid;
vp.evaluate(time, gx, gy, rho, rhou, rhov, e); vp.evaluate(time, gx, gy, rho, rhou, rhov, e);
euler::SAT_north(sbp.deref(), k, y, metrics, wb_ns.view()); sat(sbp.deref(), wb0.view_mut(), y, metrics, wb1.view());
None true
} }
DistributedBoundaryConditions::Eval(eval) => { DistributedBoundaryConditions::Eval(eval) => {
let mut fiter = wb_ns.outer_iter_mut(); let wb1 = wb1.as_mut().unwrap();
let mut fiter = wb1.outer_iter_mut();
let (rho, rhou, rhov, e) = ( let (rho, rhou, rhov, e) = (
fiter.next().unwrap(), fiter.next().unwrap(),
fiter.next().unwrap(), fiter.next().unwrap(),
fiter.next().unwrap(), fiter.next().unwrap(),
fiter.next().unwrap(), fiter.next().unwrap(),
); );
let (gx, gy) = grid.north(); let (gx, gy) = grid;
eval.evaluate(time, gx, gy, rho, rhou, rhov, e); eval.evaluate(time, gx, gy, rho, rhou, rhov, e);
euler::SAT_north(sbp.deref(), k, y, metrics, wb_ns.view()); sat(sbp.deref(), wb0.view_mut(), y, metrics, wb1.view());
None true
} }
};
let recv_south = match boundary_conditions.south() {
DistributedBoundaryConditions::Channel(r)
| DistributedBoundaryConditions::Interpolate(r, _) => {
selectable += 1;
Some(select.recv(r))
} }
DistributedBoundaryConditions::This => { });
euler::SAT_south(sbp.deref(), k, y, metrics, y.north());
None
}
DistributedBoundaryConditions::Vortex(vp) => {
let mut fiter = wb_ns.outer_iter_mut();
let (rho, rhou, rhov, e) = (
fiter.next().unwrap(),
fiter.next().unwrap(),
fiter.next().unwrap(),
fiter.next().unwrap(),
);
let (gx, gy) = grid.south();
vp.evaluate(time, gx, gy, rho, rhou, rhov, e);
euler::SAT_south(sbp.deref(), k, y, metrics, wb_ns.view()); if computed.north {
None k.north_mut().scaled_add(1.0, &workbuffer_edges.0.north());
} }
DistributedBoundaryConditions::Eval(eval) => { if computed.south {
let mut fiter = wb_ns.outer_iter_mut(); k.south_mut().scaled_add(1.0, &workbuffer_edges.0.south());
let (rho, rhou, rhov, e) = (
fiter.next().unwrap(),
fiter.next().unwrap(),
fiter.next().unwrap(),
fiter.next().unwrap(),
);
let (gx, gy) = grid.south();
eval.evaluate(time, gx, gy, rho, rhou, rhov, e);
euler::SAT_south(sbp.deref(), k, y, metrics, wb_ns.view());
None
} }
}; if computed.east {
let recv_east = match boundary_conditions.east() { k.east_mut().scaled_add(1.0, &workbuffer_edges.0.east());
DistributedBoundaryConditions::Channel(r)
| DistributedBoundaryConditions::Interpolate(r, _) => {
selectable += 1;
Some(select.recv(r))
} }
DistributedBoundaryConditions::This => { if computed.west {
euler::SAT_east(sbp.deref(), k, y, metrics, y.west()); k.west_mut().scaled_add(1.0, &workbuffer_edges.0.west());
None
} }
DistributedBoundaryConditions::Vortex(vp) => {
let mut fiter = wb_ew.outer_iter_mut();
let (rho, rhou, rhov, e) = (
fiter.next().unwrap(),
fiter.next().unwrap(),
fiter.next().unwrap(),
fiter.next().unwrap(),
);
let (gx, gy) = grid.east();
vp.evaluate(time, gx, gy, rho, rhou, rhov, e);
euler::SAT_east(sbp.deref(), k, y, metrics, wb_ew.view()); let mut boundaries_remaining = computed.map(|b| !b);
None
}
DistributedBoundaryConditions::Eval(eval) => {
let mut fiter = wb_ew.outer_iter_mut();
let (rho, rhou, rhov, e) = (
fiter.next().unwrap(),
fiter.next().unwrap(),
fiter.next().unwrap(),
fiter.next().unwrap(),
);
let (gx, gy) = grid.east();
eval.evaluate(time, gx, gy, rho, rhou, rhov, e);
euler::SAT_east(sbp.deref(), k, y, metrics, wb_ew.view());
None
}
};
let recv_west = match boundary_conditions.west() {
DistributedBoundaryConditions::Channel(r)
| DistributedBoundaryConditions::Interpolate(r, _) => {
selectable += 1;
Some(select.recv(r))
}
DistributedBoundaryConditions::This => {
euler::SAT_west(sbp.deref(), k, y, metrics, y.east());
None
}
DistributedBoundaryConditions::Vortex(vp) => {
let mut fiter = wb_ew.outer_iter_mut();
let (rho, rhou, rhov, e) = (
fiter.next().unwrap(),
fiter.next().unwrap(),
fiter.next().unwrap(),
fiter.next().unwrap(),
);
let (gx, gy) = grid.west();
vp.evaluate(time, gx, gy, rho, rhou, rhov, e);
euler::SAT_west(sbp.deref(), k, y, metrics, wb_ew.view()); {
None let mut data = pull.data.lock();
'check_boundaries: loop {
if boundaries_remaining.north {
if let CommunicatorData::Some(boundary) = std::mem::replace(
&mut (*data).north,
CommunicatorData::NotAvailable,
) {
lock_api::MutexGuard::unlocked(&mut data, || {
let wb0 = workbuffer_edges.0.north_mut();
let wb1 = workbuffer_edges.1.north.insert(boundary);
match boundary_conditions.north() {
DistributedBoundaryConditions::Channel => {
std::mem::swap(wb0, wb1);
} }
DistributedBoundaryConditions::Eval(eval) => { DistributedBoundaryConditions::Interpolate(int_op) => {
let mut fiter = wb_ew.outer_iter_mut(); let is_fine2coarse = wb1.shape()[1] > wb0.shape()[2];
let (rho, rhou, rhov, e) = ( for (to, from) in
fiter.next().unwrap(), wb0.outer_iter_mut().zip(wb1.outer_iter())
fiter.next().unwrap(), {
fiter.next().unwrap(), if is_fine2coarse {
fiter.next().unwrap(), int_op.fine2coarse(from, to);
} else {
int_op.coarse2fine(from, to);
}
}
// Reshape edge buffer to correct size
let wb = workbuffer_edges.1.north.take().unwrap();
let mut vec = wb.into_raw_vec();
vec.resize(wb0.len(), 0.0);
let wb =
Array2::from_shape_vec(wb0.raw_dim(), vec).unwrap();
workbuffer_edges.1.north = Some(wb);
}
_ => unreachable!(),
}
euler::SAT_north(
sbp.deref(),
k.north_mut(),
y,
metrics,
wb0.view(),
); );
let (gx, gy) = grid.west(); boundaries_remaining.north = false;
eval.evaluate(time, gx, gy, rho, rhou, rhov, e); });
euler::SAT_west(sbp.deref(), k, y, metrics, wb_ew.view()); continue 'check_boundaries;
None
} }
};
// Get an item off each channel, waiting minimally before processing that boundary. if boundaries_remaining.south {
// The waiting ensures other grids can be processed by the core in case of if let CommunicatorData::Some(boundary) = std::mem::replace(
// oversubscription (in case of a more grids than core scenario) &mut (*data).south,
// This minimises the amount of time waiting on boundary conditions CommunicatorData::NotAvailable,
while selectable != 0 { ) {
let s = select.select(); lock_api::MutexGuard::unlocked(&mut data, || {
let sindex = s.index(); let wb0 = workbuffer_edges.0.south_mut();
match Some(sindex) { let wb1 = workbuffer_edges.1.south.insert(boundary);
x if x == recv_north => match boundary_conditions.north() { match boundary_conditions.south() {
DistributedBoundaryConditions::Channel(r) => { DistributedBoundaryConditions::Channel => {
let r = s.recv(r).unwrap(); std::mem::swap(wb0, wb1);
euler::SAT_north(sbp.deref(), k, y, metrics, r.view());
} }
DistributedBoundaryConditions::Interpolate(r, int_op) => { DistributedBoundaryConditions::Interpolate(int_op) => {
let r = s.recv(r).unwrap(); let is_fine2coarse =
let is_fine2coarse = r.shape()[1] > wb_ns.shape()[1]; wb1.shape()[1] > wb0.shape()[2];
for (mut to, from) in wb_ns.outer_iter_mut().zip(r.outer_iter()) { for (to, from) in
wb0.outer_iter_mut().zip(wb1.outer_iter())
{
if is_fine2coarse { if is_fine2coarse {
int_op.fine2coarse(from.view(), to.view_mut()); int_op.fine2coarse(from, to);
} else { } else {
int_op.coarse2fine(from.view(), to.view_mut()); int_op.coarse2fine(from, to);
} }
} }
euler::SAT_north(sbp.deref(), k, y, metrics, wb_ns.view()); // Reshape edge buffer to correct size
let wb = workbuffer_edges.1.south.take().unwrap();
let mut vec = wb.into_raw_vec();
vec.resize(wb0.len(), 0.0);
let wb = Array2::from_shape_vec(wb0.raw_dim(), vec)
.unwrap();
workbuffer_edges.1.south = Some(wb);
} }
_ => unreachable!(), _ => unreachable!(),
},
x if x == recv_south => match boundary_conditions.south() {
DistributedBoundaryConditions::Channel(r) => {
let r = s.recv(r).unwrap();
euler::SAT_south(sbp.deref(), k, y, metrics, r.view());
} }
DistributedBoundaryConditions::Interpolate(r, int_op) => { euler::SAT_south(
let r = s.recv(r).unwrap(); sbp.deref(),
let is_fine2coarse = r.shape()[1] > wb_ns.shape()[1]; k.south_mut(),
for (mut to, from) in wb_ns.outer_iter_mut().zip(r.outer_iter()) { y,
metrics,
wb0.view(),
);
boundaries_remaining.south = false;
});
continue 'check_boundaries;
}
}
if boundaries_remaining.east {
if let CommunicatorData::Some(boundary) = std::mem::replace(
&mut (*data).east,
CommunicatorData::NotAvailable,
) {
lock_api::MutexGuard::unlocked(&mut data, || {
let wb0 = workbuffer_edges.0.east_mut();
let wb1 = workbuffer_edges.1.east.insert(boundary);
match boundary_conditions.east() {
DistributedBoundaryConditions::Channel => {
std::mem::swap(wb0, wb1);
}
DistributedBoundaryConditions::Interpolate(int_op) => {
let is_fine2coarse =
wb1.shape()[1] > wb0.shape()[2];
for (to, from) in
wb0.outer_iter_mut().zip(wb1.outer_iter())
{
if is_fine2coarse { if is_fine2coarse {
int_op.fine2coarse(from.view(), to.view_mut()); int_op.fine2coarse(from, to);
} else { } else {
int_op.coarse2fine(from.view(), to.view_mut()); int_op.coarse2fine(from, to);
} }
} }
euler::SAT_south(sbp.deref(), k, y, metrics, wb_ns.view()); // Reshape edge buffer to correct size
let wb = workbuffer_edges.1.east.take().unwrap();
let mut vec = wb.into_raw_vec();
vec.resize(wb0.len(), 0.0);
let wb = Array2::from_shape_vec(wb0.raw_dim(), vec)
.unwrap();
workbuffer_edges.1.east = Some(wb);
} }
_ => unreachable!(), _ => unreachable!(),
},
x if x == recv_west => match boundary_conditions.west() {
DistributedBoundaryConditions::Channel(r) => {
let r = s.recv(r).unwrap();
euler::SAT_west(sbp.deref(), k, y, metrics, r.view());
} }
DistributedBoundaryConditions::Interpolate(r, int_op) => { euler::SAT_east(
let r = s.recv(r).unwrap(); sbp.deref(),
let is_fine2coarse = r.shape()[1] > wb_ew.shape()[1]; k.east_mut(),
for (mut to, from) in wb_ew.outer_iter_mut().zip(r.outer_iter()) { y,
metrics,
wb0.view(),
);
boundaries_remaining.east = false;
});
continue 'check_boundaries;
}
}
if boundaries_remaining.west {
if let CommunicatorData::Some(boundary) = std::mem::replace(
&mut (*data).west,
CommunicatorData::NotAvailable,
) {
lock_api::MutexGuard::unlocked(&mut data, || {
let wb0 = workbuffer_edges.0.west_mut();
let wb1 = workbuffer_edges.1.west.insert(boundary);
match boundary_conditions.west() {
DistributedBoundaryConditions::Channel => {
std::mem::swap(wb0, wb1);
}
DistributedBoundaryConditions::Interpolate(int_op) => {
let is_fine2coarse =
wb1.shape()[1] > wb0.shape()[2];
for (to, from) in
wb0.outer_iter_mut().zip(wb1.outer_iter())
{
if is_fine2coarse { if is_fine2coarse {
int_op.fine2coarse(from.view(), to.view_mut()); int_op.fine2coarse(from, to);
} else { } else {
int_op.coarse2fine(from.view(), to.view_mut()); int_op.coarse2fine(from, to);
} }
} }
euler::SAT_west(sbp.deref(), k, y, metrics, wb_ew.view()); // Reshape edge buffer to correct size
let wb = workbuffer_edges.1.west.take().unwrap();
let mut vec = wb.into_raw_vec();
vec.resize(wb0.len(), 0.0);
let wb = Array2::from_shape_vec(wb0.raw_dim(), vec)
.unwrap();
workbuffer_edges.1.west = Some(wb);
} }
_ => unreachable!(), _ => unreachable!(),
},
x if x == recv_east => match boundary_conditions.east() {
DistributedBoundaryConditions::Channel(r) => {
let r = s.recv(r).unwrap();
euler::SAT_east(sbp.deref(), k, y, metrics, r.view());
} }
DistributedBoundaryConditions::Interpolate(r, int_op) => { euler::SAT_west(
let r = s.recv(r).unwrap(); sbp.deref(),
let is_fine2coarse = r.shape()[1] > wb_ew.shape()[1]; k.west_mut(),
for (mut to, from) in wb_ew.outer_iter_mut().zip(r.outer_iter()) { y,
if is_fine2coarse { metrics,
int_op.fine2coarse(from.view(), to.view_mut()); wb0.view(),
} else { );
int_op.coarse2fine(from.view(), to.view_mut()); boundaries_remaining.west = false;
});
continue 'check_boundaries;
} }
} }
euler::SAT_east(sbp.deref(), k, y, metrics, wb_ew.view());
} }
_ => unreachable!(), if !boundaries_remaining.any() {
}, break 'check_boundaries;
_ => unreachable!(), }
println!("{:?}", boundaries_remaining);
// We have no available boundaries yet, can wait until
// notified by other threads. Early continues
// ensures the boundary has not been notified earlier
pull.cvar.wait(&mut data);
} }
select.remove(sindex);
selectable -= 1;
} }
}; };
integrate::integrate::<integrate::Rk4, Field, _>( integrate::integrate::<integrate::Rk4, Field, _>(

View File

@ -53,6 +53,16 @@ impl<T> Direction<T> {
east: (self.east, other.east), east: (self.east, other.east),
} }
} }
/// Flips all direction through origo
pub fn opposite(self) -> Self {
Self {
north: self.south,
south: self.north,
east: self.west,
west: self.east,
}
}
} }
impl<T> Direction<Option<T>> { impl<T> Direction<Option<T>> {
@ -96,6 +106,15 @@ impl<T> Direction<T> {
} }
} }
impl Direction<bool> {
pub fn all(&self) -> bool {
self.north && self.south && self.east && self.west
}
pub fn any(&self) -> bool {
self.north || self.south || self.east || self.west
}
}
/// Linearly spaced parameters, apart from the boundaries which /// Linearly spaced parameters, apart from the boundaries which
/// only have a distance of `h/2` from the boundary /// only have a distance of `h/2` from the boundary
pub fn h2linspace(start: Float, end: Float, n: usize) -> ndarray::Array1<Float> { pub fn h2linspace(start: Float, end: Float, n: usize) -> ndarray::Array1<Float> {