use packet id

This commit is contained in:
John Goerzen 2020-09-21 00:00:08 -05:00
parent e80f891340
commit 87549ba45b
4 changed files with 113 additions and 52 deletions

View File

@ -100,11 +100,13 @@ fn main() {
} }
Command::Pipe { dest } => { Command::Pipe { dest } => {
let dest_u64: u64 = u64::from_str_radix(&dest, 16).expect("Invalid destination"); let dest_u64: u64 = u64::from_str_radix(&dest, 16).expect("Invalid destination");
let maxpacketsize = xb.maxpacketsize;
thread::spawn(move || { thread::spawn(move || {
pipe::stdout_processor(&mut xbreframer, &mut xb.ser_reader) pipe::stdout_processor(&mut xbreframer, &mut xb.ser_reader)
.expect("Failure in stdout_processor") .expect("Failure in stdout_processor")
}); });
pipe::stdin_processor(dest_u64, 1600, xbeesender).expect("Failure in stdin_processor"); pipe::stdin_processor(dest_u64, maxpacketsize - 1, xbeesender)
.expect("Failure in stdin_processor");
// Make sure queued up data is sent // Make sure queued up data is sent
let _ = writerthread.join(); let _ = writerthread.join();
} }

View File

@ -165,6 +165,7 @@ fn writerthread(
maxpacketsize: usize, maxpacketsize: usize,
writerrx: crossbeam_channel::Receiver<XBTX>, writerrx: crossbeam_channel::Receiver<XBTX>,
) { ) {
let mut packetstream = PacketStream::new();
for item in writerrx.iter() { for item in writerrx.iter() {
match item { match item {
XBTX::Shutdown => return, XBTX::Shutdown => return,
@ -172,12 +173,17 @@ fn writerthread(
// Here we receive a block of data, which hasn't been // Here we receive a block of data, which hasn't been
// packetized. Packetize it and send out the result. // packetized. Packetize it and send out the result.
match packetize_data(maxpacketsize, &dest, &data) { match packetstream.packetize_data(maxpacketsize, &dest, &data) {
Ok(packets) => { Ok(packets) => {
for packet in packets.into_iter() { for packet in packets.into_iter() {
match packet.serialize() { match packet.serialize() {
Ok(datatowrite) => { Ok(datatowrite) => {
trace!("TX to {:?} data {}", &dest, hex::encode(&datatowrite)); trace!(
"TX ID {} to {:?} data {}",
packet.frame_id,
&dest,
hex::encode(&datatowrite)
);
ser.swrite.write_all(&datatowrite).unwrap(); ser.swrite.write_all(&datatowrite).unwrap();
ser.swrite.flush().unwrap(); ser.swrite.flush().unwrap();
} }

View File

@ -156,15 +156,36 @@ pub fn mac48to64(mac48: &[u8; 6], pattern64: u64) -> u64 {
mac64 mac64
} }
/** Convert the given data into zero or more packets for transmission. pub struct PacketStream {
/// The counter for the frame
framecounter: u8,
}
We create a leading byte that indicates how many more XBee packets are remaining impl PacketStream {
for the block. When zero, the receiver should process the accumulated data. */ pub fn new() -> Self {
pub fn packetize_data( PacketStream { framecounter: 1 }
}
pub fn get_and_incr_framecounter(&mut self) -> u8 {
let retval = self.framecounter;
if self.framecounter == std::u8::MAX {
self.framecounter = 1
} else {
self.framecounter += 1
}
retval
}
/** Convert the given data into zero or more packets for transmission.
We create a leading byte that indicates how many more XBee packets are remaining
for the block. When zero, the receiver should process the accumulated data. */
pub fn packetize_data(
&mut self,
maxpacketsize: usize, maxpacketsize: usize,
dest: &XBDestAddr, dest: &XBDestAddr,
data: &[u8], data: &[u8],
) -> Result<Vec<XBTXRequest>, String> { ) -> Result<Vec<XBTXRequest>, String> {
let mut retval = Vec::new(); let mut retval = Vec::new();
if data.is_empty() { if data.is_empty() {
return Ok(retval); return Ok(retval);
@ -177,8 +198,9 @@ pub fn packetize_data(
let mut payload = BytesMut::new(); let mut payload = BytesMut::new();
payload.put_u8(chunks_remaining); payload.put_u8(chunks_remaining);
payload.put_slice(chunk); payload.put_slice(chunk);
let frame_id = self.get_and_incr_framecounter();
let packet = XBTXRequest { let packet = XBTXRequest {
frame_id: 0, frame_id,
dest_addr: dest.clone(), dest_addr: dest.clone(),
broadcast_radius: 0, broadcast_radius: 0,
transmit_options: 0, transmit_options: 0,
@ -190,6 +212,7 @@ pub fn packetize_data(
} }
Ok(retval) Ok(retval)
}
} }
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
@ -204,3 +227,13 @@ pub struct RXPacket {
pub rx_options: u8, pub rx_options: u8,
pub payload: Bytes, pub payload: Bytes,
} }
/** A Digi extended transmit status frame, 0x8B */
#[derive(PartialEq, Eq, Debug)]
pub struct ExtTxStatus {
pub frame_id: u8,
pub dest_addr_16: u16,
pub tx_retry_count: u8,
pub delivery_status: u8,
pub discovery_status: u8,
}

View File

@ -75,11 +75,25 @@ pub fn rxxbpacket(ser: &mut XBSerReader) -> Option<RXPacket> {
let mut inner = Bytes::from(inner); let mut inner = Bytes::from(inner);
let frametype = inner.get_u8(); let frametype = inner.get_u8();
if frametype != 0x90 { match frametype {
debug!("SERIN: Non-0x90 frame; data: {}", hex::encode(inner)); 0x8B => {
return None; // Delivery status update. Log and ignore.
let frame_id = inner.get_u8();
let dest_addr_16 = inner.get_u16();
let tx_retry_count = inner.get_u8();
let delivery_status = inner.get_u8();
let discovery_status = inner.get_u8();
let txstatus = ExtTxStatus {
frame_id,
dest_addr_16,
tx_retry_count,
delivery_status,
discovery_status,
};
trace!("TX STATUS: {:?}", txstatus);
None
} }
0x90 => {
let sender_addr64 = inner.get_u64(); let sender_addr64 = inner.get_u64();
let sender_addr16 = inner.get_u16(); let sender_addr16 = inner.get_u16();
let rx_options = inner.get_u8(); let rx_options = inner.get_u8();
@ -96,6 +110,12 @@ pub fn rxxbpacket(ser: &mut XBSerReader) -> Option<RXPacket> {
rx_options, rx_options,
payload, payload,
}) })
}
_ => {
debug!("SERIN: Non-0x90 frame; data: {}", hex::encode(inner));
None
}
}
} }
/// Like rxxbpacket, but wait until we have a valid packet. /// Like rxxbpacket, but wait until we have a valid packet.