fix: cleanup for 0.0.1 release

This commit is contained in:
Semubico 2025-09-14 13:36:52 +03:00
parent 797b68b9e0
commit 1e3bd4dd19
6 changed files with 294 additions and 2 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
Cargo.lock

29
Cargo.toml Normal file
View file

@ -0,0 +1,29 @@
[package]
name = "fs_esl_codec"
version = "0.0.1"
edition = "2021"
authors = ["semubico@protogen.engineering"]
description = "Freeswitch esl socket decoder for tokio Framed socket reading"
readme = "README.md"
license-file = "LICENSE"
repository = "https://github.com/elithorn/fs_esl_codec"
[lib]
crate-type = ["staticlib", "rlib"]
bench = false
[[example]]
name = "untyped"
path = "examples/untyped.rs"
[dependencies]
serde = { version = "^1.0.210", features = ["derive"] }
tokio-util = { version = "0.7.12", features = ["codec"] }
thiserror = "^1.0.64"
[dev-dependencies]
anyhow = "1.0.89"
tokio = { version = "1.0", features = ["full"]}
futures = "^0.3.30"
serde_json = { version = "^1.0.128" }

View file

@ -1,3 +1,8 @@
# freeswitch-esl
fs_esl_codec
---
This is a freeswitch esl codec for parsing event stream from freeswitch in inbound mode.
This crate only provides a codec. The authentication and the initial request for events in the appropriate format is to be done separately (see examples/untyped.rs for the most basic one).
The framing mechanism returns a packet consisting of ESL headers (not to be confused with the actual event headers) such as content-type, and a String buffer, that you can deserialize however you want using the parser appropriate for the type of events requested (events plain/events json/events xml).
This way you can put this codec on a reader of a .split(), or onto an another socket entirely.
Ad-hoc ESL ibrary in rust for Freeswitch

126
examples/untyped.rs Normal file
View file

@ -0,0 +1,126 @@
use std::collections::HashMap;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio_util::codec::{FramedRead};
use futures::stream::StreamExt;
use fs_esl_codec::EslCodec;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Open socket
let listen = std::env::args().nth(1).expect("Expected SockAddr of server");
let mut stream = TcpStream::connect(&listen).await.unwrap();
// Authorise against an ESL
let pass = std::env::args().nth(2).expect("Expected ESL auth pass");
let message = format!("auth {}\n\n", &pass);
stream.write_all(message.as_bytes()).await.unwrap();
// Subscribe to all types of events in json format
let message = b"event json ALL\n\n";
stream.write_all(message).await.unwrap();
// Instantiate ESL parser
let in_codec = EslCodec::new();
// Create tokio framedreader using EslCodec
let mut framed_read = FramedRead::new(stream, in_codec);
// Read freeswitch-emitted messages one-by-one
while let Some(Ok(data)) = framed_read.next().await {
// Decode into a hashmap of string key-value pairs
if let Ok(event_data) = serde_json::from_str::<HashMap<String,String>>(&data.payload.unwrap_or_default()) {
match event_data.get("Event-Name").map(|v| v.as_ref()) {
// First inbound INVITE received
Some("CHANNEL_CREATE") if event_data.get("Call-Direction").map(|v| v.as_ref()) == Some("inbound") => {
println!("[{}] <{}> new incoming [{}] {}",
event_data.get("Event-Date-GMT").unwrap(),
event_data.get("Channel-Call-UUID").unwrap(),
event_data.get("Call-Direction").unwrap(),
event_data.get("Channel-Name").unwrap(),
);
},
// leg b originated
Some("CHANNEL_OUTGOING") => {
println!("[{}] <{}> trying [{}] {} for {} {}",
event_data.get("Event-Date-GMT").unwrap(),
event_data.get("Channel-Call-UUID").unwrap(),
event_data.get("Call-Direction").unwrap(),
event_data.get("Channel-Name").unwrap(),
event_data.get("Caller-Caller-ID-Name").unwrap(),
event_data.get("Caller-Caller-ID-Number").unwrap(),
);
},
// leg b answered
Some("CHANNEL_ANSWER") => {
println!("[{}] <{}> answered [{}] {} for {} {}",
event_data.get("Event-Date-GMT").unwrap(),
event_data.get("Channel-Call-UUID").unwrap(),
event_data.get("Call-Direction").unwrap(),
event_data.get("Channel-Name").unwrap(),
event_data.get("Caller-Caller-ID-Name").unwrap(),
event_data.get("Caller-Caller-ID-Number").unwrap(),
);
},
// Leg a bridged to leg b
Some("CHANNEL_BRIDGE") => {
println!("[{}] <{}> bridge [{}] {} for {}{}",
event_data.get("Event-Date-GMT").unwrap(),
event_data.get("Channel-Call-UUID").unwrap(),
event_data.get("Call-Direction").unwrap(),
event_data.get("Channel-Name").unwrap(),
event_data.get("Caller-Caller-ID-Name").unwrap(),
event_data.get("Caller-Caller-ID-Number").unwrap(),
);
},
// Leg b hangup
Some("CHANNEL_HANGUP_COMPLETE") => {
println!("[{}] <{}> hangup [{}] {}",
event_data.get("Event-Date-GMT").unwrap(),
event_data.get("Channel-Call-UUID").unwrap(),
event_data.get("Call-Direction").unwrap(),
event_data.get("Channel-Name").unwrap(),
);
},
// Periodic stats updates (good for prometheus metrics or smth)
Some("HEARTBEAT") => {
println!("[{}] STAT {}",
event_data.get("Event-Date-GMT").unwrap(),
event_data.get("Up-Time").unwrap(),
);
},
// mod Sofia and other events
Some("CUSTOM") => match event_data.get("Event-Subclass").map(|v| v.as_str()) {
Some("sofia::gateway_state") =>
println!("[{}] Trunk {} (ping={}) changed state to {} with status {}",
event_data.get("Event-Date-GMT").unwrap(),
event_data.get("Gateway").unwrap(),
event_data.get("Ping-Status").unwrap(),
event_data.get("State").unwrap(),
event_data.get("Status").map_or("", |v| v)
),
_ => { }
},
_ => { }
}
}
}
Ok(())
}

127
src/codec.rs Normal file
View file

@ -0,0 +1,127 @@
use tokio_util::bytes::{BytesMut};
use std::collections::HashMap;
use tokio_util::codec::{Decoder};
use thiserror::Error;
#[derive(Debug, Default)]
pub struct EslCodec {
offset: Option<usize>,
length: Option<usize>,
}
impl EslCodec {
pub fn new() -> Self {
Self::default()
}
}
#[derive(Debug)]
pub struct EslPacket {
pub headers: HashMap<String, String>,
pub payload: Option<String>,
}
#[derive(Debug, Error)]
pub enum EslCodecError {
#[error("Failed to parse text as UTF-8")]
MalformedUtf8,
#[error("Failed to parse packet Content-Length field")]
InvalidContentLength,
#[error("Failed to parse headers, socket stream may not be aligned ")]
InvalidHeaders,
#[error("IO error")]
IoError(#[from] std::io::Error)
}
impl Decoder for EslCodec {
type Item = EslPacket;
type Error = EslCodecError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let delim_gap = b"\n\n".len();
match self {
// packet found, detecting end of headers
Self { offset: None, .. } if src.windows(2).position(|each| each == b"\n\n").is_some() =>
{
let headers_end_ix = src.windows(2).position(|each| each == b"\n\n").unwrap(); // SAFETY: just guard-mathced against this very condition
let length = try_parse_content_length(&src)?;
*self = Self { offset: Some(headers_end_ix), length };
self.decode(src)
},
// Packet has no Content-Length - decoding headers only
Self { offset: Some(headers_end_index), length: None } => {
let result = src.split_to(*headers_end_index);
let headers = head_to_map(&result)?;
// Move past the gap to read the next header
let _ = src.split_to(delim_gap);
*self = Self::default();
Ok(Some(EslPacket {
headers: headers,
payload: None
}))
},
// Packet has Content-length, and current buffer holds no less than content-length bytes of payload content - decoding headers, then body
Self { offset: Some(offset), length: Some(length) } if *offset > 0 && src[*offset + delim_gap ..].len() >= *length => {
let result = src.split_to(*length + *offset + delim_gap);
let headers = &result[.. *offset];
let payload = &result[*offset + delim_gap ..];
let headers = head_to_map(&headers)?;
*self = Self::default();
Ok(Some(EslPacket {
headers: headers,
payload: Some(String::from(std::str::from_utf8(payload).map_err(|_| Self::Error::MalformedUtf8)?)),
}))
},
_ => Ok(None),
}
}
}
fn head_to_map(head: &[u8]) -> Result<HashMap<String, String>, EslCodecError> {
let headers = std::str::from_utf8(&head).map_err(|_| EslCodecError::MalformedUtf8)?;
let mut result = HashMap::new();
for line in headers.lines() {
let mut it = line.split(": ");
let k = String::from(it.next().ok_or(EslCodecError::InvalidHeaders)?);
let v = String::from(it.next().ok_or(EslCodecError::InvalidHeaders)?);
let _ = result.insert(k, v);
}
Ok(result)
}
fn try_parse_content_length(src: &[u8]) -> Result<Option<usize>, EslCodecError> {
if !src.starts_with(b"Content-Length: ") {
return Ok(None);
}
let len = b"Content-Length: ".len();
let pos = src.iter().position(|c| *c == b'\n').ok_or(EslCodecError::InvalidContentLength)?;
let src = &src[len..pos];
let str = std::str::from_utf8(&src).map_err(|_| EslCodecError::InvalidContentLength)?;
let len = str.parse::<usize>().map_err(|_| EslCodecError::InvalidContentLength)?;
Ok(Some(len))
}

3
src/lib.rs Normal file
View file

@ -0,0 +1,3 @@
pub mod codec;
pub use codec::*;