This commit is contained in:
John Goerzen 2020-09-20 17:48:07 -05:00
parent b73b556f2a
commit c7c90a8ff3
4 changed files with 142 additions and 246 deletions

1
Cargo.lock generated
View File

@ -980,6 +980,7 @@ dependencies = [
name = "xbnet"
version = "1.1.0"
dependencies = [
"bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)",
"crossbeam-channel 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
"format_escape_default 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -32,3 +32,4 @@ format_escape_default = "0.1.1"
structopt = "0.3"
tun-tap = "0.1.2"
rustbee = "0.1.1"
bytes = "0.5"

108
src/txpacket.rs Normal file
View File

@ -0,0 +1,108 @@
/*! XBee packet transmission */
/*
Copyright (C) 2020 John Goerzen <jgoerzen@complete.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
use bytes::*;
/** XBee transmissions can give either a 64-bit or a 16-bit destination
address. This permits the user to select one. */
#[derive(Eq, PartialEq, Debug)]
pub enum XBDestAddr {
/// A 16-bit destination address. When a 64-bit address is given, this is transmitted as 0xFFFE.
U16(u16),
/// The 64-bit destination address. 0xFFFF for broadcast.
/// When a 16-bit destination is given, this will be transmitted as 0xFFFFFFFFFFFFFFFF.
U64(u64)
}
/** Possible errors from serialization */
#[derive(Eq, PartialEq, Debug)]
pub enum TXGenError {
/// The payload was an invalid length
InvalidLen
}
/** A Digi 64-bit transmit request, frame type 0x10 */
#[derive(Eq, PartialEq, Debug)]
pub struct XBTXRequest<'a> {
/// The frame ID, which will be returned in the subsequent response frame.
/// Set to 0 to disable a response for this transmission.
pub frame_id: u8,
/// The destination address
pub dest_addr: XBDestAddr,
/// The number of hops a broadcast transmission can traverse. When 0, the value if NH is used.
pub broadcast_radius: u8,
/// Transmit options bitfield. When 0, uses the TO setting.
pub transmit_options: u8,
/// The payload
pub payload: &'a [u8],
}
impl XBTXRequest {
pub fn serialize(&self) -> Result<Bytes, TXGenError> {
if self.payload.is_empty() {
return Err(TXGenError::InvalidLen);
}
// We generate the bits that are outside the length & checksum parts, then the
// inner parts, then combine them.
let mut fullframe = BytesMut::new();
fullframe.put_u8(0x7e); // Start delimeter
let mut innerframe = BytesMut::new();
// Frame type
innerframe.put_u8(0x10);
innerframe.put_u8(self.frame_id);
match self.dest_addr {
XBDestAddr::U16(dest) => {
innerframe.put_u64(0xFFFFFFFFFFFFFFFFu64);
innerframe.put_u16(dest);
},
XBDestAddr::U64(dest) => {
innerframe.put_u64(dest);
innerframe.put_u16(0xFFFEu16);
}
};
innerframe.put_u8(self.broadcast_radius);
innerframe.put_u8(self.transmit_options);
innerframe.put_slice(self.payload);
// That's it for the inner frame. Now fill in the outer frame.
if let Some(lenu16) = u16::try_from(self.payload.len()) {
fullframe.put_u16(lenu16);
fullframe.put_slice(self.innerframe);
fullframe.put_u8(xbchecksum(self.innerframe));
} else {
Err(TXGenError::InvalidLen)
}
}
}
/// Calculate an XBee checksum over a slice
pub fn xbchecksum(data: &[u8]) -> u8 {
let sumu64 = data.into_iter().map(|x| u64::from(x)).sum();
0xffu8 - (sumu64 as u8)
}

278
src/xb.rs
View File

@ -1,5 +1,5 @@
/*
Copyright (C) 2019 John Goerzen <jgoerzen@complete.org
Copyright (C) 2019-2020 John Goerzen <jgoerzen@complete.org
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -114,15 +114,40 @@ impl XB {
/// as well as a separate receiver to be used in a separate thread to handle
/// incoming frames. The bool specifies whether or not to read the quality
/// parameters after a read.
pub fn new(ser: XBSer, readqual: bool, txwait: u64, eotwait: u64, maxpacketsize: usize, pack: bool, txslot: u64) -> (XB, crossbeam_channel::Receiver<ReceivedFrames>) {
let (readerlinestx, readerlinesrx) = crossbeam_channel::unbounded();
let (txblockstx, txblocksrx) = crossbeam_channel::bounded(2);
let (readeroutput, readeroutputreader) = crossbeam_channel::unbounded();
pub fn new(ser: XBSer) -> (XB, crossbeam_channel::Receiver<ReceivedFrames>) {
debug!("Configuring radio");
thread::sleep(Duration::from_msecs(1100));
ser.swrite.lock().unwrap().write_all(b"+++")?;
ser.swrite.lock().unwrap().flush();
assert_response(ser.readln()?, "OK");
// Enter API mode
ser.writeln("ATAP 1")?;
assert_response(ser.readln()?, "OK");
// Standard API output mode
ser.writeln("ATAO 0")?;
assert_response(ser.readln()?, "OK");
// Get our own MAC address
ser.writeln("ATSH")?;
let serialhigh = ser.readln()?;
ser.writeln("ATSL")?;
let seriallow = ser.readln()?;
// Get maximum packet size
ser.writeln("ATNP")?;
let maxpacket = ser.readln()?;
// Exit command mode
ser.writeln("ATCN")?;
assert_response(ser.readln()?, "OK");
let ser2 = ser.clone();
thread::spawn(move || readerlinesthread(ser2, readerlinestx));
(XB { readqual, ser, readeroutput, readerlinesrx, txblockstx, txblocksrx, maxpacketsize, pack,
txdelay: None,
txwait: Duration::from_millis(txwait),
@ -134,245 +159,6 @@ impl XB {
extradata: vec![]}, readeroutputreader)
}
/// Utility to read the response from initialization
fn initresp(&mut self) -> io::Result<()> {
let line = self.readerlinesrx.recv().unwrap();
if line == "invalid_param" {
Err(mkerror("Bad response from radio during initialization"))
} else {
Ok(())
}
}
pub fn radiocfg(&mut self, initfile: Option<PathBuf>) -> io::Result<()> {
// First, send it an invalid command. Then, consume everything it sends back
self.ser.writeln(String::from("INVALIDCOMMAND"))?;
// Give it a chance to do its thing.
thread::sleep(Duration::from_secs(1));
// Consume all data.
while let Ok(_) = self.readerlinesrx.try_recv() {
}
debug!("Configuring radio");
let default = vec![
"sys get ver",
"mac reset",
"mac pause",
"radio get mod",
"radio get freq",
"radio get pwr",
"radio get sf",
"radio get bw",
"radio get cr",
"radio get wdt",
"radio set pwr 20",
"radio set sf sf12",
"radio set bw 125",
"radio set cr 4/5",
"radio set wdt 60000"];
let initlines: Vec<String> = if let Some(file) = initfile {
let f = fs::File::open(file)?;
let reader = BufReader::new(f);
reader.lines().map(|l| l.unwrap()).collect()
} else {
default.iter().map(|l| String::from(*l)).collect()
};
for line in initlines {
if line.len() > 0 {
self.ser.writeln(line)?;
self.initresp()?;
}
}
Ok(())
}
/// Utililty function to handle actual sending. Assumes radio is idle.
fn dosend(&mut self, data: Vec<u8>) -> io::Result<()> {
let mut tosend = vec![];
tosend.append(&mut self.extradata); // drains self.extradata!
tosend.append(&mut data.clone());
let mut data = tosend; // hide the original 'data'
if data.len() > self.maxpacketsize {
self.extradata = data.split_off(self.maxpacketsize);
}
while data.len() < self.maxpacketsize && self.extradata.is_empty() {
// Consider the next packet - maybe we can combine it with this one.
let r = self.txblocksrx.try_recv();
match r {
Ok(mut next) => {
if self.pack {
// Try to fill up the frame.
data.append(&mut next);
if data.len() > self.maxpacketsize {
// Too much; put the extra into extradata.
self.extradata = data.split_off(self.maxpacketsize);
break; // for clarity only -- would exit the loop anyhow
}
} else {
// Only append the extra if it will fit entirely in the frame.
if data.len() + next.len() <= self.maxpacketsize {
data.append(&mut next);
} else {
self.extradata.append(&mut next);
break; // for clarity only -- would exit the loop anyhow
}
}
},
Err(e) => {
if e.is_disconnected() {
// other threads crashed
r.unwrap();
}
// Otherwise - nothing to do
break;
}
}
}
let mut flag: u8 = 0;
// Give receiver a change to process.
thread::sleep(self.txwait);
if (!self.txblocksrx.is_empty()) || (!self.extradata.is_empty()) {
// If there will be more data to send..
flag = 1;
// See if we need to signal the other end's turn.
match (self.txslotend, self.txslot) {
(None, Some(txslot)) => self.txslotend = Some(Instant::now() + txslot),
(Some(txslotend), _) =>
if Instant::now() > txslotend {
debug!("txslot exceeded; setting txdelay and sending flag 2");
flag = 2;
self.txdelay = Some(Instant::now() + self.eotwait);
self.txslotend = None;
},
_ => ()
}
} else {
self.txslotend = None;
}
// Now, send the mesage.
let txstr = format!("radio tx {}{}", hex::encode([flag]), hex::encode(data));
self.ser.writeln(txstr)?;
// We get two responses from this.... though sometimes a lingering radio_err also.
let mut resp = self.readerlinesrx.recv().unwrap();
if resp == String::from("radio_err") {
resp = self.readerlinesrx.recv().unwrap();
}
assert_response(resp, String::from("ok"))?;
// Second.
self.readerlinesrx.recv().unwrap(); // normally radio_tx_ok
Ok(())
}
// Receive a message from the incoming radio channel and process it.
fn handlerx(&mut self, msg: String, readqual: bool) -> io::Result<()> {
if msg.starts_with("radio_rx ") {
if let Ok(mut decoded) = hex::decode(&msg.as_bytes()[10..]) {
trace!("DECODED: {}", format_escape_default(&decoded));
let radioqual = if readqual {
self.ser.writeln(String::from("radio get snr"))?;
let snr = self.readerlinesrx.recv().unwrap();
self.ser.writeln(String::from("radio get rssi"))?;
let rssi = self.readerlinesrx.recv().unwrap();
Some((snr, rssi))
} else {
None
};
let flag = decoded.remove(0); // Remove the flag from the vec
if flag == 1 {
// More data is coming
self.txdelay = Some(Instant::now() + self.eotwait);
} else {
self.txdelay = None;
}
debug!("handlerx: txdelay set to {:?}", self.txdelay);
self.readeroutput.send(ReceivedFrames(decoded, radioqual)).unwrap();
if flag == 2 && self.txslot != None {
// Other end has more data, but it giving us a chance to transmit.
// Need to immediately send something. dosend() will pick up
// self.extradata or self.txblocksrx to fill up the frame if it can.
self.dosend(vec![])?;
}
} else {
return Err(mkerror("Error with hex decoding"));
}
}
// Might get radio_err here. That's harmless.
Ok(())
}
// Whether or not a txdelay prevents transmit at this time. None if
// we are cleared to transmit; Some(Duration) gives the amount of time
// we'd have to wait otherwise.
fn txdelayrequired(&mut self) -> Option<Duration> {
debug!("txdelayrequired: self.txdelay = {:?}", self.txdelay);
match self.txdelay {
None => None,
Some(delayend) => {
let now = Instant::now();
if now >= delayend {
// We're past the delay. Clear it and return.
debug!("txdelayrequired: {:?} past the required delay {:?}", now, delayend);
self.txdelay = None;
None
} else {
// Indicate we're still blocked.
debug!("txdelayrequired: required delay {:?}", delayend - now);
Some(delayend - now)
}
}
}
}
fn enterrxmode(&mut self) -> io::Result<()> {
// Enter read mode
self.ser.writeln(String::from("radio rx 0"))?;
let mut response = self.readerlinesrx.recv().unwrap();
// For some reason, sometimes we get a radio_err here, then an OK. Ignore it.
if response == String::from("radio_err") {
response = self.readerlinesrx.recv().unwrap();
}
assert_response(response, String::from("ok"))?;
Ok(())
}
fn rxstop(&mut self) -> io::Result<()> {
self.ser.writeln(String::from("radio rxstop"))?;
let checkresp = self.readerlinesrx.recv().unwrap();
if checkresp.starts_with("radio_rx ") {
// We had a race. A packet was coming in. Decode and deal with it,
// then look for the 'ok' from rxstop. We can't try to read the quality in
// this scenario.
self.handlerx(checkresp, false)?;
self.readerlinesrx.recv().unwrap(); // used to pop this into checkresp, but no need now.
}
// Now, checkresp should hold 'ok'.
// It might not be; I sometimes see radio_err here. it's OK too.
// assert_response(checkresp, String::from("ok"))?;
Ok(())
}
pub fn mainloop(&mut self) -> io::Result<()> {
loop {
// First, check to see if we're allowed to transmit. If not, just