From 3ff3430f0103174db1dfe3c3e454898dd3525db5 Mon Sep 17 00:00:00 2001 From: John Goerzen Date: Sun, 20 Sep 2020 19:59:08 -0500 Subject: [PATCH] checkpointing --- src/main.rs | 71 +++++------------------- src/ping.rs | 16 +++--- src/xbpacket.rs | 14 +++++ src/xbrx.rs | 141 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 176 insertions(+), 66 deletions(-) create mode 100644 src/xbrx.rs diff --git a/src/main.rs b/src/main.rs index aaf3e39..f338efe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,45 +38,10 @@ struct Opt { #[structopt(short, long)] debug: bool, - /// Read and log quality data after receiving packets - #[structopt(long)] - readqual: bool, - - /// Pack as many bytes as possible into each TX frame, regardless of original framing - #[structopt(long)] - pack: bool, - /// Radio initialization command file #[structopt(long, parse(from_os_str))] initfile: Option, - /// Maximum frame size sent to radio [10..250] (valid only for ping and kiss) - #[structopt(long, default_value = "100")] - maxpacketsize: usize, - - /// Maximum time to transmit at once before giving a chance to receive (in ms). 0=infinite - #[structopt(long, default_value = "0")] - txslot: u64, - - /// Amount of time (ms) to pause before transmitting a packet - /* The - main purpose of this is to give the othe rradio a chance to finish - decoding the previous packet, send it to the OS, and re-enter RX mode. - A secondary purpose is to give the duplex logic a chance to see if - anything else is coming in. Given in ms. - */ - #[structopt(long, default_value = "120")] - txwait: u64, - - /// Amount of time (ms) to wait for end-of-transmission signal before transmitting - /* The amount of time to wait before transmitting after receiving a - packet that indicated more data was forthcoming. The purpose of this is - to compensate for a situation in which the "last" incoming packet was lost, - to prevent the receiver from waiting forever for more packets before - transmitting. Given in ms. */ - #[structopt(long, default_value = "1000")] - eotwait: u64, - #[structopt(parse(from_os_str))] /// Serial port to use to communicate with radio port: PathBuf, @@ -87,14 +52,14 @@ struct Opt { #[derive(Debug, StructOpt)] enum Command { - /// Pipe data across raios - Pipe, /// Transmit ping requests - Ping, + Ping { + /// The 64-bit destination for the ping, in hex + #[structopt(long)] + dest: String, + }, /// Receive ping requests and transmit pongs Pong, - /// Pipe KISS data across the radios - Kiss, } fn main() { @@ -107,29 +72,19 @@ fn main() { let maxpacketsize = opt.maxpacketsize; - let loraser = ser::LoraSer::new(opt.port).expect("Failed to initialize serial port"); - let (mut ls, radioreceiver) = lorastik::LoraStik::new(loraser, opt.readqual, opt.txwait, opt.eotwait, maxpacketsize, opt.pack, opt.txslot); - ls.radiocfg(opt.initfile).expect("Failed to configure radio"); + let xbser = ser::XBSer::new(opt.port).expect("Failed to initialize serial port"); + let (mut xb, xbeesender) = xb::XB::new(xbser, opt.initfile); + let mut xbreframer = XBReframer::new(); - let mut ls2 = ls.clone(); - thread::spawn(move || ls2.mainloop().expect("Failure in readerthread")); match opt.cmd { - Command::Pipe => { - thread::spawn(move || pipe::stdintolora(&mut ls).expect("Failure in stdintolora")); - pipe::loratostdout(radioreceiver).expect("Failure in loratostdout"); - }, - Command::Kiss => { - thread::spawn(move || kiss::stdintolorakiss(&mut ls).expect("Failure in stdintolorakiss")); - kiss::loratostdout(radioreceiver).expect("Failure in loratostdout"); - }, - Command::Ping => { - thread::spawn(move || ping::genpings(&mut ls).expect("Failure in genpings")); - pipe::loratostdout(radioreceiver).expect("Failure in loratostdout"); + Command::Ping(p) => { + let dest_u64 = hex::decode(p.dest).unwrap(); + thread::spawn(move || ping::genpings(dest_u64, xbeesender).expect("Failure in genpings")); + xbreframer.discardframes(&xb.ser); }, Command::Pong => { - ping::pong(&mut ls, radioreceiver).expect("Failure in loratostdout"); + ping::pong(&mut xbreframer, &xb.ser).expect("Failure in loratostdout"); } } - } diff --git a/src/ping.rs b/src/ping.rs index 7b9874d..a281f29 100644 --- a/src/ping.rs +++ b/src/ping.rs @@ -24,24 +24,24 @@ use std::time::Duration; const INTERVAL: u64 = 5; -pub fn genpings(ls: &mut LoraStik) -> io::Result<()> { +pub fn genpings(dest: u64, sender: crossbeam_channel::Sender<(XBDestAddr, Bytes)>) -> io::Result<()> { let mut counter: u64 = 1; loop { let sendstr = format!("Ping {}", counter); println!("SEND: {}", sendstr); - ls.transmit(&sendstr.as_bytes()); + sender.send((dest, Bytes::from(sendstr.as_bytes()))); thread::sleep(Duration::from_secs(INTERVAL)); counter += 1; } } /// Reply to pings -pub fn pong(ls: &mut LoraStik, receiver: crossbeam_channel::Receiver) -> io::Result<()> { +pub fn pong(xbreframer: &mut XBReframer, ser: &XBSer, sender: crossbeam_channel::Sender<(XBDestAddr, Bytes)>) -> io::Result<()> { loop { - let data = receiver.recv().unwrap(); - let resp = format!("Pong {}, {:?}", String::from_utf8_lossy(&data.0), data.1); - println!("SEND: {}", resp); - ls.transmit(resp.as_bytes()); + let (addr_64, addr_16, payload) = xbreframer.rxframe(ser); + if payload.starts_with(b"Ping ") { + let resp = format!("Pong {}", Strimg::from_utf8_lossy(payload[5..])); + sender.send((XBDestAddr::U64(addr_64), Bytes::from(resp.as_bytes()))); + } } } - diff --git a/src/xbpacket.rs b/src/xbpacket.rs index 0826f64..b67ee7b 100644 --- a/src/xbpacket.rs +++ b/src/xbpacket.rs @@ -167,3 +167,17 @@ pub fn packetize_data(maxpacketsize: usize, dest: XBDestAddr, data: &[u8]) -> Re Ok(retval) } + + +////////////////////////////////////////////////////////////////////// +////////////////////////////////////////////////////////////////////// +// RX side + +/** A Digi receive packet, 0x90 */ +#[derive(PartialEq, Eq, Ord, Debug)] +pub struct RXPacket { + sender_addr64: u64, + sender_addr16: u16, + rx_options: u8, + payload: Bytes, +} diff --git a/src/xbrx.rs b/src/xbrx.rs new file mode 100644 index 0000000..0202c81 --- /dev/null +++ b/src/xbrx.rs @@ -0,0 +1,141 @@ +/*! Receiving data from XBee */ + +/* + Copyright (C) 2019-2020 John Goerzen . + +*/ + +use crate::ser::XBSer; +use log::*; +use std::fs; +use std::io::{BufRead, BufReader, Error, ErrorKind}; +use std::io; +use crossbeam_channel; +use hex; +use std::thread; +use std::time::{Duration, Instant}; +use format_escape_default::format_escape_default; +use std::path::PathBuf; +use bytes::*; +use std::collections::HashMap; + +/** Attempts to read a packet from the port. Returns +None if it's not an RX frame, or if there is a checksum mismatch. */ +pub fn rxxbpacket(ser: &XBSer) -> Option { + let mut junkbytes = BytesMut::new(); + let serport = ser.br.lock().unwrap(); + loop { + let mut startdelim = [0u8; 1]; + serport.read_exact(&mut startdelim).unwrap(); + if startdelim[0] != 0x7e { + if junkbytes.is_empty() { + error!("Receiving junk"); + } + + junkbytes.push(startdelim); + } + } + + // OK, got the start delimeter. Log the junk, if any. + if ! junkbytes.is_empty() { + error!("Found start delimeter after reading junk: {}", hex::encode(junkbytes)); + junkbytes.clear(); + } + + // Read the length. + + let mut lenbytes = [0u8; 2]; + serport.read_exact(&mut lenbytes).unwrap(); + let length = usize::from(u16::from_be_bytes(lenbytes)); + + // Now read the rest of the frame. + let mut inner = [0u8; length]; + + serport.read_exact(&mut inner).unwrap(); + + // And the checksum. + let mut checksum = [0u8; 1]; + serport.read_exact(&mut checksum).unwrap(); + + if xbchecksum(&inner) != checksum[0] { + error!("SERIN: Checksum mismatch; data: {}", hex::encode(inner)); + return None; + } + + let inner = Bytes::from(inner); + let frametype = inner.get_u8(); + if frametype != 0x90 { + debug!("SERIN: Non-0x90 frame; data: {}", hex::encode(inner)); + return None; + } + + let sender_addr64 = inner.get_u64(); + let sender_addr16 = inner.get_u16(); + let sender_rxoptions = inner.get_u8(); + let payload = inner.to_bytes(); + trace!("SERIN: packet from {} / {}, payload {}", hex::encode(sender_addr64), hex::encode(sender_addr16), hex::encode(payload)); + Some(RXPacket {sender_addr64, sender_addr16, rx_options, payload}) +} + +/// Like rxxbpacket, but wait until we have a valid packet. +pub fn rxxbpacket_wait(ser: &XBSer) -> RXPacket { + loop { + if let Some(packet) = rxxbpacket(ser) { + return packet; + } + } +} + +/// Receives XBee packets, recomposes into larger frames. +pub struct XBReframer { + buf: HashMap, +} + +/** Receive a frame that may have been split up into multiple XBee frames. Reassemble +as needed and return when we've got something that can be returned. */ +impl XBReframer { + pub fn new() -> Self { + XBReframer { + buf: HashMap::new(), + } + } + + /// Receive a frame. Indicate the sender (u64, u16) and payload. + pub fn rxframe(&mut self, ser: &XBSer) -> (u64, u16, Bytes) { + loop { + let packet = rxxbpacket_wait(); + let mut frame = if let Some(olddata) = self.buf.get(packet.sender_addr64) { + olddata + } else { + BytesMut::new() + } + + frame.extend_from_slice(packet.payload[1..]); + if packet.payload[0] == 0x0 { + self.buf.remove(packet.sender_addr64); + return (packet.sender_addr64, packet.sender_addr16, frame); + } else { + self.buf.insert(packet.sender_addr64, frame); + } + } + } + + pub fn discardframes(&mut self, ser: &XBSer) -> () { + loop { + let _ = self.rxframe(ser); + } + } +}