checkpointing

This commit is contained in:
John Goerzen 2020-09-20 19:59:08 -05:00
parent 00a104a8c9
commit 3ff3430f01
4 changed files with 176 additions and 66 deletions

View File

@ -38,45 +38,10 @@ struct Opt {
#[structopt(short, long)]
debug: bool,
/// Read and log quality data after receiving packets
#[structopt(long)]
readqual: bool,
/// Pack as many bytes as possible into each TX frame, regardless of original framing
#[structopt(long)]
pack: bool,
/// Radio initialization command file
#[structopt(long, parse(from_os_str))]
initfile: Option<PathBuf>,
/// Maximum frame size sent to radio [10..250] (valid only for ping and kiss)
#[structopt(long, default_value = "100")]
maxpacketsize: usize,
/// Maximum time to transmit at once before giving a chance to receive (in ms). 0=infinite
#[structopt(long, default_value = "0")]
txslot: u64,
/// Amount of time (ms) to pause before transmitting a packet
/* The
main purpose of this is to give the othe rradio a chance to finish
decoding the previous packet, send it to the OS, and re-enter RX mode.
A secondary purpose is to give the duplex logic a chance to see if
anything else is coming in. Given in ms.
*/
#[structopt(long, default_value = "120")]
txwait: u64,
/// Amount of time (ms) to wait for end-of-transmission signal before transmitting
/* The amount of time to wait before transmitting after receiving a
packet that indicated more data was forthcoming. The purpose of this is
to compensate for a situation in which the "last" incoming packet was lost,
to prevent the receiver from waiting forever for more packets before
transmitting. Given in ms. */
#[structopt(long, default_value = "1000")]
eotwait: u64,
#[structopt(parse(from_os_str))]
/// Serial port to use to communicate with radio
port: PathBuf,
@ -87,14 +52,14 @@ struct Opt {
#[derive(Debug, StructOpt)]
enum Command {
/// Pipe data across raios
Pipe,
/// Transmit ping requests
Ping,
Ping {
/// The 64-bit destination for the ping, in hex
#[structopt(long)]
dest: String,
},
/// Receive ping requests and transmit pongs
Pong,
/// Pipe KISS data across the radios
Kiss,
}
fn main() {
@ -107,29 +72,19 @@ fn main() {
let maxpacketsize = opt.maxpacketsize;
let loraser = ser::LoraSer::new(opt.port).expect("Failed to initialize serial port");
let (mut ls, radioreceiver) = lorastik::LoraStik::new(loraser, opt.readqual, opt.txwait, opt.eotwait, maxpacketsize, opt.pack, opt.txslot);
ls.radiocfg(opt.initfile).expect("Failed to configure radio");
let xbser = ser::XBSer::new(opt.port).expect("Failed to initialize serial port");
let (mut xb, xbeesender) = xb::XB::new(xbser, opt.initfile);
let mut xbreframer = XBReframer::new();
let mut ls2 = ls.clone();
thread::spawn(move || ls2.mainloop().expect("Failure in readerthread"));
match opt.cmd {
Command::Pipe => {
thread::spawn(move || pipe::stdintolora(&mut ls).expect("Failure in stdintolora"));
pipe::loratostdout(radioreceiver).expect("Failure in loratostdout");
},
Command::Kiss => {
thread::spawn(move || kiss::stdintolorakiss(&mut ls).expect("Failure in stdintolorakiss"));
kiss::loratostdout(radioreceiver).expect("Failure in loratostdout");
},
Command::Ping => {
thread::spawn(move || ping::genpings(&mut ls).expect("Failure in genpings"));
pipe::loratostdout(radioreceiver).expect("Failure in loratostdout");
Command::Ping(p) => {
let dest_u64 = hex::decode(p.dest).unwrap();
thread::spawn(move || ping::genpings(dest_u64, xbeesender).expect("Failure in genpings"));
xbreframer.discardframes(&xb.ser);
},
Command::Pong => {
ping::pong(&mut ls, radioreceiver).expect("Failure in loratostdout");
ping::pong(&mut xbreframer, &xb.ser).expect("Failure in loratostdout");
}
}
}

View File

@ -24,24 +24,24 @@ use std::time::Duration;
const INTERVAL: u64 = 5;
pub fn genpings(ls: &mut LoraStik) -> io::Result<()> {
pub fn genpings(dest: u64, sender: crossbeam_channel::Sender<(XBDestAddr, Bytes)>) -> io::Result<()> {
let mut counter: u64 = 1;
loop {
let sendstr = format!("Ping {}", counter);
println!("SEND: {}", sendstr);
ls.transmit(&sendstr.as_bytes());
sender.send((dest, Bytes::from(sendstr.as_bytes())));
thread::sleep(Duration::from_secs(INTERVAL));
counter += 1;
}
}
/// Reply to pings
pub fn pong(ls: &mut LoraStik, receiver: crossbeam_channel::Receiver<ReceivedFrames>) -> io::Result<()> {
pub fn pong(xbreframer: &mut XBReframer, ser: &XBSer, sender: crossbeam_channel::Sender<(XBDestAddr, Bytes)>) -> io::Result<()> {
loop {
let data = receiver.recv().unwrap();
let resp = format!("Pong {}, {:?}", String::from_utf8_lossy(&data.0), data.1);
println!("SEND: {}", resp);
ls.transmit(resp.as_bytes());
let (addr_64, addr_16, payload) = xbreframer.rxframe(ser);
if payload.starts_with(b"Ping ") {
let resp = format!("Pong {}", Strimg::from_utf8_lossy(payload[5..]));
sender.send((XBDestAddr::U64(addr_64), Bytes::from(resp.as_bytes())));
}
}
}

View File

@ -167,3 +167,17 @@ pub fn packetize_data(maxpacketsize: usize, dest: XBDestAddr, data: &[u8]) -> Re
Ok(retval)
}
//////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////
// RX side
/** A Digi receive packet, 0x90 */
#[derive(PartialEq, Eq, Ord, Debug)]
pub struct RXPacket {
sender_addr64: u64,
sender_addr16: u16,
rx_options: u8,
payload: Bytes,
}

141
src/xbrx.rs Normal file
View File

@ -0,0 +1,141 @@
/*! Receiving data from XBee */
/*
Copyright (C) 2019-2020 John Goerzen <jgoerzen@complete.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use crate::ser::XBSer;
use log::*;
use std::fs;
use std::io::{BufRead, BufReader, Error, ErrorKind};
use std::io;
use crossbeam_channel;
use hex;
use std::thread;
use std::time::{Duration, Instant};
use format_escape_default::format_escape_default;
use std::path::PathBuf;
use bytes::*;
use std::collections::HashMap;
/** Attempts to read a packet from the port. Returns
None if it's not an RX frame, or if there is a checksum mismatch. */
pub fn rxxbpacket(ser: &XBSer) -> Option<RXPacket> {
let mut junkbytes = BytesMut::new();
let serport = ser.br.lock().unwrap();
loop {
let mut startdelim = [0u8; 1];
serport.read_exact(&mut startdelim).unwrap();
if startdelim[0] != 0x7e {
if junkbytes.is_empty() {
error!("Receiving junk");
}
junkbytes.push(startdelim);
}
}
// OK, got the start delimeter. Log the junk, if any.
if ! junkbytes.is_empty() {
error!("Found start delimeter after reading junk: {}", hex::encode(junkbytes));
junkbytes.clear();
}
// Read the length.
let mut lenbytes = [0u8; 2];
serport.read_exact(&mut lenbytes).unwrap();
let length = usize::from(u16::from_be_bytes(lenbytes));
// Now read the rest of the frame.
let mut inner = [0u8; length];
serport.read_exact(&mut inner).unwrap();
// And the checksum.
let mut checksum = [0u8; 1];
serport.read_exact(&mut checksum).unwrap();
if xbchecksum(&inner) != checksum[0] {
error!("SERIN: Checksum mismatch; data: {}", hex::encode(inner));
return None;
}
let inner = Bytes::from(inner);
let frametype = inner.get_u8();
if frametype != 0x90 {
debug!("SERIN: Non-0x90 frame; data: {}", hex::encode(inner));
return None;
}
let sender_addr64 = inner.get_u64();
let sender_addr16 = inner.get_u16();
let sender_rxoptions = inner.get_u8();
let payload = inner.to_bytes();
trace!("SERIN: packet from {} / {}, payload {}", hex::encode(sender_addr64), hex::encode(sender_addr16), hex::encode(payload));
Some(RXPacket {sender_addr64, sender_addr16, rx_options, payload})
}
/// Like rxxbpacket, but wait until we have a valid packet.
pub fn rxxbpacket_wait(ser: &XBSer) -> RXPacket {
loop {
if let Some(packet) = rxxbpacket(ser) {
return packet;
}
}
}
/// Receives XBee packets, recomposes into larger frames.
pub struct XBReframer {
buf: HashMap<u64, BytesMut>,
}
/** Receive a frame that may have been split up into multiple XBee frames. Reassemble
as needed and return when we've got something that can be returned. */
impl XBReframer {
pub fn new() -> Self {
XBReframer {
buf: HashMap::new(),
}
}
/// Receive a frame. Indicate the sender (u64, u16) and payload.
pub fn rxframe(&mut self, ser: &XBSer) -> (u64, u16, Bytes) {
loop {
let packet = rxxbpacket_wait();
let mut frame = if let Some(olddata) = self.buf.get(packet.sender_addr64) {
olddata
} else {
BytesMut::new()
}
frame.extend_from_slice(packet.payload[1..]);
if packet.payload[0] == 0x0 {
self.buf.remove(packet.sender_addr64);
return (packet.sender_addr64, packet.sender_addr16, frame);
} else {
self.buf.insert(packet.sender_addr64, frame);
}
}
}
pub fn discardframes(&mut self, ser: &XBSer) -> () {
loop {
let _ = self.rxframe(ser);
}
}
}