checkpointing

This commit is contained in:
John Goerzen 2020-09-24 21:49:29 -05:00
parent 19be37d37a
commit b88d12b401
6 changed files with 168 additions and 127 deletions

12
Cargo.lock generated
View File

@ -120,15 +120,6 @@ name = "hex"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ifstructs"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.77 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -423,8 +414,6 @@ dependencies = [
"etherparse 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
"format_escape_default 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
"ifstructs 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.77 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
"serialport 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"simplelog 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -450,7 +439,6 @@ dependencies = [
"checksum format_escape_default 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cdb2a22fc101e1c1be19e7401b58d502802839a4a7fd58ad35369a386b4639e9"
"checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205"
"checksum hex 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35"
"checksum ifstructs 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b24d770f92a5ea876a33851b16553f21985bb83e7fe8e7e1f596ad75545e9581"
"checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
"checksum libc 0.2.77 (registry+https://github.com/rust-lang/crates.io-index)" = "f2f96b10ec2560088a8e76961b00d47107b3a625fecb76dedb29ee7ccbf98235"
"checksum libudev 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ea626d3bdf40a1c5aee3bcd4f40826970cae8d80a8fec934c82a63840094dcfe"

View File

@ -111,7 +111,13 @@ fn main() {
info!("xbnet starting");
let (ser_reader, ser_writer) = ser::new(opt.port).expect("Failed to initialize serial port");
let (mut xb, xbeesender, writerthread) = xb::XB::new(ser_reader, ser_writer, opt.initfile, opt.disable_xbee_acks, opt.request_xbee_tx_reports);
let (mut xb, xbeesender, writerthread) = xb::XB::new(
ser_reader,
ser_writer,
opt.initfile,
opt.disable_xbee_acks,
opt.request_xbee_tx_reports,
);
let mut xbreframer = xbrx::XBReframer::new();
match opt.cmd {
@ -141,16 +147,28 @@ fn main() {
// Make sure queued up data is sent
let _ = writerthread.join();
}
Command::Tap { broadcast_unknown, broadcast_everything, iface_name } => {
let tap_reader = tap::XBTap::new_tap(xb.mymac, broadcast_unknown, broadcast_everything, iface_name).expect("Failure initializing tap");
Command::Tap {
broadcast_unknown,
broadcast_everything,
iface_name,
} => {
let tap_reader = tap::XBTap::new_tap(
xb.mymac,
broadcast_unknown,
broadcast_everything,
iface_name,
)
.expect("Failure initializing tap");
let tap_writer = tap_reader.clone();
let maxpacketsize = xb.maxpacketsize;
thread::spawn(move || {
tap_writer.frames_from_xb_processor(&mut xbreframer, &mut xb.ser_reader)
tap_writer
.frames_from_xb_processor(&mut xbreframer, &mut xb.ser_reader)
.expect("Failure in frames_from_xb_processor");
});
tap_reader.frames_from_tap_processor(maxpacketsize - 1, xbeesender)
.expect("Failure in frames_from_tap_processor");
tap_reader
.frames_from_tap_processor(maxpacketsize - 1, xbeesender)
.expect("Failure in frames_from_tap_processor");
// Make sure queued up data is sent
let _ = writerthread.join();
}

View File

@ -28,12 +28,10 @@ use bytes::*;
use crossbeam_channel;
use etherparse::*;
use log::*;
use std::convert::TryInto;
use std::collections::HashMap;
use std::convert::TryInto;
use std::io;
use std::sync::{Arc, Mutex};
use ifstructs::ifreq;
use libc;
pub const ETHER_BROADCAST: [u8; 6] = [0xff, 0xff, 0xff, 0xff, 0xff, 0xff];
pub const XB_BROADCAST: u64 = 0xffff;
@ -54,20 +52,20 @@ pub struct XBTap {
}
impl XBTap {
pub fn new_tap(myxbmac: u64, broadcast_unknown: bool, broadcast_everything: bool, iface_name_requested: String) -> io::Result<XBTap> {
pub fn new_tap(
myxbmac: u64,
broadcast_unknown: bool,
broadcast_everything: bool,
iface_name_requested: String,
) -> io::Result<XBTap> {
let tap = Iface::without_packet_info(&iface_name_requested, Mode::Tap)?;
let name = tap.name();
println!(
"Interface {} (XBee MAC {:x}) ready",
name,
myxbmac,
);
println!("Interface {} (XBee MAC {:x}) ready", name, myxbmac,);
let mut desthm = HashMap::new();
desthm.insert(ETHER_BROADCAST, XB_BROADCAST);
Ok(XBTap {
myxbmac,
broadcast_unknown,
@ -84,12 +82,13 @@ impl XBTap {
}
match self.dests.lock().unwrap().get(ethermac) {
None =>
None => {
if self.broadcast_unknown {
Some(XB_BROADCAST)
} else {
None
},
}
}
Some(dest) => Some(*dest),
}
}
@ -110,25 +109,26 @@ impl XBTap {
}
Ok(packet) => {
if let Some(LinkSlice::Ethernet2(header)) = packet.link {
trace!("TAPIN: Packet is {} -> {}", hex::encode(header.source()), hex::encode(header.destination()));
trace!(
"TAPIN: Packet is {} -> {}",
hex::encode(header.source()),
hex::encode(header.destination())
);
match self.get_xb_dest_mac(header.destination().try_into().unwrap()) {
None =>
warn!("Destination MAC address unknown; discarding packet"),
Some(destxbmac) =>
{
let res =
sender
.try_send(XBTX::TXData(
XBDestAddr::U64(destxbmac),
Bytes::copy_from_slice(tapdata),
));
match res {
Ok(()) => (),
Err(crossbeam_channel::TrySendError::Full(_)) =>
debug!("Dropped packet due to full TX buffer"),
Err(e) => Err(e).unwrap(),
None => warn!("Destination MAC address unknown; discarding packet"),
Some(destxbmac) => {
let res = sender.try_send(XBTX::TXData(
XBDestAddr::U64(destxbmac),
Bytes::copy_from_slice(tapdata),
));
match res {
Ok(()) => (),
Err(crossbeam_channel::TrySendError::Full(_)) => {
debug!("Dropped packet due to full TX buffer")
}
Err(e) => Err(e).unwrap(),
}
}
}
} else {
warn!("Unable to get Ethernet2 header from tap packet; discarding");
@ -140,20 +140,31 @@ impl XBTap {
pub fn frames_from_xb_processor(
&self,
xbreframer: &mut XBReframer,
ser: &mut XBSerReader) -> io::Result<()> {
ser: &mut XBSerReader,
) -> io::Result<()> {
loop {
let (fromu64, _fromu16, payload) = xbreframer.rxframe(ser);
// Register the sender in our map of known MACs
match SlicedPacket::from_ethernet(&payload) {
Err(x) => {
warn!("Packet from XBee wasn't valid Ethernet; continueing anyhow: {:?}", x);
warn!(
"Packet from XBee wasn't valid Ethernet; continueing anyhow: {:?}",
x
);
}
Ok(packet) => {
if let Some(LinkSlice::Ethernet2(header)) = packet.link {
trace!("SERIN: Packet Ethernet header is {} -> {}", hex::encode(header.source()), hex::encode(header.destination()));
if ! self.broadcast_everything {
self.dests.lock().unwrap().insert(header.source().try_into().unwrap(), fromu64);
trace!(
"SERIN: Packet Ethernet header is {} -> {}",
hex::encode(header.source()),
hex::encode(header.destination())
);
if !self.broadcast_everything {
self.dests
.lock()
.unwrap()
.insert(header.source().try_into().unwrap(), fromu64);
}
}
}
@ -164,7 +175,6 @@ impl XBTap {
}
}
pub fn showmac(mac: &[u8; 6]) -> String {
format!(
"{:x}:{:x}:{:x}:{:x}:{:x}:{:x}",

View File

@ -1,4 +1,4 @@
/*! tap virtual Ethernet gateway */
/*! tun virtual IP gateway */
/*
Copyright (C) 2019-2020 John Goerzen <jgoerzen@complete.org
@ -28,132 +28,148 @@ use bytes::*;
use crossbeam_channel;
use etherparse::*;
use log::*;
use std::convert::TryInto;
use std::collections::HashMap;
use std::convert::TryInto;
use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::{Arc, Mutex};
use ifstructs::ifreq;
use libc;
use std::time::{Duration, Instant};
pub const ETHER_BROADCAST: [u8; 6] = [0xff, 0xff, 0xff, 0xff, 0xff, 0xff];
pub const XB_BROADCAST: u64 = 0xffff;
#[derive(Clone)]
pub struct XBTap {
pub struct XBTun {
pub myxbmac: u64,
pub name: String,
pub broadcast_unknown: bool,
pub broadcast_everything: bool,
pub tap: Arc<Iface>,
pub tun: Arc<Iface>,
pub max_ip_cache: Duration,
/** We can't just blindly generate destination MACs because there is a bug
in the firmware that causes the radio to lock up if we send too many
packets to a MAC that's not online. So, we keep a translation map of
MACs we've seen. */
pub dests: Arc<Mutex<HashMap<[u8; 6], u64>>>,
/** The map from IP Addresses (v4 or v6) to destination MAC addresses. Also
includes a timestamp at which the destination expires. */
pub dests: Arc<Mutex<HashMap<IpAddr, (u64, Instant)>>>,
}
impl XBTap {
pub fn new_tap(myxbmac: u64, broadcast_unknown: bool, broadcast_everything: bool, iface_name_requested: String) -> io::Result<XBTap> {
let tap = Iface::without_packet_info(&iface_name_requested, Mode::Tap)?;
let name = tap.name();
impl XBTun {
pub fn new_tap(
myxbmac: u64,
broadcast_everything: bool,
iface_name_requested: String,
max_ip_cache: Duration,
) -> io::Result<XBTap> {
let tun = Iface::without_packet_info(&iface_name_requested, Mode::Tun)?;
let name = tun.name();
println!(
"Interface {} (XBee MAC {:x}) ready",
name,
myxbmac,
);
println!("Interface {} (XBee MAC {:x}) ready", name, myxbmac,);
let mut desthm = HashMap::new();
desthm.insert(ETHER_BROADCAST, XB_BROADCAST);
Ok(XBTap {
Ok(XBTun {
myxbmac,
broadcast_unknown,
broadcast_everything,
max_ip_cache,
name: String::from(name),
tap: Arc::new(tap),
tun: Arc::new(tun),
dests: Arc::new(Mutex::new(desthm)),
})
}
pub fn get_xb_dest_mac(&self, ethermac: &[u8; 6]) -> Option<u64> {
pub fn get_xb_dest_mac(&self, ipaddr: &IpAddr) -> Option<u64> {
if self.broadcast_everything {
return Some(XB_BROADCAST);
}
match self.dests.lock().unwrap().get(ethermac) {
None =>
if self.broadcast_unknown {
match self.dests.lock().unwrap().get(ipaddr) {
None => Some(XB_BROADCAST),
Some((dest, expiration)) => {
if *expiration >= Instant::now() {
// Broadcast it if it's not in the cache
Some(XB_BROADCAST)
} else {
None
},
Some(dest) => Some(*dest),
Some(*dest)
}
}
}
}
pub fn frames_from_tap_processor(
pub fn frames_from_tun_processor(
&self,
maxframesize: usize,
sender: crossbeam_channel::Sender<XBTX>,
) -> io::Result<()> {
let mut buf = [0u8; 9100]; // Enough to handle even jumbo frames
loop {
let size = self.tap.recv(&mut buf)?;
let tapdata = &buf[0..size];
trace!("TAPIN: {}", hex::encode(tapdata));
match SlicedPacket::from_ethernet(tapdata) {
let size = self.tun.recv(&mut buf)?;
let tundata = &buf[0..size];
trace!("TUNIN: {}", hex::encode(tundata));
match SlicedPacket::from_ip(tundata) {
Err(x) => {
warn!("Error parsing packet from tap; discarding: {:?}", x);
warn!("Error parsing packet from tun; discarding: {:?}", x);
}
Ok(packet) => {
if let Some(LinkSlice::Ethernet2(header)) = packet.link {
trace!("TAPIN: Packet is {} -> {}", hex::encode(header.source()), hex::encode(header.destination()));
match self.get_xb_dest_mac(header.destination().try_into().unwrap()) {
None =>
warn!("Destination MAC address unknown; discarding packet"),
Some(destxbmac) =>
{
let res =
sender
.try_send(XBTX::TXData(
XBDestAddr::U64(destxbmac),
Bytes::copy_from_slice(tapdata),
));
match res {
Ok(()) => (),
Err(crossbeam_channel::TrySendError::Full(_)) =>
debug!("Dropped packet due to full TX buffer"),
Err(e) => Err(e).unwrap(),
}
}
let destination = match packet.ip {
Some(InternetSlice::Ipv4(header)) => {
Some(IpAddr::V4(header.destination_addr()))
}
Some(InternetSlice::Ipv6(header, _)) => {
Some(IpAddr::V6(header.destination_addr()))
}
_ => {
warn!("Could not parse packet at IPv4 or IPv6; discarding");
None
}
};
if let Some(destination) = destination {
let destxbmac = self.get_xb_dest_mac(&destination);
trace!("TAPIN: Packet dest {} (MAC {x})", destination, destxbmac);
let res = sender.try_send(XBTX::TXData(
XBDestAddr::U64(destxbmac),
Bytes::copy_from_slice(tundata),
));
match res {
Ok(()) => (),
Err(crossbeam_channel::TrySendError::Full(_)) => {
debug!("Dropped packet due to full TX buffer")
}
Err(e) => Err(e).unwrap(),
}
} else {
warn!("Unable to get Ethernet2 header from tap packet; discarding");
warn!("Unable to get IP header from tun packet; discarding");
}
}
}
}
}
pub fn frames_from_xb_processor(
&self,
xbreframer: &mut XBReframer,
ser: &mut XBSerReader) -> io::Result<()> {
ser: &mut XBSerReader,
) -> io::Result<()> {
loop {
let (fromu64, _fromu16, payload) = xbreframer.rxframe(ser);
// Register the sender in our map of known MACs
match SlicedPacket::from_ethernet(&payload) {
Err(x) => {
warn!("Packet from XBee wasn't valid Ethernet; continueing anyhow: {:?}", x);
warn!(
"Packet from XBee wasn't valid Ethernet; continueing anyhow: {:?}",
x
);
}
Ok(packet) => {
if let Some(LinkSlice::Ethernet2(header)) = packet.link {
trace!("SERIN: Packet Ethernet header is {} -> {}", hex::encode(header.source()), hex::encode(header.destination()));
if ! self.broadcast_everything {
self.dests.lock().unwrap().insert(header.source().try_into().unwrap(), fromu64);
trace!(
"SERIN: Packet Ethernet header is {} -> {}",
hex::encode(header.source()),
hex::encode(header.destination())
);
if !self.broadcast_everything {
self.dests
.lock()
.unwrap()
.insert(header.source().try_into().unwrap(), fromu64);
}
}
}
@ -164,7 +180,6 @@ impl XBTap {
}
}
pub fn showmac(mac: &[u8; 6]) -> String {
format!(
"{:x}:{:x}:{:x}:{:x}:{:x}:{:x}",

View File

@ -148,7 +148,15 @@ impl XB {
debug!("Radio configuration complete");
let writerthread = thread::spawn(move || writerthread(ser_writer, maxpacketsize, writerrx, disable_xbee_acks, request_xbee_tx_reports));
let writerthread = thread::spawn(move || {
writerthread(
ser_writer,
maxpacketsize,
writerrx,
disable_xbee_acks,
request_xbee_tx_reports,
)
});
(
XB {
@ -177,7 +185,13 @@ fn writerthread(
// Here we receive a block of data, which hasn't been
// packetized. Packetize it and send out the result.
match packetstream.packetize_data(maxpacketsize, &dest, &data, disable_xbee_acks, request_xbee_tx_reports) {
match packetstream.packetize_data(
maxpacketsize,
&dest,
&data,
disable_xbee_acks,
request_xbee_tx_reports,
) {
Ok(packets) => {
for packet in packets.into_iter() {
match packet.serialize() {

View File

@ -18,9 +18,9 @@
*/
use bytes::*;
use log::*;
use std::convert::{TryFrom, TryInto};
use std::fmt;
use log::*;
/** XBee transmissions can give either a 64-bit or a 16-bit destination
address. This permits the user to select one. */
@ -213,11 +213,7 @@ impl PacketStream {
frame_id,
dest_addr: dest.clone(),
broadcast_radius: 0,
transmit_options: if disable_xbee_acks {
0x01
} else {
0
},
transmit_options: if disable_xbee_acks { 0x01 } else { 0 },
payload: Bytes::from(payload),
};