From b88d12b4017043508a2906620900fff167b2947c Mon Sep 17 00:00:00 2001 From: John Goerzen Date: Thu, 24 Sep 2020 21:49:29 -0500 Subject: [PATCH] checkpointing --- Cargo.lock | 12 ---- src/main.rs | 30 ++++++++-- src/tap.rs | 78 ++++++++++++++----------- src/tun.rs | 149 ++++++++++++++++++++++++++---------------------- src/xb.rs | 18 +++++- src/xbpacket.rs | 8 +-- 6 files changed, 168 insertions(+), 127 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 44b9a3e..1da4aad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -120,15 +120,6 @@ name = "hex" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "ifstructs" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.77 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -423,8 +414,6 @@ dependencies = [ "etherparse 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "format_escape_default 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "ifstructs 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.77 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", "serialport 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "simplelog 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -450,7 +439,6 @@ dependencies = [ "checksum format_escape_default 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cdb2a22fc101e1c1be19e7401b58d502802839a4a7fd58ad35369a386b4639e9" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" "checksum hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" -"checksum ifstructs 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b24d770f92a5ea876a33851b16553f21985bb83e7fe8e7e1f596ad75545e9581" "checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" "checksum libc 0.2.77 (registry+https://github.com/rust-lang/crates.io-index)" = "f2f96b10ec2560088a8e76961b00d47107b3a625fecb76dedb29ee7ccbf98235" "checksum libudev 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea626d3bdf40a1c5aee3bcd4f40826970cae8d80a8fec934c82a63840094dcfe" diff --git a/src/main.rs b/src/main.rs index f316cae..33b3a34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -111,7 +111,13 @@ fn main() { info!("xbnet starting"); let (ser_reader, ser_writer) = ser::new(opt.port).expect("Failed to initialize serial port"); - let (mut xb, xbeesender, writerthread) = xb::XB::new(ser_reader, ser_writer, opt.initfile, opt.disable_xbee_acks, opt.request_xbee_tx_reports); + let (mut xb, xbeesender, writerthread) = xb::XB::new( + ser_reader, + ser_writer, + opt.initfile, + opt.disable_xbee_acks, + opt.request_xbee_tx_reports, + ); let mut xbreframer = xbrx::XBReframer::new(); match opt.cmd { @@ -141,16 +147,28 @@ fn main() { // Make sure queued up data is sent let _ = writerthread.join(); } - Command::Tap { broadcast_unknown, broadcast_everything, iface_name } => { - let tap_reader = tap::XBTap::new_tap(xb.mymac, broadcast_unknown, broadcast_everything, iface_name).expect("Failure initializing tap"); + Command::Tap { + broadcast_unknown, + broadcast_everything, + iface_name, + } => { + let tap_reader = tap::XBTap::new_tap( + xb.mymac, + broadcast_unknown, + broadcast_everything, + iface_name, + ) + .expect("Failure initializing tap"); let tap_writer = tap_reader.clone(); let maxpacketsize = xb.maxpacketsize; thread::spawn(move || { - tap_writer.frames_from_xb_processor(&mut xbreframer, &mut xb.ser_reader) + tap_writer + .frames_from_xb_processor(&mut xbreframer, &mut xb.ser_reader) .expect("Failure in frames_from_xb_processor"); }); - tap_reader.frames_from_tap_processor(maxpacketsize - 1, xbeesender) - .expect("Failure in frames_from_tap_processor"); + tap_reader + .frames_from_tap_processor(maxpacketsize - 1, xbeesender) + .expect("Failure in frames_from_tap_processor"); // Make sure queued up data is sent let _ = writerthread.join(); } diff --git a/src/tap.rs b/src/tap.rs index 934be47..3a07c1b 100644 --- a/src/tap.rs +++ b/src/tap.rs @@ -28,12 +28,10 @@ use bytes::*; use crossbeam_channel; use etherparse::*; use log::*; -use std::convert::TryInto; use std::collections::HashMap; +use std::convert::TryInto; use std::io; use std::sync::{Arc, Mutex}; -use ifstructs::ifreq; -use libc; pub const ETHER_BROADCAST: [u8; 6] = [0xff, 0xff, 0xff, 0xff, 0xff, 0xff]; pub const XB_BROADCAST: u64 = 0xffff; @@ -54,20 +52,20 @@ pub struct XBTap { } impl XBTap { - pub fn new_tap(myxbmac: u64, broadcast_unknown: bool, broadcast_everything: bool, iface_name_requested: String) -> io::Result { + pub fn new_tap( + myxbmac: u64, + broadcast_unknown: bool, + broadcast_everything: bool, + iface_name_requested: String, + ) -> io::Result { let tap = Iface::without_packet_info(&iface_name_requested, Mode::Tap)?; let name = tap.name(); - println!( - "Interface {} (XBee MAC {:x}) ready", - name, - myxbmac, - ); + println!("Interface {} (XBee MAC {:x}) ready", name, myxbmac,); let mut desthm = HashMap::new(); desthm.insert(ETHER_BROADCAST, XB_BROADCAST); - Ok(XBTap { myxbmac, broadcast_unknown, @@ -84,12 +82,13 @@ impl XBTap { } match self.dests.lock().unwrap().get(ethermac) { - None => + None => { if self.broadcast_unknown { Some(XB_BROADCAST) } else { None - }, + } + } Some(dest) => Some(*dest), } } @@ -110,25 +109,26 @@ impl XBTap { } Ok(packet) => { if let Some(LinkSlice::Ethernet2(header)) = packet.link { - trace!("TAPIN: Packet is {} -> {}", hex::encode(header.source()), hex::encode(header.destination())); + trace!( + "TAPIN: Packet is {} -> {}", + hex::encode(header.source()), + hex::encode(header.destination()) + ); match self.get_xb_dest_mac(header.destination().try_into().unwrap()) { - None => - warn!("Destination MAC address unknown; discarding packet"), - Some(destxbmac) => - { - let res = - sender - .try_send(XBTX::TXData( - XBDestAddr::U64(destxbmac), - Bytes::copy_from_slice(tapdata), - )); - match res { - Ok(()) => (), - Err(crossbeam_channel::TrySendError::Full(_)) => - debug!("Dropped packet due to full TX buffer"), - Err(e) => Err(e).unwrap(), + None => warn!("Destination MAC address unknown; discarding packet"), + Some(destxbmac) => { + let res = sender.try_send(XBTX::TXData( + XBDestAddr::U64(destxbmac), + Bytes::copy_from_slice(tapdata), + )); + match res { + Ok(()) => (), + Err(crossbeam_channel::TrySendError::Full(_)) => { + debug!("Dropped packet due to full TX buffer") } + Err(e) => Err(e).unwrap(), } + } } } else { warn!("Unable to get Ethernet2 header from tap packet; discarding"); @@ -140,20 +140,31 @@ impl XBTap { pub fn frames_from_xb_processor( &self, xbreframer: &mut XBReframer, - ser: &mut XBSerReader) -> io::Result<()> { + ser: &mut XBSerReader, + ) -> io::Result<()> { loop { let (fromu64, _fromu16, payload) = xbreframer.rxframe(ser); // Register the sender in our map of known MACs match SlicedPacket::from_ethernet(&payload) { Err(x) => { - warn!("Packet from XBee wasn't valid Ethernet; continueing anyhow: {:?}", x); + warn!( + "Packet from XBee wasn't valid Ethernet; continueing anyhow: {:?}", + x + ); } Ok(packet) => { if let Some(LinkSlice::Ethernet2(header)) = packet.link { - trace!("SERIN: Packet Ethernet header is {} -> {}", hex::encode(header.source()), hex::encode(header.destination())); - if ! self.broadcast_everything { - self.dests.lock().unwrap().insert(header.source().try_into().unwrap(), fromu64); + trace!( + "SERIN: Packet Ethernet header is {} -> {}", + hex::encode(header.source()), + hex::encode(header.destination()) + ); + if !self.broadcast_everything { + self.dests + .lock() + .unwrap() + .insert(header.source().try_into().unwrap(), fromu64); } } } @@ -164,7 +175,6 @@ impl XBTap { } } - pub fn showmac(mac: &[u8; 6]) -> String { format!( "{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", diff --git a/src/tun.rs b/src/tun.rs index 934be47..66584a0 100644 --- a/src/tun.rs +++ b/src/tun.rs @@ -1,4 +1,4 @@ -/*! tap virtual Ethernet gateway */ +/*! tun virtual IP gateway */ /* Copyright (C) 2019-2020 John Goerzen , + pub tun: Arc, + pub max_ip_cache: Duration, - /** We can't just blindly generate destination MACs because there is a bug - in the firmware that causes the radio to lock up if we send too many - packets to a MAC that's not online. So, we keep a translation map of - MACs we've seen. */ - pub dests: Arc>>, + /** The map from IP Addresses (v4 or v6) to destination MAC addresses. Also + includes a timestamp at which the destination expires. */ + pub dests: Arc>>, } -impl XBTap { - pub fn new_tap(myxbmac: u64, broadcast_unknown: bool, broadcast_everything: bool, iface_name_requested: String) -> io::Result { - let tap = Iface::without_packet_info(&iface_name_requested, Mode::Tap)?; - let name = tap.name(); +impl XBTun { + pub fn new_tap( + myxbmac: u64, + broadcast_everything: bool, + iface_name_requested: String, + max_ip_cache: Duration, + ) -> io::Result { + let tun = Iface::without_packet_info(&iface_name_requested, Mode::Tun)?; + let name = tun.name(); - println!( - "Interface {} (XBee MAC {:x}) ready", - name, - myxbmac, - ); + println!("Interface {} (XBee MAC {:x}) ready", name, myxbmac,); let mut desthm = HashMap::new(); - desthm.insert(ETHER_BROADCAST, XB_BROADCAST); - - Ok(XBTap { + Ok(XBTun { myxbmac, - broadcast_unknown, broadcast_everything, + max_ip_cache, name: String::from(name), - tap: Arc::new(tap), + tun: Arc::new(tun), dests: Arc::new(Mutex::new(desthm)), }) } - pub fn get_xb_dest_mac(&self, ethermac: &[u8; 6]) -> Option { + pub fn get_xb_dest_mac(&self, ipaddr: &IpAddr) -> Option { if self.broadcast_everything { return Some(XB_BROADCAST); } - match self.dests.lock().unwrap().get(ethermac) { - None => - if self.broadcast_unknown { + match self.dests.lock().unwrap().get(ipaddr) { + None => Some(XB_BROADCAST), + Some((dest, expiration)) => { + if *expiration >= Instant::now() { + // Broadcast it if it's not in the cache Some(XB_BROADCAST) } else { - None - }, - Some(dest) => Some(*dest), + Some(*dest) + } + } } } - pub fn frames_from_tap_processor( + pub fn frames_from_tun_processor( &self, maxframesize: usize, sender: crossbeam_channel::Sender, ) -> io::Result<()> { let mut buf = [0u8; 9100]; // Enough to handle even jumbo frames loop { - let size = self.tap.recv(&mut buf)?; - let tapdata = &buf[0..size]; - trace!("TAPIN: {}", hex::encode(tapdata)); - match SlicedPacket::from_ethernet(tapdata) { + let size = self.tun.recv(&mut buf)?; + let tundata = &buf[0..size]; + trace!("TUNIN: {}", hex::encode(tundata)); + match SlicedPacket::from_ip(tundata) { Err(x) => { - warn!("Error parsing packet from tap; discarding: {:?}", x); + warn!("Error parsing packet from tun; discarding: {:?}", x); } Ok(packet) => { - if let Some(LinkSlice::Ethernet2(header)) = packet.link { - trace!("TAPIN: Packet is {} -> {}", hex::encode(header.source()), hex::encode(header.destination())); - match self.get_xb_dest_mac(header.destination().try_into().unwrap()) { - None => - warn!("Destination MAC address unknown; discarding packet"), - Some(destxbmac) => - { - let res = - sender - .try_send(XBTX::TXData( - XBDestAddr::U64(destxbmac), - Bytes::copy_from_slice(tapdata), - )); - match res { - Ok(()) => (), - Err(crossbeam_channel::TrySendError::Full(_)) => - debug!("Dropped packet due to full TX buffer"), - Err(e) => Err(e).unwrap(), - } - } + let destination = match packet.ip { + Some(InternetSlice::Ipv4(header)) => { + Some(IpAddr::V4(header.destination_addr())) + } + Some(InternetSlice::Ipv6(header, _)) => { + Some(IpAddr::V6(header.destination_addr())) + } + _ => { + warn!("Could not parse packet at IPv4 or IPv6; discarding"); + None + } + }; + + if let Some(destination) = destination { + let destxbmac = self.get_xb_dest_mac(&destination); + trace!("TAPIN: Packet dest {} (MAC {x})", destination, destxbmac); + let res = sender.try_send(XBTX::TXData( + XBDestAddr::U64(destxbmac), + Bytes::copy_from_slice(tundata), + )); + match res { + Ok(()) => (), + Err(crossbeam_channel::TrySendError::Full(_)) => { + debug!("Dropped packet due to full TX buffer") + } + Err(e) => Err(e).unwrap(), } } else { - warn!("Unable to get Ethernet2 header from tap packet; discarding"); + warn!("Unable to get IP header from tun packet; discarding"); } } } } } + pub fn frames_from_xb_processor( &self, xbreframer: &mut XBReframer, - ser: &mut XBSerReader) -> io::Result<()> { + ser: &mut XBSerReader, + ) -> io::Result<()> { loop { let (fromu64, _fromu16, payload) = xbreframer.rxframe(ser); // Register the sender in our map of known MACs match SlicedPacket::from_ethernet(&payload) { Err(x) => { - warn!("Packet from XBee wasn't valid Ethernet; continueing anyhow: {:?}", x); + warn!( + "Packet from XBee wasn't valid Ethernet; continueing anyhow: {:?}", + x + ); } Ok(packet) => { if let Some(LinkSlice::Ethernet2(header)) = packet.link { - trace!("SERIN: Packet Ethernet header is {} -> {}", hex::encode(header.source()), hex::encode(header.destination())); - if ! self.broadcast_everything { - self.dests.lock().unwrap().insert(header.source().try_into().unwrap(), fromu64); + trace!( + "SERIN: Packet Ethernet header is {} -> {}", + hex::encode(header.source()), + hex::encode(header.destination()) + ); + if !self.broadcast_everything { + self.dests + .lock() + .unwrap() + .insert(header.source().try_into().unwrap(), fromu64); } } } @@ -164,7 +180,6 @@ impl XBTap { } } - pub fn showmac(mac: &[u8; 6]) -> String { format!( "{:x}:{:x}:{:x}:{:x}:{:x}:{:x}", diff --git a/src/xb.rs b/src/xb.rs index 9703415..d7dd024 100644 --- a/src/xb.rs +++ b/src/xb.rs @@ -148,7 +148,15 @@ impl XB { debug!("Radio configuration complete"); - let writerthread = thread::spawn(move || writerthread(ser_writer, maxpacketsize, writerrx, disable_xbee_acks, request_xbee_tx_reports)); + let writerthread = thread::spawn(move || { + writerthread( + ser_writer, + maxpacketsize, + writerrx, + disable_xbee_acks, + request_xbee_tx_reports, + ) + }); ( XB { @@ -177,7 +185,13 @@ fn writerthread( // Here we receive a block of data, which hasn't been // packetized. Packetize it and send out the result. - match packetstream.packetize_data(maxpacketsize, &dest, &data, disable_xbee_acks, request_xbee_tx_reports) { + match packetstream.packetize_data( + maxpacketsize, + &dest, + &data, + disable_xbee_acks, + request_xbee_tx_reports, + ) { Ok(packets) => { for packet in packets.into_iter() { match packet.serialize() { diff --git a/src/xbpacket.rs b/src/xbpacket.rs index 438236a..7435b86 100644 --- a/src/xbpacket.rs +++ b/src/xbpacket.rs @@ -18,9 +18,9 @@ */ use bytes::*; +use log::*; use std::convert::{TryFrom, TryInto}; use std::fmt; -use log::*; /** XBee transmissions can give either a 64-bit or a 16-bit destination address. This permits the user to select one. */ @@ -213,11 +213,7 @@ impl PacketStream { frame_id, dest_addr: dest.clone(), broadcast_radius: 0, - transmit_options: if disable_xbee_acks { - 0x01 - } else { - 0 - }, + transmit_options: if disable_xbee_acks { 0x01 } else { 0 }, payload: Bytes::from(payload), };