Improve channel distribution
This commit is contained in:
parent
35b8af8b2d
commit
b11f3c9abb
|
@ -187,26 +187,48 @@ impl System {
|
|||
let dt = self.max_dt();
|
||||
|
||||
// Build up the boundary conditions
|
||||
// Assume all boundaries are push/pull through channels
|
||||
let channels = (0..nthreads)
|
||||
.map(|_| {
|
||||
use crossbeam_channel::unbounded;
|
||||
Direction {
|
||||
north: unbounded(),
|
||||
south: unbounded(),
|
||||
west: unbounded(),
|
||||
east: unbounded(),
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let mut push_channels: Vec<Direction<Option<Sender<Array2<Float>>>>> =
|
||||
Vec::with_capacity(nthreads);
|
||||
let mut pull_channels: Vec<Direction<Option<Receiver<Array2<Float>>>>> =
|
||||
vec![Direction::default(); nthreads];
|
||||
|
||||
// TODO: Iterate through all grids and see if they need ourself to push
|
||||
let mut requested_channels = (0..nthreads)
|
||||
.map(|_| Direction::default())
|
||||
.collect::<Vec<Direction<bool>>>();
|
||||
// Build the set of communicators between boundaries
|
||||
for wb in &self.bt {
|
||||
let mut local_push = Direction::default();
|
||||
if let euler::BoundaryCharacteristic::Grid(i)
|
||||
| euler::BoundaryCharacteristic::Interpolate(i, _) = wb.north()
|
||||
{
|
||||
let (s, r) = crossbeam_channel::bounded(1);
|
||||
pull_channels[*i].south_mut().replace(r).unwrap();
|
||||
*local_push.north_mut() = Some(s);
|
||||
}
|
||||
if let euler::BoundaryCharacteristic::Grid(i)
|
||||
| euler::BoundaryCharacteristic::Interpolate(i, _) = wb.south()
|
||||
{
|
||||
let (s, r) = crossbeam_channel::bounded(1);
|
||||
pull_channels[*i].north_mut().replace(r).unwrap();
|
||||
*local_push.south_mut() = Some(s);
|
||||
}
|
||||
if let euler::BoundaryCharacteristic::Grid(i)
|
||||
| euler::BoundaryCharacteristic::Interpolate(i, _) = wb.east()
|
||||
{
|
||||
let (s, r) = crossbeam_channel::bounded(1);
|
||||
pull_channels[*i].west_mut().replace(r).unwrap();
|
||||
*local_push.east_mut() = Some(s);
|
||||
}
|
||||
if let euler::BoundaryCharacteristic::Grid(i)
|
||||
| euler::BoundaryCharacteristic::Interpolate(i, _) = wb.west()
|
||||
{
|
||||
let (s, r) = crossbeam_channel::bounded(1);
|
||||
pull_channels[*i].east_mut().replace(r).unwrap();
|
||||
*local_push.west_mut() = Some(s);
|
||||
}
|
||||
|
||||
push_channels.push(local_push);
|
||||
}
|
||||
|
||||
let mut tids = Vec::new();
|
||||
for (((((((current, fut), grid), metrics), sbp), wb), bt), req_channel) in self
|
||||
for ((((((((current, fut), grid), metrics), sbp), wb), bt), chan), push) in self
|
||||
.fnow
|
||||
.into_iter()
|
||||
.zip(self.fnext.into_iter())
|
||||
|
@ -215,103 +237,30 @@ impl System {
|
|||
.zip(self.operators.into_iter())
|
||||
.zip(self.wb.into_iter())
|
||||
.zip(self.bt)
|
||||
.zip(requested_channels)
|
||||
.zip(pull_channels)
|
||||
.zip(push_channels)
|
||||
{
|
||||
let builder = std::thread::Builder::new().name(format!("eulersolver: {}", "smth"));
|
||||
let barrier = b.clone();
|
||||
|
||||
let Direction {
|
||||
north: bt_north,
|
||||
south: bt_south,
|
||||
west: bt_west,
|
||||
east: bt_east,
|
||||
} = bt;
|
||||
let boundary_conditions = Direction {
|
||||
north: match bt_north {
|
||||
euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This,
|
||||
euler::BoundaryCharacteristic::Grid(i) => {
|
||||
*requested_channels[i].south_mut() = true;
|
||||
DistributedBoundaryConditions::Channel(channels[i].south().1.clone())
|
||||
}
|
||||
euler::BoundaryCharacteristic::Interpolate(i, int_op) => {
|
||||
*requested_channels[i].south_mut() = true;
|
||||
DistributedBoundaryConditions::Interpolate(
|
||||
channels[i].south().1.clone(),
|
||||
int_op,
|
||||
)
|
||||
}
|
||||
euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(),
|
||||
euler::BoundaryCharacteristic::Vortex(vp) => {
|
||||
DistributedBoundaryConditions::Vortex(vp)
|
||||
}
|
||||
euler::BoundaryCharacteristic::Eval(eval) => {
|
||||
DistributedBoundaryConditions::Eval(eval)
|
||||
}
|
||||
},
|
||||
south: match bt_south {
|
||||
euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This,
|
||||
euler::BoundaryCharacteristic::Grid(i) => {
|
||||
*requested_channels[i].north_mut() = true;
|
||||
DistributedBoundaryConditions::Channel(channels[i].north().1.clone())
|
||||
}
|
||||
euler::BoundaryCharacteristic::Interpolate(i, int_op) => {
|
||||
*requested_channels[i].north_mut() = true;
|
||||
DistributedBoundaryConditions::Interpolate(
|
||||
channels[i].north().1.clone(),
|
||||
int_op,
|
||||
)
|
||||
}
|
||||
euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(),
|
||||
euler::BoundaryCharacteristic::Vortex(vp) => {
|
||||
DistributedBoundaryConditions::Vortex(vp)
|
||||
}
|
||||
euler::BoundaryCharacteristic::Eval(eval) => {
|
||||
DistributedBoundaryConditions::Eval(eval)
|
||||
}
|
||||
},
|
||||
east: match bt_east {
|
||||
euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This,
|
||||
euler::BoundaryCharacteristic::Grid(i) => {
|
||||
*requested_channels[i].west_mut() = true;
|
||||
DistributedBoundaryConditions::Channel(channels[i].west().1.clone())
|
||||
}
|
||||
euler::BoundaryCharacteristic::Interpolate(i, int_op) => {
|
||||
*requested_channels[i].west_mut() = true;
|
||||
DistributedBoundaryConditions::Interpolate(
|
||||
channels[i].west().1.clone(),
|
||||
int_op,
|
||||
)
|
||||
}
|
||||
euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(),
|
||||
euler::BoundaryCharacteristic::Vortex(vp) => {
|
||||
DistributedBoundaryConditions::Vortex(vp)
|
||||
}
|
||||
euler::BoundaryCharacteristic::Eval(eval) => {
|
||||
DistributedBoundaryConditions::Eval(eval)
|
||||
}
|
||||
},
|
||||
west: match bt_west {
|
||||
euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This,
|
||||
euler::BoundaryCharacteristic::Grid(i) => {
|
||||
*requested_channels[i].east_mut() = true;
|
||||
DistributedBoundaryConditions::Channel(channels[i].east().1.clone())
|
||||
}
|
||||
euler::BoundaryCharacteristic::Interpolate(i, int_op) => {
|
||||
*requested_channels[i].east_mut() = true;
|
||||
DistributedBoundaryConditions::Interpolate(
|
||||
channels[i].east().1.clone(),
|
||||
int_op,
|
||||
)
|
||||
}
|
||||
euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(),
|
||||
euler::BoundaryCharacteristic::Vortex(vp) => {
|
||||
DistributedBoundaryConditions::Vortex(vp)
|
||||
}
|
||||
euler::BoundaryCharacteristic::Eval(eval) => {
|
||||
DistributedBoundaryConditions::Eval(eval)
|
||||
}
|
||||
},
|
||||
};
|
||||
let boundary_conditions = bt.zip(chan).map(|(bt, chan)| match bt {
|
||||
euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This,
|
||||
euler::BoundaryCharacteristic::Grid(_) => {
|
||||
DistributedBoundaryConditions::Channel(chan.unwrap())
|
||||
}
|
||||
euler::BoundaryCharacteristic::Interpolate(_, int_op) => {
|
||||
DistributedBoundaryConditions::Interpolate(chan.unwrap(), int_op)
|
||||
}
|
||||
euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(),
|
||||
euler::BoundaryCharacteristic::Vortex(vp) => {
|
||||
DistributedBoundaryConditions::Vortex(vp)
|
||||
}
|
||||
euler::BoundaryCharacteristic::Eval(eval) => {
|
||||
DistributedBoundaryConditions::Eval(eval)
|
||||
}
|
||||
});
|
||||
|
||||
let (ny, nx) = (grid.nx(), grid.ny());
|
||||
|
||||
tids.push(
|
||||
builder
|
||||
|
@ -322,11 +271,16 @@ impl System {
|
|||
dt,
|
||||
current,
|
||||
fut,
|
||||
k: [todo!(); 4],
|
||||
k: [
|
||||
Diff::zeros((ny, nx)),
|
||||
Diff::zeros((ny, nx)),
|
||||
Diff::zeros((ny, nx)),
|
||||
Diff::zeros((ny, nx)),
|
||||
],
|
||||
boundary_conditions,
|
||||
grid: (grid, metrics),
|
||||
output: (),
|
||||
push: todo!(),
|
||||
push,
|
||||
sbp,
|
||||
t: time,
|
||||
wb,
|
||||
|
@ -414,17 +368,7 @@ impl DistributedBoundaryConditions {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum PushCommunicator {
|
||||
Channel(Sender<Array2<Float>>),
|
||||
None,
|
||||
}
|
||||
|
||||
impl Default for PushCommunicator {
|
||||
fn default() -> Self {
|
||||
Self::None
|
||||
}
|
||||
}
|
||||
type PushCommunicator = Option<Sender<Array2<Float>>>;
|
||||
|
||||
struct DistributedSystemPart {
|
||||
grid: (Grid, Metrics),
|
||||
|
@ -458,21 +402,17 @@ impl DistributedSystemPart {
|
|||
|
||||
let mut rhs = |k: &mut euler::Diff, y: &euler::Field, time: Float| {
|
||||
// Send off the boundaries optimistically, in case some grid is ready
|
||||
match &push.north {
|
||||
PushCommunicator::None => (),
|
||||
PushCommunicator::Channel(s) => s.send(y.north().to_owned()).unwrap(),
|
||||
if let Some(s) = &push.north {
|
||||
s.send(y.north().to_owned()).unwrap()
|
||||
}
|
||||
match &push.south {
|
||||
PushCommunicator::None => (),
|
||||
PushCommunicator::Channel(s) => s.send(y.south().to_owned()).unwrap(),
|
||||
if let Some(s) = &push.south {
|
||||
s.send(y.south().to_owned()).unwrap()
|
||||
}
|
||||
match &push.east {
|
||||
PushCommunicator::None => (),
|
||||
PushCommunicator::Channel(s) => s.send(y.east().to_owned()).unwrap(),
|
||||
if let Some(s) = &push.east {
|
||||
s.send(y.east().to_owned()).unwrap()
|
||||
}
|
||||
match &push.west {
|
||||
PushCommunicator::None => (),
|
||||
PushCommunicator::Channel(s) => s.send(y.west().to_owned()).unwrap(),
|
||||
if let Some(s) = &push.west {
|
||||
s.send(y.west().to_owned()).unwrap()
|
||||
}
|
||||
|
||||
use std::ops::Deref;
|
||||
|
|
Loading…
Reference in New Issue