Fix ouptut for single/multi backend

This commit is contained in:
Magnus Ulimoen 2021-08-08 20:28:37 +00:00
parent 0a5647df3a
commit 4d44b4a74a
3 changed files with 191 additions and 181 deletions

View File

@ -1,155 +0,0 @@
use super::*;
pub struct OutputThread {
rx: Option<std::sync::mpsc::Receiver<Vec<euler::Field>>>,
tx: Option<std::sync::mpsc::SyncSender<(u64, Vec<euler::Field>)>>,
thread: Option<std::thread::JoinHandle<()>>,
}
impl OutputThread {
pub fn new(file: File) -> Self {
// Pingpong back and forth a number of Vec<Field> to be used for the
// output. The sync_channel applies some backpressure
let (tx_thread, rx) = std::sync::mpsc::channel::<Vec<euler::Field>>();
let (tx, rx_thread) = std::sync::mpsc::sync_channel::<(u64, Vec<euler::Field>)>(3);
let thread = std::thread::Builder::new()
.name("multigrid_output".to_owned())
.spawn(move || {
let mut times = Vec::<u64>::new();
for (ntime, fields) in rx_thread.iter() {
if !times.contains(&ntime) {
file.add_timestep(ntime, fields.as_slice()).unwrap();
times.push(ntime);
}
tx_thread.send(fields).unwrap();
}
})
.unwrap();
Self {
tx: Some(tx),
rx: Some(rx),
thread: Some(thread),
}
}
pub fn add_timestep(&mut self, ntime: u64, fields: &[euler::Field]) {
match self.rx.as_ref().unwrap().try_recv() {
Ok(mut copy_fields) => {
for (from, to) in fields.iter().zip(copy_fields.iter_mut()) {
to.clone_from(&from);
}
self.tx
.as_ref()
.unwrap()
.send((ntime, copy_fields))
.unwrap();
}
Err(std::sync::mpsc::TryRecvError::Empty) => {
let fields = fields.to_vec();
self.tx.as_ref().unwrap().send((ntime, fields)).unwrap();
}
Err(e) => panic!("{:?}", e),
};
}
}
impl Drop for OutputThread {
fn drop(&mut self) {
let tx = self.tx.take();
std::mem::drop(tx);
let thread = self.thread.take().unwrap();
thread.join().unwrap();
}
}
#[derive(Debug, Clone)]
pub struct File(hdf5::File, Vec<String>);
impl File {
pub fn create<P: AsRef<std::path::Path>>(
path: P,
grids: &[sbp::grid::Grid],
names: Vec<String>,
) -> Result<Self, Box<dyn std::error::Error>> {
assert_eq!(grids.len(), names.len());
let file = hdf5::File::create(path.as_ref())?;
let _tds = file
.new_dataset::<u64>()
.resizable(true)
.chunk((1,))
.create("t", (0,))?;
for (name, grid) in names.iter().zip(grids.iter()) {
let g = file.create_group(name)?;
g.link_soft("/t", "t").unwrap();
let add_dim = |name| {
g.new_dataset::<Float>()
.chunk((grid.ny(), grid.nx()))
.gzip(9)
.create(name, (grid.ny(), grid.nx()))
};
let xds = add_dim("x")?;
xds.write(grid.x())?;
let yds = add_dim("y")?;
yds.write(grid.y())?;
let add_var = |name| {
g.new_dataset::<Float>()
.gzip(3)
.shuffle(true)
.chunk((1, grid.ny(), grid.nx()))
.resizable(true)
.create(name, (0, grid.ny(), grid.nx()))
};
add_var("rho")?;
add_var("rhou")?;
add_var("rhov")?;
add_var("e")?;
}
Ok(Self(file, names))
}
pub fn add_timestep(
&self,
t: u64,
fields: &[euler::Field],
) -> Result<(), Box<dyn std::error::Error>> {
let file = &self.0;
let tds = file.dataset("t")?;
let tpos = tds.size();
tds.resize((tpos + 1,))?;
tds.write_slice(&[t], ndarray::s![tpos..tpos + 1])?;
for (groupname, fnow) in self.1.iter().zip(fields.iter()) {
let g = file.group(groupname)?;
let (tpos, ny, nx) = {
let ds = g.dataset("rho")?;
let shape = ds.shape();
(shape[0], shape[1], shape[2])
};
let rhods = g.dataset("rho")?;
let rhouds = g.dataset("rhou")?;
let rhovds = g.dataset("rhov")?;
let eds = g.dataset("e")?;
let (rho, rhou, rhov, e) = fnow.components();
rhods.resize((tpos + 1, ny, nx))?;
rhods.write_slice(rho, ndarray::s![tpos, .., ..])?;
rhouds.resize((tpos + 1, ny, nx))?;
rhouds.write_slice(rhou, ndarray::s![tpos, .., ..])?;
rhovds.resize((tpos + 1, ny, nx))?;
rhovds.write_slice(rhov, ndarray::s![tpos, .., ..])?;
eds.resize((tpos + 1, ny, nx))?;
eds.write_slice(e, ndarray::s![tpos, .., ..])?;
}
Ok(())
}
}

View File

@ -1,13 +1,10 @@
use argh::FromArgs; use argh::FromArgs;
use euler::eval::Evaluator;
use sbp::*; use sbp::*;
mod file; mod eval;
mod input; mod input;
mod parsing; mod parsing;
use file::*;
mod eval;
mod system; mod system;
#[derive(Debug, FromArgs)] #[derive(Debug, FromArgs)]
@ -90,13 +87,14 @@ fn main() {
); );
// System::new(grids, grid_connections, operators); // System::new(grids, grid_connections, operators);
let sys = if opt.distribute { let mut sys = if opt.distribute {
basesystem.create_distributed() basesystem.create_distributed()
} else { } else {
basesystem.create() basesystem.create()
}; };
let dt = sys.max_dt(); let dt = sys.max_dt();
sys.set_dt(dt);
let ntime = (integration_time / dt).round() as u64; let ntime = (integration_time / dt).round() as u64;
let steps_between_outputs = if let Some(n) = opt.number_of_outputs { let steps_between_outputs = if let Some(n) = opt.number_of_outputs {
std::cmp::max(n / ntime, 1) std::cmp::max(n / ntime, 1)

View File

@ -80,7 +80,38 @@ impl BaseSystem {
.map(|(bt, grid)| euler::BoundaryStorage::new(bt, grid)) .map(|(bt, grid)| euler::BoundaryStorage::new(bt, grid))
.collect(); .collect();
let sys = SingleThreadedSystem { let mut outputs = Vec::with_capacity(self.grids.len());
for (name, grid) in self.names.iter().zip(&self.grids) {
let g = self.output.create_group(name).unwrap();
g.link_soft("/t", "t").unwrap();
let add_dim = |name| {
g.new_dataset::<Float>()
.chunk((grid.ny(), grid.nx()))
.gzip(9)
.create(name, (grid.ny(), grid.nx()))
};
let xds = add_dim("x").unwrap();
xds.write(grid.x()).unwrap();
let yds = add_dim("y").unwrap();
yds.write(grid.y()).unwrap();
let add_var = |name| {
g.new_dataset::<Float>()
.gzip(3)
.shuffle(true)
.chunk((1, grid.ny(), grid.nx()))
.resizable(true)
.create(name, (0, grid.ny(), grid.nx()))
};
add_var("rho").unwrap();
add_var("rhou").unwrap();
add_var("rhov").unwrap();
add_var("e").unwrap();
outputs.push(g);
}
let mut sys = SingleThreadedSystem {
fnow, fnow,
fnext, fnext,
k, k,
@ -90,8 +121,9 @@ impl BaseSystem {
bt: self.boundary_conditions, bt: self.boundary_conditions,
eb, eb,
time: self.time, time: self.time,
dt: Float::NAN,
operators: self.operators, operators: self.operators,
output: todo!(), output: (self.output, outputs),
}; };
match &self.initial_conditions { match &self.initial_conditions {
/* /*
@ -120,7 +152,6 @@ impl BaseSystem {
/// Spreads the computation over n threads, in a thread per grid way. /// Spreads the computation over n threads, in a thread per grid way.
pub fn create_distributed(self) -> System { pub fn create_distributed(self) -> System {
let nthreads = self.grids.len(); let nthreads = self.grids.len();
// let dt = self.max_dt();
// Build up the boundary conditions // Build up the boundary conditions
let mut push_channels: Vec<Direction<Option<Sender<Array2<Float>>>>> = let mut push_channels: Vec<Direction<Option<Sender<Array2<Float>>>>> =
@ -215,6 +246,8 @@ impl BaseSystem {
communicators.push(t_send); communicators.push(t_send);
let time = self.time; let time = self.time;
let output = self.output.clone();
tids.push( tids.push(
builder builder
.spawn(move || { .spawn(move || {
@ -231,19 +264,48 @@ impl BaseSystem {
let wb = WorkBuffers::new(ny, nx); let wb = WorkBuffers::new(ny, nx);
let g = {
let g = output.create_group(&name).unwrap();
g.link_soft("/t", "t").unwrap();
let add_dim = |name| {
g.new_dataset::<Float>()
.chunk((grid.ny(), grid.nx()))
.gzip(9)
.create(name, (grid.ny(), grid.nx()))
};
let xds = add_dim("x").unwrap();
xds.write(grid.x()).unwrap();
let yds = add_dim("y").unwrap();
yds.write(grid.y()).unwrap();
let add_var = |name| {
g.new_dataset::<Float>()
.gzip(3)
.shuffle(true)
.chunk((1, grid.ny(), grid.nx()))
.resizable(true)
.create(name, (0, grid.ny(), grid.nx()))
};
add_var("rho").unwrap();
add_var("rhou").unwrap();
add_var("rhov").unwrap();
add_var("e").unwrap();
g
};
let mut sys = DistributedSystemPart { let mut sys = DistributedSystemPart {
current, current,
fut, fut,
k, k,
boundary_conditions, boundary_conditions,
grid: (grid, metrics), grid: (grid, metrics),
output: todo!(), output: g,
push, push,
sbp, sbp,
t: time, t: time,
dt: Float::NAN, dt: Float::NAN,
name, _name: name,
id, id,
recv: t_recv, recv: t_recv,
@ -266,6 +328,7 @@ impl BaseSystem {
sys: tids, sys: tids,
recv: master_recv, recv: master_recv,
send: communicators, send: communicators,
output: self.output,
}) })
} }
} }
@ -277,13 +340,47 @@ pub enum System {
impl System { impl System {
pub fn max_dt(&self) -> Float { pub fn max_dt(&self) -> Float {
todo!() match self {
Self::SingleThreaded(sys) => sys.max_dt(),
Self::MultiThreaded(sys) => {
for send in &sys.send {
send.send(MsgFromHost::DtRequest).unwrap();
}
let mut max_dt = Float::MAX;
let mut to_receive = sys.sys.len();
while to_receive != 0 {
let dt = match sys.recv.recv().unwrap() {
(_, MsgToHost::MaxDt(dt)) => dt,
_ => unreachable!(),
};
max_dt = max_dt.min(dt);
to_receive -= 1;
}
max_dt
}
}
}
pub fn set_dt(&mut self, dt: Float) {
match self {
Self::SingleThreaded(sys) => sys.dt = dt,
Self::MultiThreaded(sys) => {
for tid in &sys.send {
tid.send(MsgFromHost::DtSet(dt)).unwrap()
}
}
}
}
pub fn advance(&mut self, nsteps: u64) {
match self {
Self::SingleThreaded(sys) => sys.advance(nsteps),
Self::MultiThreaded(sys) => sys.advance(nsteps),
} }
pub fn advance(&self, nsteps: u64) {
todo!()
} }
pub fn output(&self, ntime: u64) { pub fn output(&self, ntime: u64) {
todo!() match self {
Self::SingleThreaded(sys) => sys.output(ntime),
Self::MultiThreaded(sys) => sys.output(ntime),
}
} }
} }
@ -297,8 +394,9 @@ pub struct SingleThreadedSystem {
pub bt: Vec<euler::BoundaryCharacteristics>, pub bt: Vec<euler::BoundaryCharacteristics>,
pub eb: Vec<euler::BoundaryStorage>, pub eb: Vec<euler::BoundaryStorage>,
pub time: Float, pub time: Float,
pub dt: Float,
pub operators: Vec<Box<dyn SbpOperator2d>>, pub operators: Vec<Box<dyn SbpOperator2d>>,
pub output: Vec<hdf5::Dataset>, pub output: (hdf5::File, Vec<hdf5::Group>),
} }
impl integrate::Integrable for SingleThreadedSystem { impl integrate::Integrable for SingleThreadedSystem {
@ -319,7 +417,13 @@ impl SingleThreadedSystem {
} }
} }
pub fn advance(&mut self, dt: Float) { pub fn advance(&mut self, nsteps: u64) {
for _ in 0..nsteps {
self.advance_single_step(self.dt)
}
}
pub fn advance_single_step(&mut self, dt: Float) {
let metrics = &self.metrics; let metrics = &self.metrics;
let grids = &self.grids; let grids = &self.grids;
let bt = &self.bt; let bt = &self.bt;
@ -409,6 +513,34 @@ impl SingleThreadedSystem {
max_dt max_dt
} }
pub fn output(&self, ntime: u64) {
let tds = self.output.0.dataset("t").unwrap();
let tpos = tds.size();
tds.resize((tpos + 1,)).unwrap();
tds.write_slice(&[ntime], ndarray::s![tpos..tpos + 1])
.unwrap();
for (group, fnow) in self.output.1.iter().zip(&self.fnow) {
let (ny, nx) = (fnow.ny(), fnow.nx());
let rhods = group.dataset("rho").unwrap();
let rhouds = group.dataset("rhou").unwrap();
let rhovds = group.dataset("rhov").unwrap();
let eds = group.dataset("e").unwrap();
let (rho, rhou, rhov, e) = fnow.components();
rhods.resize((tpos + 1, ny, nx)).unwrap();
rhods.write_slice(rho, ndarray::s![tpos, .., ..]).unwrap();
rhouds.resize((tpos + 1, ny, nx)).unwrap();
rhouds.write_slice(rhou, ndarray::s![tpos, .., ..]).unwrap();
rhovds.resize((tpos + 1, ny, nx)).unwrap();
rhovds.write_slice(rhov, ndarray::s![tpos, .., ..]).unwrap();
eds.resize((tpos + 1, ny, nx)).unwrap();
eds.write_slice(e, ndarray::s![tpos, .., ..]).unwrap();
}
}
} }
pub struct DistributedSystem { pub struct DistributedSystem {
@ -416,23 +548,31 @@ pub struct DistributedSystem {
send: Vec<Sender<MsgFromHost>>, send: Vec<Sender<MsgFromHost>>,
/// All threads should be joined to mark the end of the computation /// All threads should be joined to mark the end of the computation
sys: Vec<std::thread::JoinHandle<()>>, sys: Vec<std::thread::JoinHandle<()>>,
output: hdf5::File,
} }
impl DistributedSystem { impl DistributedSystem {
pub fn advance(&mut self, ntime: u64) { pub fn advance(&mut self, ntime: u64) {
for tid in &self.send { for tid in &self.send {
tid.send(MsgFromHost::Advance(ntime)); tid.send(MsgFromHost::Advance(ntime)).unwrap();
} }
} }
pub fn output(&self) { pub fn output(&self, ntime: u64) {
todo!() for tid in &self.send {
tid.send(MsgFromHost::Output(ntime)).unwrap();
}
let tds = self.output.dataset("t").unwrap();
let tpos = tds.size();
tds.resize((tpos + 1,)).unwrap();
tds.write_slice(&[ntime], ndarray::s![tpos..tpos + 1])
.unwrap();
} }
} }
impl Drop for DistributedSystem { impl Drop for DistributedSystem {
fn drop(&mut self) { fn drop(&mut self) {
for tid in &self.send { for tid in &self.send {
tid.send(MsgFromHost::Stop); tid.send(MsgFromHost::Stop).unwrap();
} }
let handles = std::mem::take(&mut self.sys); let handles = std::mem::take(&mut self.sys);
for tid in handles { for tid in handles {
@ -443,7 +583,8 @@ impl Drop for DistributedSystem {
enum MsgFromHost { enum MsgFromHost {
Advance(u64), Advance(u64),
MaxDt(Float), DtRequest,
DtSet(Float),
Output(u64), Output(u64),
Stop, Stop,
} }
@ -480,12 +621,12 @@ struct DistributedSystemPart {
t: Float, t: Float,
dt: Float, dt: Float,
name: String, _name: String,
id: usize, id: usize,
recv: Receiver<MsgFromHost>, recv: Receiver<MsgFromHost>,
send: Sender<(usize, MsgToHost)>, send: Sender<(usize, MsgToHost)>,
output: hdf5::Dataset, output: hdf5::Group,
k: [Diff; 4], k: [Diff; 4],
wb: WorkBuffers, wb: WorkBuffers,
@ -499,16 +640,42 @@ impl DistributedSystemPart {
fn run(&mut self) { fn run(&mut self) {
loop { loop {
match self.recv.recv().unwrap() { match self.recv.recv().unwrap() {
MsgFromHost::MaxDt(dt) => self.dt = dt, MsgFromHost::DtSet(dt) => self.dt = dt,
MsgFromHost::DtRequest => todo!(),
MsgFromHost::Advance(ntime) => self.advance(ntime), MsgFromHost::Advance(ntime) => self.advance(ntime),
MsgFromHost::Output(ntime) => todo!(), MsgFromHost::Output(ntime) => self.output(ntime),
MsgFromHost::Stop => return, MsgFromHost::Stop => return,
} }
} }
} }
fn output(&mut self, _ntime: u64) {
let (ny, nx) = (self.current.ny(), self.current.nx());
let rhods = self.output.dataset("rho").unwrap();
let rhouds = self.output.dataset("rhou").unwrap();
let rhovds = self.output.dataset("rhov").unwrap();
let eds = self.output.dataset("e").unwrap();
let (rho, rhou, rhov, e) = self.current.components();
let tpos = rhods.size() / (ny * nx);
rhods.resize((tpos + 1, ny, nx)).unwrap();
rhods.write_slice(rho, ndarray::s![tpos, .., ..]).unwrap();
rhouds.resize((tpos + 1, ny, nx)).unwrap();
rhouds.write_slice(rhou, ndarray::s![tpos, .., ..]).unwrap();
rhovds.resize((tpos + 1, ny, nx)).unwrap();
rhovds.write_slice(rhov, ndarray::s![tpos, .., ..]).unwrap();
eds.resize((tpos + 1, ny, nx)).unwrap();
eds.write_slice(e, ndarray::s![tpos, .., ..]).unwrap();
}
fn advance(&mut self, ntime: u64) { fn advance(&mut self, ntime: u64) {
for ntime in 0..ntime { for ntime in 0..ntime {
self.send.send((self.id, MsgToHost::CurrentTimestep(ntime))); self.send
.send((self.id, MsgToHost::CurrentTimestep(ntime)))
.unwrap();
let metrics = &self.grid.1; let metrics = &self.grid.1;
let wb = &mut self.wb.0; let wb = &mut self.wb.0;
let sbp = &self.sbp; let sbp = &self.sbp;