diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..88b8285 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "fs_esl_codec" +version = "0.0.1" +edition = "2021" +authors = ["semubico@protogen.engineering"] +description = "Freeswitch esl socket decoder for tokio Framed socket reading" +readme = "README.md" +license-file = "LICENSE" +repository = "https://github.com/elithorn/fs_esl_codec" + +[lib] +crate-type = ["staticlib", "rlib"] +bench = false + +[[example]] +name = "untyped" +path = "examples/untyped.rs" + +[dependencies] +serde = { version = "^1.0.210", features = ["derive"] } +tokio-util = { version = "0.7.12", features = ["codec"] } +thiserror = "^1.0.64" + +[dev-dependencies] +anyhow = "1.0.89" +tokio = { version = "1.0", features = ["full"]} +futures = "^0.3.30" +serde_json = { version = "^1.0.128" } + diff --git a/README.md b/README.md index 88536e3..e00b926 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,8 @@ -# freeswitch-esl +fs_esl_codec +--- + +This is a freeswitch esl codec for parsing event stream from freeswitch in inbound mode. +This crate only provides a codec. The authentication and the initial request for events in the appropriate format is to be done separately (see examples/untyped.rs for the most basic one). +The framing mechanism returns a packet consisting of ESL headers (not to be confused with the actual event headers) such as content-type, and a String buffer, that you can deserialize however you want using the parser appropriate for the type of events requested (events plain/events json/events xml). +This way you can put this codec on a reader of a .split(), or onto an another socket entirely. -Ad-hoc ESL ibrary in rust for Freeswitch \ No newline at end of file diff --git a/examples/untyped.rs b/examples/untyped.rs new file mode 100644 index 0000000..962670b --- /dev/null +++ b/examples/untyped.rs @@ -0,0 +1,126 @@ +use std::collections::HashMap; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpStream; +use tokio_util::codec::{FramedRead}; +use futures::stream::StreamExt; + +use fs_esl_codec::EslCodec; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + + // Open socket + let listen = std::env::args().nth(1).expect("Expected SockAddr of server"); + let mut stream = TcpStream::connect(&listen).await.unwrap(); + + // Authorise against an ESL + let pass = std::env::args().nth(2).expect("Expected ESL auth pass"); + let message = format!("auth {}\n\n", &pass); + stream.write_all(message.as_bytes()).await.unwrap(); + + // Subscribe to all types of events in json format + let message = b"event json ALL\n\n"; + stream.write_all(message).await.unwrap(); + + // Instantiate ESL parser + let in_codec = EslCodec::new(); + + // Create tokio framedreader using EslCodec + let mut framed_read = FramedRead::new(stream, in_codec); + + // Read freeswitch-emitted messages one-by-one + while let Some(Ok(data)) = framed_read.next().await { + + // Decode into a hashmap of string key-value pairs + if let Ok(event_data) = serde_json::from_str::>(&data.payload.unwrap_or_default()) { + + match event_data.get("Event-Name").map(|v| v.as_ref()) { + + // First inbound INVITE received + Some("CHANNEL_CREATE") if event_data.get("Call-Direction").map(|v| v.as_ref()) == Some("inbound") => { + println!("[{}] <{}> new incoming [{}] {}", + event_data.get("Event-Date-GMT").unwrap(), + event_data.get("Channel-Call-UUID").unwrap(), + event_data.get("Call-Direction").unwrap(), + event_data.get("Channel-Name").unwrap(), + ); + }, + + // leg b originated + Some("CHANNEL_OUTGOING") => { + println!("[{}] <{}> trying [{}] {} for {} {}", + event_data.get("Event-Date-GMT").unwrap(), + event_data.get("Channel-Call-UUID").unwrap(), + event_data.get("Call-Direction").unwrap(), + event_data.get("Channel-Name").unwrap(), + event_data.get("Caller-Caller-ID-Name").unwrap(), + event_data.get("Caller-Caller-ID-Number").unwrap(), + ); + }, + + + // leg b answered + Some("CHANNEL_ANSWER") => { + println!("[{}] <{}> answered [{}] {} for {} {}", + event_data.get("Event-Date-GMT").unwrap(), + event_data.get("Channel-Call-UUID").unwrap(), + event_data.get("Call-Direction").unwrap(), + event_data.get("Channel-Name").unwrap(), + event_data.get("Caller-Caller-ID-Name").unwrap(), + event_data.get("Caller-Caller-ID-Number").unwrap(), + ); + }, + + // Leg a bridged to leg b + Some("CHANNEL_BRIDGE") => { + println!("[{}] <{}> bridge [{}] {} for {}{}", + event_data.get("Event-Date-GMT").unwrap(), + event_data.get("Channel-Call-UUID").unwrap(), + event_data.get("Call-Direction").unwrap(), + event_data.get("Channel-Name").unwrap(), + event_data.get("Caller-Caller-ID-Name").unwrap(), + event_data.get("Caller-Caller-ID-Number").unwrap(), + ); + }, + + // Leg b hangup + Some("CHANNEL_HANGUP_COMPLETE") => { + println!("[{}] <{}> hangup [{}] {}", + event_data.get("Event-Date-GMT").unwrap(), + event_data.get("Channel-Call-UUID").unwrap(), + event_data.get("Call-Direction").unwrap(), + event_data.get("Channel-Name").unwrap(), + ); + }, + + // Periodic stats updates (good for prometheus metrics or smth) + Some("HEARTBEAT") => { + println!("[{}] STAT {}", + event_data.get("Event-Date-GMT").unwrap(), + event_data.get("Up-Time").unwrap(), + ); + }, + + // mod Sofia and other events + Some("CUSTOM") => match event_data.get("Event-Subclass").map(|v| v.as_str()) { + + Some("sofia::gateway_state") => + println!("[{}] Trunk {} (ping={}) changed state to {} with status {}", + event_data.get("Event-Date-GMT").unwrap(), + event_data.get("Gateway").unwrap(), + event_data.get("Ping-Status").unwrap(), + event_data.get("State").unwrap(), + event_data.get("Status").map_or("", |v| v) + ), + + _ => { } + + }, + + _ => { } + } + } + } + + Ok(()) +} diff --git a/src/codec.rs b/src/codec.rs new file mode 100644 index 0000000..dc16243 --- /dev/null +++ b/src/codec.rs @@ -0,0 +1,127 @@ +use tokio_util::bytes::{BytesMut}; +use std::collections::HashMap; +use tokio_util::codec::{Decoder}; +use thiserror::Error; + +#[derive(Debug, Default)] +pub struct EslCodec { + offset: Option, + length: Option, +} + +impl EslCodec { + pub fn new() -> Self { + Self::default() + } +} + + +#[derive(Debug)] +pub struct EslPacket { + pub headers: HashMap, + pub payload: Option, +} + +#[derive(Debug, Error)] +pub enum EslCodecError { + #[error("Failed to parse text as UTF-8")] + MalformedUtf8, + #[error("Failed to parse packet Content-Length field")] + InvalidContentLength, + #[error("Failed to parse headers, socket stream may not be aligned ")] + InvalidHeaders, + #[error("IO error")] + IoError(#[from] std::io::Error) +} + +impl Decoder for EslCodec { + type Item = EslPacket; + type Error = EslCodecError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + + let delim_gap = b"\n\n".len(); + + match self { + + // packet found, detecting end of headers + Self { offset: None, .. } if src.windows(2).position(|each| each == b"\n\n").is_some() => + { + let headers_end_ix = src.windows(2).position(|each| each == b"\n\n").unwrap(); // SAFETY: just guard-mathced against this very condition + let length = try_parse_content_length(&src)?; + *self = Self { offset: Some(headers_end_ix), length }; + self.decode(src) + }, + + // Packet has no Content-Length - decoding headers only + Self { offset: Some(headers_end_index), length: None } => { + + let result = src.split_to(*headers_end_index); + let headers = head_to_map(&result)?; + + // Move past the gap to read the next header + let _ = src.split_to(delim_gap); + + *self = Self::default(); + Ok(Some(EslPacket { + headers: headers, + payload: None + })) + }, + + // Packet has Content-length, and current buffer holds no less than content-length bytes of payload content - decoding headers, then body + Self { offset: Some(offset), length: Some(length) } if *offset > 0 && src[*offset + delim_gap ..].len() >= *length => { + + let result = src.split_to(*length + *offset + delim_gap); + + let headers = &result[.. *offset]; + let payload = &result[*offset + delim_gap ..]; + + let headers = head_to_map(&headers)?; + + *self = Self::default(); + Ok(Some(EslPacket { + headers: headers, + payload: Some(String::from(std::str::from_utf8(payload).map_err(|_| Self::Error::MalformedUtf8)?)), + })) + }, + + _ => Ok(None), + } + } +} + + + +fn head_to_map(head: &[u8]) -> Result, EslCodecError> { + + let headers = std::str::from_utf8(&head).map_err(|_| EslCodecError::MalformedUtf8)?; + + let mut result = HashMap::new(); + + for line in headers.lines() { + let mut it = line.split(": "); + let k = String::from(it.next().ok_or(EslCodecError::InvalidHeaders)?); + let v = String::from(it.next().ok_or(EslCodecError::InvalidHeaders)?); + let _ = result.insert(k, v); + } + + Ok(result) +} + + +fn try_parse_content_length(src: &[u8]) -> Result, EslCodecError> { + if !src.starts_with(b"Content-Length: ") { + return Ok(None); + } + + let len = b"Content-Length: ".len(); + let pos = src.iter().position(|c| *c == b'\n').ok_or(EslCodecError::InvalidContentLength)?; + let src = &src[len..pos]; + + let str = std::str::from_utf8(&src).map_err(|_| EslCodecError::InvalidContentLength)?; + let len = str.parse::().map_err(|_| EslCodecError::InvalidContentLength)?; + + Ok(Some(len)) +} + diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..89fb4d0 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,3 @@ +pub mod codec; + +pub use codec::*;