Removed lock around serial ports

This commit is contained in:
John Goerzen 2020-09-20 23:51:49 -05:00
parent 9e9699c459
commit 8c8374e575
6 changed files with 62 additions and 60 deletions

View File

@ -79,8 +79,8 @@ fn main() {
}
info!("xbnet starting");
let xbser = ser::XBSer::new(opt.port).expect("Failed to initialize serial port");
let (xb, xbeesender, writerthread) = xb::XB::new(xbser, opt.initfile);
let (ser_reader, ser_writer) = ser::new(opt.port).expect("Failed to initialize serial port");
let (mut xb, xbeesender, writerthread) = xb::XB::new(ser_reader, ser_writer, opt.initfile);
let mut xbreframer = xbrx::XBReframer::new();
@ -88,14 +88,14 @@ fn main() {
Command::Ping{dest} => {
let dest_u64:u64 = u64::from_str_radix(&dest, 16).expect("Invalid destination");
thread::spawn(move || ping::genpings(dest_u64, xbeesender).expect("Failure in genpings"));
ping::displaypongs(&mut xbreframer, &xb.ser);
ping::displaypongs(&mut xbreframer, &mut xb.ser_reader);
},
Command::Pong => {
ping::pong(&mut xbreframer, &xb.ser, xbeesender).expect("Failure in pong");
ping::pong(&mut xbreframer, &mut xb.ser_reader, xbeesender).expect("Failure in pong");
},
Command::Pipe{dest} => {
let dest_u64:u64 = u64::from_str_radix(&dest, 16).expect("Invalid destination");
thread::spawn(move || pipe::stdout_processor(&mut xbreframer, &xb.ser).expect("Failure in stdout_processor"));
thread::spawn(move || pipe::stdout_processor(&mut xbreframer, &mut xb.ser_reader).expect("Failure in stdout_processor"));
pipe::stdin_processor(dest_u64, 1600, xbeesender).expect("Failure in stdin_processor");
// Make sure queued up data is sent
writerthread.join();

View File

@ -39,7 +39,7 @@ pub fn genpings(dest: u64, sender: crossbeam_channel::Sender<XBTX>) -> io::Resul
}
/// Show pongs
pub fn displaypongs(xbreframer: &mut XBReframer, ser: &XBSer) -> () {
pub fn displaypongs(xbreframer: &mut XBReframer, ser: &mut XBSerReader) -> () {
loop {
let (fromu64, _fromu16, payload) = xbreframer.rxframe(ser);
println!("RECV from {}: {}", hex::encode(fromu64.to_be_bytes()), String::from_utf8_lossy(&payload));
@ -47,7 +47,7 @@ pub fn displaypongs(xbreframer: &mut XBReframer, ser: &XBSer) -> () {
}
/// Reply to pings
pub fn pong(xbreframer: &mut XBReframer, ser: &XBSer, sender: crossbeam_channel::Sender<XBTX>) -> io::Result<()> {
pub fn pong(xbreframer: &mut XBReframer, ser: &mut XBSerReader, sender: crossbeam_channel::Sender<XBTX>) -> io::Result<()> {
loop {
let (fromu64, _addr_16, payload) = xbreframer.rxframe(ser);
if payload.starts_with(b"Ping ") {

View File

@ -46,7 +46,7 @@ pub fn stdin_processor(dest: u64, maxframesize: usize,
}
}
pub fn stdout_processor(xbreframer: &mut XBReframer, ser: &XBSer) -> io::Result<()> {
pub fn stdout_processor(xbreframer: &mut XBReframer, ser: &mut XBSerReader) -> io::Result<()> {
let mut stdout = io::stdout();
loop {
let (_fromu64, _fromu16, payload) = xbreframer.rxframe(ser);

View File

@ -25,18 +25,19 @@ use std::time::Duration;
use std::path::PathBuf;
use bytes::*;
#[derive(Clone)]
pub struct XBSer {
// BufReader can't be cloned. Sigh.
pub br: Arc<Mutex<BufReader<Box<dyn SerialPort>>>>,
pub swrite: Arc<Mutex<Box<dyn SerialPort>>>,
pub portname: PathBuf
pub struct XBSerReader {
pub br: BufReader<Box<dyn SerialPort>>,
pub portname: PathBuf,
}
impl XBSer {
pub struct XBSerWriter {
pub swrite: Box<dyn SerialPort>,
pub portname: PathBuf,
}
/// Initialize the serial system, configuring the port.
pub fn new(portname: PathBuf) -> io::Result<XBSer> {
pub fn new(portname: PathBuf) -> io::Result<(XBSerReader, XBSerWriter)> {
let settings = SerialPortSettings {
baud_rate: 115200, // FIXME: make this configurable, default 9600
data_bits: DataBits::Eight,
@ -48,16 +49,17 @@ impl XBSer {
let readport = serialport::open_with_settings(&portname, &settings)?;
let writeport = readport.try_clone()?;
Ok(XBSer {br: Arc::new(Mutex::new(BufReader::new(readport))),
swrite: Arc::new(Mutex::new(writeport)),
portname})
Ok((
XBSerReader {br: BufReader::new(readport), portname: portname.clone()},
XBSerWriter {swrite: writeport, portname}))
}
impl XBSerReader {
/// Read a line from the port. Return it with EOL characters removed.
/// None if EOF reached.
pub fn readln(&mut self) -> io::Result<Option<String>> {
let mut buf = Vec::new();
let size = self.br.lock().unwrap().read_until(0x0D, &mut buf)?;
let size = self.br.read_until(0x0D, &mut buf)?;
let buf = String::from_utf8_lossy(&buf);
if size == 0 {
debug!("{:?}: Received EOF from serial port", self.portname);
@ -69,15 +71,17 @@ impl XBSer {
}
}
}
impl XBSerWriter {
/// Transmits a command with terminating EOL characters
pub fn writeln(&mut self, data: &str) -> io::Result<()> {
trace!("{:?} SEROUT: {}", self.portname, data);
let mut data = BytesMut::from(data.as_bytes());
data.put(&b"\r\n"[..]);
// Give the receiver a chance to process
// FIXME: lock this only once
self.swrite.lock().unwrap().write_all(&data)?;
self.swrite.lock().unwrap().flush()
self.swrite.write_all(&data)?;
self.swrite.flush()
}
}

View File

@ -16,7 +16,7 @@
*/
use crate::ser::XBSer;
use crate::ser::*;
use log::*;
use std::fs;
use std::io::{BufRead, BufReader, Error, ErrorKind};
@ -30,6 +30,7 @@ use std::path::PathBuf;
use bytes::Bytes;
use std::convert::TryInto;
use crate::xbpacket::*;
use serialport::prelude::*;
pub fn mkerror(msg: &str) -> Error {
Error::new(ErrorKind::Other, msg)
@ -45,7 +46,7 @@ pub enum XBTX {
/// Main XBeeNet struct
pub struct XB {
pub ser: XBSer,
pub ser_reader: XBSerReader,
/// My 64-bit MAC address
pub mymac: u64,
@ -76,21 +77,21 @@ impl XB {
May panic if an error occurs during initialization.
*/
pub fn new(mut ser: XBSer, initfile: Option<PathBuf>) -> (XB, crossbeam_channel::Sender<XBTX>, thread::JoinHandle<()>) {
pub fn new(mut ser_reader: XBSerReader, mut ser_writer: XBSerWriter, initfile: Option<PathBuf>) -> (XB, crossbeam_channel::Sender<XBTX>, thread::JoinHandle<()>) {
// FIXME: make this maximum of 5 configurable
let (writertx, writerrx) = crossbeam_channel::bounded(5);
debug!("Configuring radio");
thread::sleep(Duration::from_secs(2));
trace!("Sending +++");
ser.swrite.lock().unwrap().write_all(b"+++").unwrap();
ser.swrite.lock().unwrap().flush().unwrap();
ser_writer.swrite.write_all(b"+++").unwrap();
ser_writer.swrite.flush().unwrap();
loop {
// There might be other packets flowing in while we wait for the OK. FIXME: this could still find
// it prematurely if OK\r occurs in a packet.
trace!("Waiting for OK");
let line = ser.readln().unwrap().unwrap();
let line = ser_reader.readln().unwrap().unwrap();
if line.ends_with("OK") {
trace!("Received OK");
break;
@ -105,48 +106,47 @@ impl XB {
for line in reader.lines() {
let line = line.unwrap();
if line.len() > 0 {
ser.writeln(&line).unwrap();
assert_eq!(ser.readln().unwrap().unwrap(), String::from("OK"));
ser_writer.writeln(&line).unwrap();
assert_eq!(ser_reader.readln().unwrap().unwrap(), String::from("OK"));
}
}
}
// Enter API mode
ser.writeln("ATAP 1").unwrap();
assert_eq!(ser.readln().unwrap().unwrap(), String::from("OK"));
ser_writer.writeln("ATAP 1").unwrap();
assert_eq!(ser_reader.readln().unwrap().unwrap(), String::from("OK"));
// Standard API output mode
ser.writeln("ATAO 0").unwrap();
assert_eq!(ser.readln().unwrap().unwrap(), String::from("OK"));
ser_writer.writeln("ATAO 0").unwrap();
assert_eq!(ser_reader.readln().unwrap().unwrap(), String::from("OK"));
// Get our own MAC address
ser.writeln("ATSH").unwrap();
let serialhigh = ser.readln().unwrap().unwrap();
ser_writer.writeln("ATSH").unwrap();
let serialhigh = ser_reader.readln().unwrap().unwrap();
let serialhighu64 = u64::from_str_radix(&serialhigh, 16).unwrap();
ser.writeln("ATSL").unwrap();
let seriallow = ser.readln().unwrap().unwrap();
ser_writer.writeln("ATSL").unwrap();
let seriallow = ser_reader.readln().unwrap().unwrap();
let seriallowu64 = u64::from_str_radix(&seriallow, 16).unwrap();
let mymac = serialhighu64 << 32 | seriallowu64;
// Get maximum packet size
ser.writeln("ATNP").unwrap();
let maxpacket = ser.readln().unwrap().unwrap();
ser_writer.writeln("ATNP").unwrap();
let maxpacket = ser_reader.readln().unwrap().unwrap();
let maxpacketsize = usize::from(u16::from_str_radix(&maxpacket, 16).unwrap());
// Exit command mode
ser.writeln("ATCN").unwrap();
assert_eq!(ser.readln().unwrap().unwrap(), String::from("OK"));
ser_writer.writeln("ATCN").unwrap();
assert_eq!(ser_reader.readln().unwrap().unwrap(), String::from("OK"));
debug!("Radio configuration complete");
let ser2 = ser.clone();
let writerthread = thread::spawn(move || writerthread(ser2, maxpacketsize, writerrx));
let writerthread = thread::spawn(move || writerthread(ser_writer, maxpacketsize, writerrx));
(XB {
ser,
ser_reader,
mymac,
maxpacketsize,
}, writertx, writerthread)
@ -154,7 +154,7 @@ impl XB {
}
fn writerthread(ser: XBSer, maxpacketsize: usize,
fn writerthread(mut ser: XBSerWriter, maxpacketsize: usize,
writerrx: crossbeam_channel::Receiver<XBTX>) {
for item in writerrx.iter() {
match item {
@ -165,13 +165,12 @@ fn writerthread(ser: XBSer, maxpacketsize: usize,
match packetize_data(maxpacketsize, &dest, &data) {
Ok(packets) => {
let mut serport = ser.swrite.lock().unwrap();
for packet in packets.into_iter() {
match packet.serialize() {
Ok(datatowrite) => {
trace!("TX to {:?} data {}", &dest, hex::encode(&datatowrite));
serport.write_all(&datatowrite).unwrap();
serport.flush().unwrap();
ser.swrite.write_all(&datatowrite).unwrap();
ser.swrite.flush().unwrap();
},
Err(e) => {
error!("Serialization error: {:?}", e);

View File

@ -18,7 +18,7 @@
*/
use crate::ser::XBSer;
use crate::ser::*;
use crate::xbpacket::*;
use log::*;
use std::fs;
@ -35,12 +35,11 @@ use std::collections::HashMap;
/** Attempts to read a packet from the port. Returns
None if it's not an RX frame, or if there is a checksum mismatch. */
pub fn rxxbpacket(ser: &XBSer) -> Option<RXPacket> {
pub fn rxxbpacket(ser: &mut XBSerReader) -> Option<RXPacket> {
let mut junkbytes = BytesMut::new();
let mut serport = ser.br.lock().unwrap();
loop {
let mut startdelim = [0u8; 1];
serport.read_exact(&mut startdelim).unwrap();
ser.br.read_exact(&mut startdelim).unwrap();
if startdelim[0] != 0x7e {
if junkbytes.is_empty() {
error!("Receiving junk");
@ -61,17 +60,17 @@ pub fn rxxbpacket(ser: &XBSer) -> Option<RXPacket> {
// Read the length.
let mut lenbytes = [0u8; 2];
serport.read_exact(&mut lenbytes).unwrap();
ser.br.read_exact(&mut lenbytes).unwrap();
let length = usize::from(u16::from_be_bytes(lenbytes));
// Now read the rest of the frame.
let mut inner = vec![0u8; length];
serport.read_exact(&mut inner).unwrap();
ser.br.read_exact(&mut inner).unwrap();
// And the checksum.
let mut checksum = [0u8; 1];
serport.read_exact(&mut checksum).unwrap();
ser.br.read_exact(&mut checksum).unwrap();
if xbchecksum(&inner) != checksum[0] {
error!("SERIN: Checksum mismatch; data: {}", hex::encode(inner));
@ -94,7 +93,7 @@ pub fn rxxbpacket(ser: &XBSer) -> Option<RXPacket> {
}
/// Like rxxbpacket, but wait until we have a valid packet.
pub fn rxxbpacket_wait(ser: &XBSer) -> RXPacket {
pub fn rxxbpacket_wait(ser: &mut XBSerReader) -> RXPacket {
loop {
if let Some(packet) = rxxbpacket(ser) {
return packet;
@ -117,7 +116,7 @@ impl XBReframer {
}
/// Receive a frame. Indicate the sender (u64, u16) and payload.
pub fn rxframe(&mut self, ser: &XBSer) -> (u64, u16, Bytes) {
pub fn rxframe(&mut self, ser: &mut XBSerReader) -> (u64, u16, Bytes) {
loop {
let packet = rxxbpacket_wait(ser);
let mut frame = BytesMut::new();
@ -135,7 +134,7 @@ impl XBReframer {
}
}
pub fn discardframes(&mut self, ser: &XBSer) -> () {
pub fn discardframes(&mut self, ser: &mut XBSerReader) -> () {
loop {
let _ = self.rxframe(ser);
}