cargo fmt

This commit is contained in:
John Goerzen 2020-09-20 23:56:11 -05:00
parent 0f93ff9dc2
commit e80f891340
7 changed files with 185 additions and 122 deletions

View File

@ -16,23 +16,27 @@
*/
use log::*;
use simplelog::*;
use std::io;
use log::*;
use std::thread;
mod ping;
mod pipe;
mod ser;
mod xb;
mod xbpacket;
mod xbrx;
mod pipe;
mod ping;
use std::path::PathBuf;
use structopt::StructOpt;
#[derive(Debug, StructOpt)]
#[structopt(name = "xbnet", about = "Networking for XBee Radios", author = "John Goerzen <jgoerzen@complete.org>")]
#[structopt(
name = "xbnet",
about = "Networking for XBee Radios",
author = "John Goerzen <jgoerzen@complete.org>"
)]
struct Opt {
/// Activate debug mode
// short and long flags (-d, --debug) will be deduced from the field's name
@ -48,7 +52,7 @@ struct Opt {
port: PathBuf,
#[structopt(subcommand)]
cmd: Command
cmd: Command,
}
#[derive(Debug, StructOpt)]
@ -62,7 +66,7 @@ enum Command {
/// Receive ping requests and transmit pongs
Pong,
/// Pipe data across radios using the xbnet protocol
Pipe{
Pipe {
/// The 64-bit destination for the pipe, in hex
#[structopt(long)]
dest: String,
@ -74,7 +78,8 @@ fn main() {
let opt = Opt::from_args();
if opt.debug {
WriteLogger::init(LevelFilter::Trace, Config::default(), io::stderr()).expect("Failed to init log");
WriteLogger::init(LevelFilter::Trace, Config::default(), io::stderr())
.expect("Failed to init log");
}
info!("xbnet starting");
@ -82,23 +87,26 @@ fn main() {
let (mut xb, xbeesender, writerthread) = xb::XB::new(ser_reader, ser_writer, opt.initfile);
let mut xbreframer = xbrx::XBReframer::new();
match opt.cmd {
Command::Ping{dest} => {
let dest_u64:u64 = u64::from_str_radix(&dest, 16).expect("Invalid destination");
thread::spawn(move || ping::genpings(dest_u64, xbeesender).expect("Failure in genpings"));
Command::Ping { dest } => {
let dest_u64: u64 = u64::from_str_radix(&dest, 16).expect("Invalid destination");
thread::spawn(move || {
ping::genpings(dest_u64, xbeesender).expect("Failure in genpings")
});
ping::displaypongs(&mut xbreframer, &mut xb.ser_reader);
},
}
Command::Pong => {
ping::pong(&mut xbreframer, &mut xb.ser_reader, xbeesender).expect("Failure in pong");
},
Command::Pipe{dest} => {
let dest_u64:u64 = u64::from_str_radix(&dest, 16).expect("Invalid destination");
thread::spawn(move || pipe::stdout_processor(&mut xbreframer, &mut xb.ser_reader).expect("Failure in stdout_processor"));
}
Command::Pipe { dest } => {
let dest_u64: u64 = u64::from_str_radix(&dest, 16).expect("Invalid destination");
thread::spawn(move || {
pipe::stdout_processor(&mut xbreframer, &mut xb.ser_reader)
.expect("Failure in stdout_processor")
});
pipe::stdin_processor(dest_u64, 1600, xbeesender).expect("Failure in stdin_processor");
// Make sure queued up data is sent
let _ = writerthread.join();
},
}
}
}

View File

@ -15,15 +15,15 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use std::io;
use crate::ser::*;
use crate::xb::*;
use crate::xbpacket::*;
use crate::ser::*;
use crate::xbrx::*;
use bytes::*;
use crossbeam_channel;
use std::io;
use std::thread;
use std::time::Duration;
use bytes::*;
const INTERVAL: u64 = 5;
@ -32,7 +32,9 @@ pub fn genpings(dest: u64, sender: crossbeam_channel::Sender<XBTX>) -> io::Resul
loop {
let sendstr = format!("Ping {}", counter);
println!("SEND: {}", sendstr);
sender.send(XBTX::TXData(XBDestAddr::U64(dest), Bytes::from(sendstr))).unwrap();
sender
.send(XBTX::TXData(XBDestAddr::U64(dest), Bytes::from(sendstr)))
.unwrap();
thread::sleep(Duration::from_secs(INTERVAL));
counter += 1;
}
@ -42,19 +44,32 @@ pub fn genpings(dest: u64, sender: crossbeam_channel::Sender<XBTX>) -> io::Resul
pub fn displaypongs(xbreframer: &mut XBReframer, ser: &mut XBSerReader) -> () {
loop {
let (fromu64, _fromu16, payload) = xbreframer.rxframe(ser);
println!("RECV from {}: {}", hex::encode(fromu64.to_be_bytes()), String::from_utf8_lossy(&payload));
println!(
"RECV from {}: {}",
hex::encode(fromu64.to_be_bytes()),
String::from_utf8_lossy(&payload)
);
}
}
/// Reply to pings
pub fn pong(xbreframer: &mut XBReframer, ser: &mut XBSerReader, sender: crossbeam_channel::Sender<XBTX>) -> io::Result<()> {
pub fn pong(
xbreframer: &mut XBReframer,
ser: &mut XBSerReader,
sender: crossbeam_channel::Sender<XBTX>,
) -> io::Result<()> {
loop {
let (fromu64, _addr_16, payload) = xbreframer.rxframe(ser);
if payload.starts_with(b"Ping ") {
println!("RECV from {}: {}", hex::encode(fromu64.to_be_bytes()), String::from_utf8_lossy(&payload));
println!(
"RECV from {}: {}",
hex::encode(fromu64.to_be_bytes()),
String::from_utf8_lossy(&payload)
);
let resp = Bytes::from(format!("Pong {}", String::from_utf8_lossy(&payload[5..])));
sender.send(XBTX::TXData(XBDestAddr::U64(fromu64), resp)).unwrap();
sender
.send(XBTX::TXData(XBDestAddr::U64(fromu64), resp))
.unwrap();
}
}
}

View File

@ -15,17 +15,20 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use std::io;
use std::io::{Read, Write};
use crate::ser::*;
use crate::xb::*;
use crate::xbpacket::*;
use crate::ser::*;
use crate::xbrx::*;
use crossbeam_channel;
use bytes::*;
use crossbeam_channel;
use std::io;
use std::io::{Read, Write};
pub fn stdin_processor(dest: u64, maxframesize: usize,
sender: crossbeam_channel::Sender<XBTX>) -> io::Result<()> {
pub fn stdin_processor(
dest: u64,
maxframesize: usize,
sender: crossbeam_channel::Sender<XBTX>,
) -> io::Result<()> {
let stdin = io::stdin();
let mut br = io::BufReader::new(stdin);
let mut buf = vec![0u8; maxframesize - 1];
@ -38,7 +41,12 @@ pub fn stdin_processor(dest: u64, maxframesize: usize,
return Ok(());
}
sender.send(XBTX::TXData(XBDestAddr::U64(dest), Bytes::copy_from_slice(&buf[0..res]))).unwrap();
sender
.send(XBTX::TXData(
XBDestAddr::U64(dest),
Bytes::copy_from_slice(&buf[0..res]),
))
.unwrap();
}
}

View File

@ -16,18 +16,17 @@
*/
use std::io;
use serialport::prelude::*;
use std::io::{BufReader, BufRead, Write};
use log::*;
use std::time::Duration;
use std::path::PathBuf;
use bytes::*;
use log::*;
use serialport::prelude::*;
use std::io;
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use std::time::Duration;
pub struct XBSerReader {
pub br: BufReader<Box<dyn SerialPort>>,
pub portname: PathBuf,
}
pub struct XBSerWriter {
@ -35,23 +34,30 @@ pub struct XBSerWriter {
pub portname: PathBuf,
}
/// Initialize the serial system, configuring the port.
pub fn new(portname: PathBuf) -> io::Result<(XBSerReader, XBSerWriter)> {
let settings = SerialPortSettings {
baud_rate: 115200, // FIXME: make this configurable, default 9600
data_bits: DataBits::Eight,
flow_control: FlowControl::Hardware,
parity: Parity::None,
stop_bits: StopBits::One,
timeout: Duration::new(60 * 60 * 24 * 365 * 20, 0),
};
let readport = serialport::open_with_settings(&portname, &settings)?;
let writeport = readport.try_clone()?;
Ok((
XBSerReader {br: BufReader::new(readport), portname: portname.clone()},
XBSerWriter {swrite: writeport, portname}))
}
/// Initialize the serial system, configuring the port.
pub fn new(portname: PathBuf) -> io::Result<(XBSerReader, XBSerWriter)> {
let settings = SerialPortSettings {
baud_rate: 115200, // FIXME: make this configurable, default 9600
data_bits: DataBits::Eight,
flow_control: FlowControl::Hardware,
parity: Parity::None,
stop_bits: StopBits::One,
timeout: Duration::new(60 * 60 * 24 * 365 * 20, 0),
};
let readport = serialport::open_with_settings(&portname, &settings)?;
let writeport = readport.try_clone()?;
Ok((
XBSerReader {
br: BufReader::new(readport),
portname: portname.clone(),
},
XBSerWriter {
swrite: writeport,
portname,
},
))
}
impl XBSerReader {
/// Read a line from the port. Return it with EOL characters removed.
@ -61,7 +67,7 @@ impl XBSerReader {
let size = self.br.read_until(0x0D, &mut buf)?;
let buf = String::from_utf8_lossy(&buf);
if size == 0 {
debug!("{:?}: Received EOF from serial port", self.portname);
debug!("{:?}: Received EOF from serial port", self.portname);
Ok(None)
} else {
let buf = String::from(buf.trim());
@ -69,7 +75,6 @@ impl XBSerReader {
Ok(Some(buf))
}
}
}
impl XBSerWriter {
@ -83,5 +88,3 @@ impl XBSerWriter {
self.swrite.flush()
}
}

View File

@ -17,17 +17,17 @@
*/
use crate::ser::*;
use log::*;
use std::fs;
use std::io::{BufRead, BufReader, Error, ErrorKind};
use std::io;
use crate::xbpacket::*;
use bytes::Bytes;
use crossbeam_channel;
use hex;
use std::thread;
use std::time::{Duration};
use log::*;
use std::fs;
use std::io;
use std::io::{BufRead, BufReader, Error, ErrorKind};
use std::path::PathBuf;
use bytes::Bytes;
use crate::xbpacket::*;
use std::thread;
use std::time::Duration;
pub fn mkerror(msg: &str) -> Error {
Error::new(ErrorKind::Other, msg)
@ -60,7 +60,10 @@ pub fn assert_response(resp: String, expected: String) -> io::Result<()> {
if resp == expected {
Ok(())
} else {
Err(mkerror(&format!("Unexpected response: got {}, expected {}", resp, expected)))
Err(mkerror(&format!(
"Unexpected response: got {}, expected {}",
resp, expected
)))
}
}
@ -74,7 +77,11 @@ impl XB {
May panic if an error occurs during initialization.
*/
pub fn new(mut ser_reader: XBSerReader, mut ser_writer: XBSerWriter, initfile: Option<PathBuf>) -> (XB, crossbeam_channel::Sender<XBTX>, thread::JoinHandle<()>) {
pub fn new(
mut ser_reader: XBSerReader,
mut ser_writer: XBSerWriter,
initfile: Option<PathBuf>,
) -> (XB, crossbeam_channel::Sender<XBTX>, thread::JoinHandle<()>) {
// FIXME: make this maximum of 5 configurable
let (writertx, writerrx) = crossbeam_channel::bounded(5);
@ -133,7 +140,6 @@ impl XB {
let maxpacket = ser_reader.readln().unwrap().unwrap();
let maxpacketsize = usize::from(u16::from_str_radix(&maxpacket, 16).unwrap());
// Exit command mode
ser_writer.writeln("ATCN").unwrap();
assert_eq!(ser_reader.readln().unwrap().unwrap(), String::from("OK"));
@ -141,44 +147,50 @@ impl XB {
debug!("Radio configuration complete");
let writerthread = thread::spawn(move || writerthread(ser_writer, maxpacketsize, writerrx));
(XB {
ser_reader,
mymac,
maxpacketsize,
}, writertx, writerthread)
}
(
XB {
ser_reader,
mymac,
maxpacketsize,
},
writertx,
writerthread,
)
}
}
fn writerthread(mut ser: XBSerWriter, maxpacketsize: usize,
writerrx: crossbeam_channel::Receiver<XBTX>) {
fn writerthread(
mut ser: XBSerWriter,
maxpacketsize: usize,
writerrx: crossbeam_channel::Receiver<XBTX>,
) {
for item in writerrx.iter() {
match item {
XBTX::Shutdown => return,
XBTX::TXData(dest, data) => {
// Here we receive a block of data, which hasn't been
// packetized. Packetize it and send out the result.
// Here we receive a block of data, which hasn't been
// packetized. Packetize it and send out the result.
match packetize_data(maxpacketsize, &dest, &data) {
Ok(packets) => {
for packet in packets.into_iter() {
match packet.serialize() {
Ok(datatowrite) => {
trace!("TX to {:?} data {}", &dest, hex::encode(&datatowrite));
ser.swrite.write_all(&datatowrite).unwrap();
ser.swrite.flush().unwrap();
},
Err(e) => {
error!("Serialization error: {:?}", e);
},
};
};
},
Err(e) => {
error!("Packetization error: {}", e);
}
}
match packetize_data(maxpacketsize, &dest, &data) {
Ok(packets) => {
for packet in packets.into_iter() {
match packet.serialize() {
Ok(datatowrite) => {
trace!("TX to {:?} data {}", &dest, hex::encode(&datatowrite));
ser.swrite.write_all(&datatowrite).unwrap();
ser.swrite.flush().unwrap();
}
Err(e) => {
error!("Serialization error: {:?}", e);
}
};
}
}
Err(e) => {
error!("Packetization error: {}", e);
}
}
}
}
}

View File

@ -18,7 +18,7 @@
*/
use bytes::*;
use std::convert::{TryInto, TryFrom};
use std::convert::{TryFrom, TryInto};
use std::fmt;
/** XBee transmissions can give either a 64-bit or a 16-bit destination
@ -30,7 +30,7 @@ pub enum XBDestAddr {
/// The 64-bit destination address. 0xFFFF for broadcast.
/// When a 16-bit destination is given, this will be transmitted as 0xFFFFFFFFFFFFFFFF.
U64(u64)
U64(u64),
}
impl fmt::Debug for XBDestAddr {
@ -40,12 +40,12 @@ impl fmt::Debug for XBDestAddr {
f.write_str("U16(")?;
f.write_str(&hex::encode(x.to_be_bytes()))?;
f.write_str(")")
},
}
XBDestAddr::U64(x) => {
f.write_str("U64(")?;
f.write_str(&hex::encode(x.to_be_bytes()))?;
f.write_str(")")
},
}
}
}
}
@ -54,7 +54,7 @@ impl fmt::Debug for XBDestAddr {
#[derive(Eq, PartialEq, Debug)]
pub enum TXGenError {
/// The payload was an invalid length
InvalidLen
InvalidLen,
}
/** A Digi 64-bit transmit request, frame type 0x10 */
@ -87,7 +87,7 @@ impl XBTXRequest {
// inner parts, then combine them.
let mut fullframe = BytesMut::new();
fullframe.put_u8(0x7e); // Start delimeter
fullframe.put_u8(0x7e); // Start delimeter
let mut innerframe = BytesMut::new();
// Frame type
@ -98,7 +98,7 @@ impl XBTXRequest {
XBDestAddr::U16(dest) => {
innerframe.put_u64(0xFFFFFFFFFFFFFFFFu64);
innerframe.put_u16(dest);
},
}
XBDestAddr::U64(dest) => {
innerframe.put_u64(dest);
innerframe.put_u16(0xFFFEu16);
@ -160,24 +160,29 @@ pub fn mac48to64(mac48: &[u8; 6], pattern64: u64) -> u64 {
We create a leading byte that indicates how many more XBee packets are remaining
for the block. When zero, the receiver should process the accumulated data. */
pub fn packetize_data(maxpacketsize: usize, dest: &XBDestAddr, data: &[u8]) -> Result<Vec<XBTXRequest>, String> {
pub fn packetize_data(
maxpacketsize: usize,
dest: &XBDestAddr,
data: &[u8],
) -> Result<Vec<XBTXRequest>, String> {
let mut retval = Vec::new();
if data.is_empty() {
return Ok(retval);
}
let chunks: Vec<&[u8]> = data.chunks(maxpacketsize - 1).collect();
let mut chunks_remaining: u8 = u8::try_from(chunks.len() - 1).map_err(|e| String::from("More than 255 chunks to transmit"))?;
let mut chunks_remaining: u8 = u8::try_from(chunks.len() - 1)
.map_err(|e| String::from("More than 255 chunks to transmit"))?;
for chunk in chunks {
let mut payload = BytesMut::new();
payload.put_u8(chunks_remaining);
payload.put_slice(chunk);
let packet = XBTXRequest{
let packet = XBTXRequest {
frame_id: 0,
dest_addr: dest.clone(),
broadcast_radius: 0,
transmit_options: 0,
payload: Bytes::from(payload)
payload: Bytes::from(payload),
};
retval.push(packet);
@ -187,7 +192,6 @@ pub fn packetize_data(maxpacketsize: usize, dest: &XBDestAddr, data: &[u8]) -> R
Ok(retval)
}
//////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////
// RX side

View File

@ -20,11 +20,11 @@
use crate::ser::*;
use crate::xbpacket::*;
use log::*;
use std::io::{Read};
use hex;
use bytes::*;
use hex;
use log::*;
use std::collections::HashMap;
use std::io::Read;
/** Attempts to read a packet from the port. Returns
None if it's not an RX frame, or if there is a checksum mismatch. */
@ -45,8 +45,11 @@ pub fn rxxbpacket(ser: &mut XBSerReader) -> Option<RXPacket> {
}
// OK, got the start delimeter. Log the junk, if any.
if ! junkbytes.is_empty() {
error!("Found start delimeter after reading junk: {}", hex::encode(&junkbytes));
if !junkbytes.is_empty() {
error!(
"Found start delimeter after reading junk: {}",
hex::encode(&junkbytes)
);
junkbytes.clear();
}
@ -81,8 +84,18 @@ pub fn rxxbpacket(ser: &mut XBSerReader) -> Option<RXPacket> {
let sender_addr16 = inner.get_u16();
let rx_options = inner.get_u8();
let payload = inner.to_bytes();
trace!("SERIN: packet from {} / {}, payload {}", hex::encode(sender_addr64.to_be_bytes()), hex::encode(sender_addr16.to_be_bytes()), hex::encode(&payload));
Some(RXPacket {sender_addr64, sender_addr16, rx_options, payload})
trace!(
"SERIN: packet from {} / {}, payload {}",
hex::encode(sender_addr64.to_be_bytes()),
hex::encode(sender_addr16.to_be_bytes()),
hex::encode(&payload)
);
Some(RXPacket {
sender_addr64,
sender_addr16,
rx_options,
payload,
})
}
/// Like rxxbpacket, but wait until we have a valid packet.