Compilable state

Magnus Ulimoen 2021-08-07 17:30:00 +00:00
parent 4319e403a5
commit 0a5647df3a
2 changed files with 320 additions and 271 deletions

@@ -86,42 +86,28 @@ fn main() {
         operators,
         boundary_conditions,
         initial_conditions.clone(),
+        opt.output.clone(),
     );
-    let mut sys = basesystem.create();
     // System::new(grids, grid_connections, operators);
-    let dt = sys.max_dt();
-
-    let ntime = (integration_time / dt).round() as u64;
-
-    if opt.distribute {
-        let sys = sys.distribute(ntime);
-        let timer = if opt.timings {
-            Some(std::time::Instant::now())
-        } else {
-            None
-        };
-        sys.run();
-        if let Some(timer) = timer {
-            let duration = timer.elapsed();
-            println!("Duration: {:?}", duration);
-        }
-        return;
-    }
-
-    let should_output = |itime| {
-        opt.number_of_outputs.map_or(false, |num_out| {
-            if num_out == 0 {
-                false
-            } else {
-                itime % (std::cmp::max(ntime / (num_out - 1), 1)) == 0
-            }
-        })
+    let sys = if opt.distribute {
+        basesystem.create_distributed()
+    } else {
+        basesystem.create()
     };
-    let output = File::create(&opt.output, sys.grids.as_slice(), names).unwrap();
-    let mut output = OutputThread::new(output);
-    output.add_timestep(0, &sys.fnow);
+    let dt = sys.max_dt();
+    let ntime = (integration_time / dt).round() as u64;
+    let steps_between_outputs = if let Some(n) = opt.number_of_outputs {
+        std::cmp::max(n / ntime, 1)
+    } else {
+        ntime
+    };
+    sys.output(0);
+    //let output = File::create(&opt.output, sys.grids.as_slice(), names).unwrap();
+    //let mut output = OutputThread::new(output);
+    //output.add_timestep(0, &sys.fnow);

     let progressbar = progressbar(opt.no_progressbar, ntime);
@@ -131,6 +117,16 @@ fn main() {
         None
     };
+    let mut itime = 0;
+    while itime < ntime {
+        sys.advance(steps_between_outputs);
+        progressbar.inc(1);
+        itime += steps_between_outputs;
+        sys.output(itime);
+    }
+    /*
     for itime in 0..ntime {
         if should_output(itime) {
             output.add_timestep(itime, &sys.fnow);
@@ -138,6 +134,7 @@ fn main() {
         progressbar.inc(1);
         sys.advance(dt);
     }
+    */

     progressbar.finish_and_clear();
     let mut outinfo = OutputInformation {
@@ -150,8 +147,9 @@ fn main() {
         outinfo.time_elapsed = Some(duration);
     }
-    output.add_timestep(ntime, &sys.fnow);
+    //output.add_timestep(ntime, &sys.fnow);
+    /*
     if opt.error {
         let time = ntime as Float * dt;
         let mut e = 0.0;
@@ -171,6 +169,7 @@ fn main() {
         }
         outinfo.error = Some(e);
     }
+    */

     if opt.output_json {
         println!("{}", json5::to_string(&outinfo).unwrap());

@@ -1,5 +1,6 @@
 use crate::parsing;
 use crate::utils::Direction;
+use core::ops::Deref;
 use crossbeam_channel::{Receiver, Select, Sender};
 use euler::{
     eval::{self, Evaluator},
@@ -9,7 +10,6 @@ use ndarray::Array2;
 use sbp::grid::{Grid, Metrics};
 use sbp::operators::{InterpolationOperator, SbpOperator2d};
 use sbp::*;
-use std::sync::{Arc, Barrier};

 pub struct BaseSystem {
     pub names: Vec<String>,
@@ -18,6 +18,7 @@ pub struct BaseSystem {
     pub boundary_conditions: Vec<euler::BoundaryCharacteristics>,
     pub initial_conditions: crate::parsing::InitialConditions,
     pub operators: Vec<Box<dyn SbpOperator2d>>,
+    pub output: hdf5::File,
 }

 impl BaseSystem {
@@ -28,7 +29,15 @@ impl BaseSystem {
         operators: Vec<Box<dyn SbpOperator2d>>,
         boundary_conditions: Vec<euler::BoundaryCharacteristics>,
         initial_conditions: crate::parsing::InitialConditions,
+        output: std::path::PathBuf,
     ) -> Self {
+        let output = hdf5::File::create(output).unwrap();
+        output
+            .new_dataset::<u64>()
+            .resizable(true)
+            .chunk((1,))
+            .create("t", (0,))
+            .unwrap();
         Self {
             names,
             grids,
@@ -36,10 +45,54 @@ impl BaseSystem {
             boundary_conditions,
             initial_conditions,
             operators,
+            output,
         }
     }

     pub fn create(self) -> System {
-        let mut sys = System::new(self.grids, self.boundary_conditions, self.operators);
+        let fnow = self
+            .grids
+            .iter()
+            .map(|g| euler::Field::new(g.ny(), g.nx()))
+            .collect::<Vec<_>>();
+        let fnext = fnow.clone();
+        let wb = self
+            .grids
+            .iter()
+            .map(|g| euler::WorkBuffers::new(g.ny(), g.nx()))
+            .collect();
+        let k = self
+            .grids
+            .iter()
+            .map(|g| euler::Diff::zeros((g.ny(), g.nx())))
+            .collect::<Vec<_>>();
+        let k = [k.clone(), k.clone(), k.clone(), k];
+        let metrics = self
+            .grids
+            .iter()
+            .zip(&self.operators)
+            .map(|(g, op)| g.metrics(&**op).unwrap())
+            .collect::<Vec<_>>();
+        let eb = self
+            .boundary_conditions
+            .iter()
+            .zip(&self.grids)
+            .map(|(bt, grid)| euler::BoundaryStorage::new(bt, grid))
+            .collect();
+        let sys = SingleThreadedSystem {
+            fnow,
+            fnext,
+            k,
+            wb,
+            grids: self.grids,
+            metrics,
+            bt: self.boundary_conditions,
+            eb,
+            time: self.time,
+            operators: self.operators,
+            output: todo!(),
+        };
         match &self.initial_conditions {
             /*
             parsing::InitialConditions::File(f) => {
@@ -61,11 +114,180 @@
                 }
             }
         }
-        sys
+        System::SingleThreaded(sys)
+    }
+
+    /// Spreads the computation over n threads, in a thread per grid way.
+    pub fn create_distributed(self) -> System {
+        let nthreads = self.grids.len();
+        // let dt = self.max_dt();
+
+        // Build up the boundary conditions
+        let mut push_channels: Vec<Direction<Option<Sender<Array2<Float>>>>> =
+            Vec::with_capacity(nthreads);
+        let mut pull_channels: Vec<Direction<Option<Receiver<Array2<Float>>>>> =
+            vec![Direction::default(); nthreads];
+
+        // Build the set of communicators between boundaries
+        for wb in &self.boundary_conditions {
+            let mut local_push = Direction::default();
+            if let euler::BoundaryCharacteristic::Grid(i)
+            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.north()
+            {
+                let (s, r) = crossbeam_channel::bounded(1);
+                pull_channels[*i]
+                    .south_mut()
+                    .replace(r)
+                    .and_then::<(), _>(|_| panic!("channel is already present"));
+                *local_push.north_mut() = Some(s);
+            }
+            if let euler::BoundaryCharacteristic::Grid(i)
+            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.south()
+            {
+                let (s, r) = crossbeam_channel::bounded(1);
+                pull_channels[*i]
+                    .north_mut()
+                    .replace(r)
+                    .and_then::<(), _>(|_| panic!("channel is already present"));
+                *local_push.south_mut() = Some(s);
+            }
+            if let euler::BoundaryCharacteristic::Grid(i)
+            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.east()
+            {
+                let (s, r) = crossbeam_channel::bounded(1);
+                pull_channels[*i]
+                    .west_mut()
+                    .replace(r)
+                    .and_then::<(), _>(|_| panic!("channel is already present"));
+                *local_push.east_mut() = Some(s);
+            }
+            if let euler::BoundaryCharacteristic::Grid(i)
+            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.west()
+            {
+                let (s, r) = crossbeam_channel::bounded(1);
+                pull_channels[*i]
+                    .east_mut()
+                    .replace(r)
+                    .and_then::<(), _>(|_| panic!("channel is already present"));
+                *local_push.west_mut() = Some(s);
+            }
+            push_channels.push(local_push);
+        }
+
+        let (master_send, master_recv) = crossbeam_channel::unbounded();
+
+        let mut tids = Vec::with_capacity(nthreads);
+        let mut communicators = Vec::with_capacity(nthreads);
+
+        for (id, (((((name, grid), sbp), bt), chan), push)) in self
+            .names
+            .into_iter()
+            .zip(self.grids.into_iter())
+            .zip(self.operators.into_iter())
+            .zip(self.boundary_conditions)
+            .zip(pull_channels)
+            .zip(push_channels)
+            .enumerate()
+        {
+            let builder = std::thread::Builder::new().name(format!("eulersolver: {}", name));
+            let boundary_conditions = bt.zip(chan).map(|(bt, chan)| match bt {
+                euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This,
+                euler::BoundaryCharacteristic::Grid(_) => {
+                    DistributedBoundaryConditions::Channel(chan.unwrap())
+                }
+                euler::BoundaryCharacteristic::Interpolate(_, int_op) => {
+                    DistributedBoundaryConditions::Interpolate(chan.unwrap(), int_op)
+                }
+                euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(),
+                euler::BoundaryCharacteristic::Vortex(vp) => {
+                    DistributedBoundaryConditions::Vortex(vp)
+                }
+                euler::BoundaryCharacteristic::Eval(eval) => {
+                    DistributedBoundaryConditions::Eval(eval)
+                }
+            });
+            let master_send = master_send.clone();
+            let (t_send, t_recv) = crossbeam_channel::unbounded();
+            communicators.push(t_send);
+            let time = self.time;
+            tids.push(
+                builder
+                    .spawn(move || {
+                        let (ny, nx) = (grid.ny(), grid.nx());
+                        let current = Field::new(ny, nx);
+                        let fut = current.clone();
+                        let k = [
+                            Diff::zeros((ny, nx)),
+                            Diff::zeros((ny, nx)),
+                            Diff::zeros((ny, nx)),
+                            Diff::zeros((ny, nx)),
+                        ];
+                        let metrics = grid.metrics(sbp.deref()).unwrap();
+                        let wb = WorkBuffers::new(ny, nx);
+                        let mut sys = DistributedSystemPart {
+                            current,
+                            fut,
+                            k,
+                            boundary_conditions,
+                            grid: (grid, metrics),
+                            output: todo!(),
+                            push,
+                            sbp,
+                            t: time,
+                            dt: Float::NAN,
+                            name,
+                            id,
+                            recv: t_recv,
+                            send: master_send,
+                            wb,
+                            wb_ns: Array2::zeros((4, nx)),
+                            wb_ew: Array2::zeros((4, ny)),
+                        };
+                        // init and send maxdt
+                        // receive maxdt
+                        sys.run();
+                    })
+                    .unwrap(),
+            );
+        }
+
+        System::MultiThreaded(DistributedSystem {
+            sys: tids,
+            recv: master_recv,
+            send: communicators,
+        })
     }
 }

-pub struct System {
+pub enum System {
+    SingleThreaded(SingleThreadedSystem),
+    MultiThreaded(DistributedSystem),
+}
+
+impl System {
+    pub fn max_dt(&self) -> Float {
+        todo!()
+    }
+    pub fn advance(&self, nsteps: u64) {
+        todo!()
+    }
+    pub fn output(&self, ntime: u64) {
+        todo!()
+    }
+}
+
+pub struct SingleThreadedSystem {
     pub fnow: Vec<euler::Field>,
     pub fnext: Vec<euler::Field>,
     pub wb: Vec<euler::WorkBuffers>,
@@ -76,9 +298,10 @@ pub struct System {
     pub eb: Vec<euler::BoundaryStorage>,
     pub time: Float,
     pub operators: Vec<Box<dyn SbpOperator2d>>,
+    pub output: Vec<hdf5::Dataset>,
 }

-impl integrate::Integrable for System {
+impl integrate::Integrable for SingleThreadedSystem {
     type State = Vec<euler::Field>;
     type Diff = Vec<euler::Diff>;
@@ -89,52 +312,7 @@ impl integrate::Integrable for System {
     }
 }

-impl System {
-    pub fn new(
-        grids: Vec<grid::Grid>,
-        bt: Vec<euler::BoundaryCharacteristics>,
-        operators: Vec<Box<dyn SbpOperator2d>>,
-    ) -> Self {
-        let fnow = grids
-            .iter()
-            .map(|g| euler::Field::new(g.ny(), g.nx()))
-            .collect::<Vec<_>>();
-        let fnext = fnow.clone();
-        let wb = grids
-            .iter()
-            .map(|g| euler::WorkBuffers::new(g.ny(), g.nx()))
-            .collect();
-        let k = grids
-            .iter()
-            .map(|g| euler::Diff::zeros((g.ny(), g.nx())))
-            .collect::<Vec<_>>();
-        let k = [k.clone(), k.clone(), k.clone(), k];
-        let metrics = grids
-            .iter()
-            .zip(&operators)
-            .map(|(g, op)| g.metrics(&**op).unwrap())
-            .collect::<Vec<_>>();
-        let eb = bt
-            .iter()
-            .zip(&grids)
-            .map(|(bt, grid)| euler::BoundaryStorage::new(bt, grid))
-            .collect();
-        Self {
-            fnow,
-            fnext,
-            k,
-            wb,
-            grids,
-            metrics,
-            bt,
-            eb,
-            time: 0.0,
-            operators,
-        }
-    }
-
+impl SingleThreadedSystem {
     pub fn vortex(&mut self, t: Float, vortex_params: &euler::VortexParameters) {
         for (f, g) in self.fnow.iter_mut().zip(&self.grids) {
             f.vortex(g.x(), g.y(), t, &vortex_params);
@@ -170,7 +348,7 @@ impl System {
             }
         };

-        integrate::integrate::<integrate::Rk4, System, _>(
+        integrate::integrate::<integrate::Rk4, SingleThreadedSystem, _>(
             rhs,
             &self.fnow,
             &mut self.fnext,
@@ -231,191 +409,48 @@ impl System {
         max_dt
     }
-
-    /// Spreads the computation over n threads, in a thread per grid way.
-    /// This system can only be called once for ntime calls.
-    pub fn distribute(self, ntime: u64) -> DistributedSystem {
-        let nthreads = self.grids.len();
-        let time = 0.0;
-        // alt: crossbeam::WaitGroup
-        let b = Arc::new(Barrier::new(nthreads + 1));
-        let dt = self.max_dt();
-        // Build up the boundary conditions
-        let mut push_channels: Vec<Direction<Option<Sender<Array2<Float>>>>> =
-            Vec::with_capacity(nthreads);
-        let mut pull_channels: Vec<Direction<Option<Receiver<Array2<Float>>>>> =
-            vec![Direction::default(); nthreads];
-        // Build the set of communicators between boundaries
-        for wb in &self.bt {
-            let mut local_push = Direction::default();
-            if let euler::BoundaryCharacteristic::Grid(i)
-            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.north()
-            {
-                let (s, r) = crossbeam_channel::bounded(1);
-                pull_channels[*i]
-                    .south_mut()
-                    .replace(r)
-                    .and_then::<(), _>(|_| panic!("channel is already present"));
-                *local_push.north_mut() = Some(s);
-            }
-            if let euler::BoundaryCharacteristic::Grid(i)
-            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.south()
-            {
-                let (s, r) = crossbeam_channel::bounded(1);
-                pull_channels[*i]
-                    .north_mut()
-                    .replace(r)
-                    .and_then::<(), _>(|_| panic!("channel is already present"));
-                *local_push.south_mut() = Some(s);
-            }
-            if let euler::BoundaryCharacteristic::Grid(i)
-            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.east()
-            {
-                let (s, r) = crossbeam_channel::bounded(1);
-                pull_channels[*i]
-                    .west_mut()
-                    .replace(r)
-                    .and_then::<(), _>(|_| panic!("channel is already present"));
-                *local_push.east_mut() = Some(s);
-            }
-            if let euler::BoundaryCharacteristic::Grid(i)
-            | euler::BoundaryCharacteristic::Interpolate(i, _) = wb.west()
-            {
-                let (s, r) = crossbeam_channel::bounded(1);
-                pull_channels[*i]
-                    .east_mut()
-                    .replace(r)
-                    .and_then::<(), _>(|_| panic!("channel is already present"));
-                *local_push.west_mut() = Some(s);
-            }
-            push_channels.push(local_push);
-        }
-
-        let mut tids = Vec::new();
-        for ((((((((current, fut), grid), metrics), sbp), wb), bt), chan), push) in self
-            .fnow
-            .into_iter()
-            .zip(self.fnext.into_iter())
-            .zip(self.grids.into_iter())
-            .zip(self.metrics.into_iter())
-            .zip(self.operators.into_iter())
-            .zip(self.wb.into_iter())
-            .zip(self.bt)
-            .zip(pull_channels)
-            .zip(push_channels)
-        {
-            let builder = std::thread::Builder::new().name(format!("eulersolver: {}", "smth"));
-            let barrier = b.clone();
-            let boundary_conditions = bt.zip(chan).map(|(bt, chan)| match bt {
-                euler::BoundaryCharacteristic::This => DistributedBoundaryConditions::This,
-                euler::BoundaryCharacteristic::Grid(_) => {
-                    DistributedBoundaryConditions::Channel(chan.unwrap())
-                }
-                euler::BoundaryCharacteristic::Interpolate(_, int_op) => {
-                    DistributedBoundaryConditions::Interpolate(chan.unwrap(), int_op)
-                }
-                euler::BoundaryCharacteristic::MultiGrid(_) => unimplemented!(),
-                euler::BoundaryCharacteristic::Vortex(vp) => {
-                    DistributedBoundaryConditions::Vortex(vp)
-                }
-                euler::BoundaryCharacteristic::Eval(eval) => {
-                    DistributedBoundaryConditions::Eval(eval)
-                }
-            });
-            let (ny, nx) = (grid.ny(), grid.nx());
-            tids.push(
-                builder
-                    .spawn(move || {
-                        let mut sys = DistributedSystemPart {
-                            barrier,
-                            ntime,
-                            dt,
-                            current,
-                            fut,
-                            k: [
-                                Diff::zeros((ny, nx)),
-                                Diff::zeros((ny, nx)),
-                                Diff::zeros((ny, nx)),
-                                Diff::zeros((ny, nx)),
-                            ],
-                            boundary_conditions,
-                            grid: (grid, metrics),
-                            _output: (),
-                            push,
-                            sbp,
-                            t: time,
-                            wb,
-                            wb_ns: Array2::zeros((4, nx)),
-                            wb_ew: Array2::zeros((4, ny)),
-                        };
-                        sys.advance();
-                    })
-                    .unwrap(),
-            );
-        }
-        // Set up communicators
-        // Spawn a new communicator
-
-        DistributedSystem {
-            _ntime: ntime,
-            start: b,
-            sys: tids,
-        }
-    }
 }

-// single-threaded items: clone_from
-// sync points: scaled_add, rhs
-//
-// Could instead make every thread (of a threadpool?) carry one grid themselves,
-// and instead only wait on obtaining bc, which would make synchronisation
-// be somewhat minimal
-//
-// Difficulties/implementation notes:
-// * How to get output from each thread? Push to the multi-producer, single consumer queue
-//   with capacity of 2*N (which ensures internal synchronisation stays active), or many
-//   channels which are select!'ed. Choose instead a (name, itime, data) tuple, which is
-//   communicated. Also initialise the output file all at the start, since we know the
-//   number of time steps, and which timesteps will be used (should_output).
-// * How to balance the number of threads/jobs?
-//   Each grid will be pushed to a thread, and a thread pool created for use by all threads
-//   combined. Set affinity to limit the number of cores available (could have side effects
-//   on heat etc, although this would also be the case for multi-threaded code). A
-//   thread-per-core architecture + the output thread if we have enough cores, else let os
-//   control the scheduling for us.
-//   Use cgroups to artificially limit the number of cores.
-//   Use taskset to artificially limit the number of cores.
-// * Mechanism to wait for bc, is select available? Use a channel with 0-4 capacity, each
-//   other grid additionally pushes their direction to always use the available
-//   Use select or similar from crossbeam_channel
-// * Mechanism to push bc: try_push and then move to next bc if not available. Spawn a
-//   thread into a threadpool to handle this.
-// * Each grid is spawned onto an async threadpool with M threads allocated to itself, this should
-//   allow async waiting on the channel (or in the future from process/internet/bc processor)
-// * BC: A local thread can have a handle
-
 pub struct DistributedSystem {
-    /// Simple messaging system to be replaced by a more sophisticated system (e.g. run 5 steps,
-    /// collect, initialise, return something to main)
-    /// This simply waits until all threads are ready, then starts the computation on all threads
-    start: Arc<Barrier>,
-    _ntime: u64,
-    /// These should be joined to mark the end of the computation
+    recv: Receiver<(usize, MsgToHost)>,
+    send: Vec<Sender<MsgFromHost>>,
+    /// All threads should be joined to mark the end of the computation
     sys: Vec<std::thread::JoinHandle<()>>,
 }

 impl DistributedSystem {
-    pub fn run(self) {
-        // This should start as we have n thread, but barrier blocks on n+1
-        self.start.wait();
-        self.sys.into_iter().for_each(|tid| tid.join().unwrap());
+    pub fn advance(&mut self, ntime: u64) {
+        for tid in &self.send {
+            tid.send(MsgFromHost::Advance(ntime));
+        }
+    }
+    pub fn output(&self) {
+        todo!()
     }
 }
+
+impl Drop for DistributedSystem {
+    fn drop(&mut self) {
+        for tid in &self.send {
+            tid.send(MsgFromHost::Stop);
+        }
+        let handles = std::mem::take(&mut self.sys);
+        for tid in handles {
+            tid.join().unwrap()
+        }
+    }
+}
+
+enum MsgFromHost {
+    Advance(u64),
+    MaxDt(Float),
+    Output(u64),
+    Stop,
+}
+
+enum MsgToHost {
+    MaxDt(Float),
+    CurrentTimestep(u64),
+}

 // #[derive(Debug)]
@@ -441,23 +476,39 @@ struct DistributedSystemPart {
     current: Field,
     fut: Field,
-    k: [Diff; 4],
-    wb: WorkBuffers,
-    barrier: Arc<Barrier>,
-    _output: (), // hdf5::Dataset eventually,
     t: Float,
     dt: Float,
-    ntime: u64,
-    /// Work buffer for boundaries
+    name: String,
+    id: usize,
+    recv: Receiver<MsgFromHost>,
+    send: Sender<(usize, MsgToHost)>,
+    output: hdf5::Dataset,
+    k: [Diff; 4],
+    wb: WorkBuffers,
+    /// Work buffer for north/south boundary
     wb_ns: Array2<Float>,
+    /// Work buffer for east/west boundary
     wb_ew: Array2<Float>,
 }

 impl DistributedSystemPart {
-    fn advance(&mut self) {
-        self.barrier.wait();
-        for _i in 0..self.ntime {
+    fn run(&mut self) {
+        loop {
+            match self.recv.recv().unwrap() {
+                MsgFromHost::MaxDt(dt) => self.dt = dt,
+                MsgFromHost::Advance(ntime) => self.advance(ntime),
+                MsgFromHost::Output(ntime) => todo!(),
+                MsgFromHost::Stop => return,
+            }
+        }
+    }
+
+    fn advance(&mut self, ntime: u64) {
+        for ntime in 0..ntime {
+            self.send.send((self.id, MsgToHost::CurrentTimestep(ntime)));
             let metrics = &self.grid.1;
             let wb = &mut self.wb.0;
             let sbp = &self.sbp;
@@ -482,7 +533,6 @@ impl DistributedSystemPart {
                 s.send(y.west().to_owned()).unwrap()
             }

-            use std::ops::Deref;
             // This computation does not depend on the boundaries
             euler::RHS_no_SAT(sbp.deref(), k, y, metrics, wb);
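
The message-driven control flow this commit introduces (MsgFromHost / MsgToHost, DistributedSystem::advance, the Drop impl that sends Stop and joins) can be tried out in isolation. The following is a minimal, self-contained sketch of the same host/worker pattern; it is illustrative only, not code from this repository, and substitutes std::sync::mpsc for crossbeam_channel so it runs with the standard library alone:

// Sketch of the host <-> worker message protocol, reduced to its essentials.
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

enum MsgFromHost {
    Advance(u64),
    Stop,
}

enum MsgToHost {
    CurrentTimestep(u64),
}

// Each worker owns its receive end plus a clone of the shared send end,
// mirroring the recv/send fields of DistributedSystemPart above.
fn worker(id: usize, recv: Receiver<MsgFromHost>, send: Sender<(usize, MsgToHost)>) {
    loop {
        match recv.recv().unwrap() {
            MsgFromHost::Advance(ntime) => {
                for itime in 0..ntime {
                    // ... advance the local grid by one timestep here ...
                    send.send((id, MsgToHost::CurrentTimestep(itime))).unwrap();
                }
            }
            MsgFromHost::Stop => return,
        }
    }
}

fn main() {
    let nworkers = 3;
    // Many workers report back on one shared channel
    let (to_host, from_workers) = channel();
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for id in 0..nworkers {
        let (s, r) = channel();
        senders.push(s);
        let to_host = to_host.clone();
        handles.push(thread::spawn(move || worker(id, r, to_host)));
    }
    drop(to_host); // workers hold the remaining send ends

    // Broadcast commands, as DistributedSystem::advance and Drop do above
    for s in &senders {
        s.send(MsgFromHost::Advance(5)).unwrap();
    }
    for s in &senders {
        s.send(MsgFromHost::Stop).unwrap();
    }
    for h in handles {
        h.join().unwrap();
    }
    // Drain the progress reports the workers queued up
    for (id, msg) in from_workers {
        match msg {
            MsgToHost::CurrentTimestep(t) => println!("worker {} finished step {}", id, t),
        }
    }
}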