fix: cleanup for 0.0.1 release
parent 797b68b9e0
commit 1015a87554
6 changed files with 292 additions and 2 deletions

.gitignore vendored Normal file (+2)
@@ -0,0 +1,2 @@
/target
Cargo.lock

Cargo.toml Normal file (+27)
@@ -0,0 +1,27 @@
[package]
name = "fs_esl_codec"
version = "0.0.1"
edition = "2021"
authors = ["semubico@protogen.engineering"]
description = "Freeswitch esl socket decoder for tokio Framed socket reading"
readme = "README.md"

[lib]
crate-type = ["staticlib", "rlib"]
bench = false

[[example]]
name = "untyped"
path = "examples/untyped.rs"

[dependencies]
serde = { version = "^1.0.210", features = ["derive"] }
tokio-util = { version = "0.7.12", features = ["codec"] }
thiserror = "^1.0.64"

[dev-dependencies]
anyhow = "1.0.89"
tokio = { version = "1.0", features = ["full"] }
futures = "^0.3.30"
serde_json = { version = "^1.0.128" }

README.md
@@ -1,3 +1,8 @@
# freeswitch-esl
fs_esl_codec
---

This is a FreeSWITCH ESL codec for parsing the event stream from FreeSWITCH in inbound mode.
This crate only provides a codec. Authentication and the initial request for events in the appropriate format are to be done separately (see examples/untyped.rs for the most basic one).
The framing mechanism returns a packet consisting of the ESL headers (not to be confused with the actual event headers), such as Content-Type, and a String buffer that you can deserialize however you want, using the parser appropriate for the type of events requested (events plain / events json / events xml).
This way you can put this codec on the read half of a .split(), or onto another socket entirely.

Ad-hoc ESL library in Rust for FreeSWITCH
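
The .split() usage mentioned in the README is easy to sketch. The following is not part of the commit: a minimal example assuming the hypothetical FreeSWITCH defaults 127.0.0.1:8021 / ClueCon and the same auth-then-subscribe flow as examples/untyped.rs, with only the read half of the socket handed to the codec.

use futures::stream::StreamExt;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;

use fs_esl_codec::EslCodec;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Hypothetical endpoint and password (FreeSWITCH defaults); adjust to taste.
    let stream = TcpStream::connect("127.0.0.1:8021").await?;

    // Split the socket: only the read half gets framed with the codec.
    let (read_half, mut write_half) = stream.into_split();

    // Authenticate and subscribe on the write half, same flow as examples/untyped.rs.
    write_half.write_all(b"auth ClueCon\n\nevent json ALL\n\n").await?;

    let mut framed = FramedRead::new(read_half, EslCodec::new());

    // Each item is an EslPacket: ESL headers plus the raw payload string.
    while let Some(Ok(packet)) = framed.next().await {
        println!("{:?} -> {:?}", packet.headers.get("Content-Type"), packet.payload);
    }

    Ok(())
}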

examples/untyped.rs Normal file (+126)
@@ -0,0 +1,126 @@
use std::collections::HashMap;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
use futures::stream::StreamExt;

use fs_esl_codec::EslCodec;

#[tokio::main]
async fn main() -> anyhow::Result<()> {

    // Open socket
    let listen = std::env::args().nth(1).expect("Expected SockAddr of server");
    let mut stream = TcpStream::connect(&listen).await.unwrap();

    // Authorise against the ESL
    let pass = std::env::args().nth(2).expect("Expected ESL auth pass");
    let message = format!("auth {}\n\n", &pass);
    stream.write_all(message.as_bytes()).await.unwrap();

    // Subscribe to all types of events in json format
    let message = b"event json ALL\n\n";
    stream.write_all(message).await.unwrap();

    // Instantiate the ESL parser
    let in_codec = EslCodec::new();

    // Create a tokio FramedRead using EslCodec
    let mut framed_read = FramedRead::new(stream, in_codec);

    // Read freeswitch-emitted messages one by one
    while let Some(Ok(data)) = framed_read.next().await {

        // Decode into a hashmap of string key-value pairs
        if let Ok(event_data) = serde_json::from_str::<HashMap<String, String>>(&data.payload.unwrap_or_default()) {

            match event_data.get("Event-Name").map(|v| v.as_ref()) {

                // First inbound INVITE received
                Some("CHANNEL_CREATE") if event_data.get("Call-Direction").map(|v| v.as_ref()) == Some("inbound") => {
                    println!("[{}] <{}> new incoming [{}] {}",
                        event_data.get("Event-Date-GMT").unwrap(),
                        event_data.get("Channel-Call-UUID").unwrap(),
                        event_data.get("Call-Direction").unwrap(),
                        event_data.get("Channel-Name").unwrap(),
                    );
                },

                // Leg B originated
                Some("CHANNEL_OUTGOING") => {
                    println!("[{}] <{}> trying [{}] {} for {} {}",
                        event_data.get("Event-Date-GMT").unwrap(),
                        event_data.get("Channel-Call-UUID").unwrap(),
                        event_data.get("Call-Direction").unwrap(),
                        event_data.get("Channel-Name").unwrap(),
                        event_data.get("Caller-Caller-ID-Name").unwrap(),
                        event_data.get("Caller-Caller-ID-Number").unwrap(),
                    );
                },

                // Leg B answered
                Some("CHANNEL_ANSWER") => {
                    println!("[{}] <{}> answered [{}] {} for {} {}",
                        event_data.get("Event-Date-GMT").unwrap(),
                        event_data.get("Channel-Call-UUID").unwrap(),
                        event_data.get("Call-Direction").unwrap(),
                        event_data.get("Channel-Name").unwrap(),
                        event_data.get("Caller-Caller-ID-Name").unwrap(),
                        event_data.get("Caller-Caller-ID-Number").unwrap(),
                    );
                },

                // Leg A bridged to leg B
                Some("CHANNEL_BRIDGE") => {
                    println!("[{}] <{}> bridge [{}] {} for {} {}",
                        event_data.get("Event-Date-GMT").unwrap(),
                        event_data.get("Channel-Call-UUID").unwrap(),
                        event_data.get("Call-Direction").unwrap(),
                        event_data.get("Channel-Name").unwrap(),
                        event_data.get("Caller-Caller-ID-Name").unwrap(),
                        event_data.get("Caller-Caller-ID-Number").unwrap(),
                    );
                },

                // Leg B hangup
                Some("CHANNEL_HANGUP_COMPLETE") => {
                    println!("[{}] <{}> hangup [{}] {}",
                        event_data.get("Event-Date-GMT").unwrap(),
                        event_data.get("Channel-Call-UUID").unwrap(),
                        event_data.get("Call-Direction").unwrap(),
                        event_data.get("Channel-Name").unwrap(),
                    );
                },

                // Periodic stats updates (useful for prometheus metrics and the like)
                Some("HEARTBEAT") => {
                    println!("[{}] STAT {}",
                        event_data.get("Event-Date-GMT").unwrap(),
                        event_data.get("Up-Time").unwrap(),
                    );
                },

                // mod_sofia and other CUSTOM events
                Some("CUSTOM") => match event_data.get("Event-Subclass").map(|v| v.as_str()) {

                    Some("sofia::gateway_state") =>
                        println!("[{}] Trunk {} (ping={}) changed state to {} with status {}",
                            event_data.get("Event-Date-GMT").unwrap(),
                            event_data.get("Gateway").unwrap(),
                            event_data.get("Ping-Status").unwrap(),
                            event_data.get("State").unwrap(),
                            event_data.get("Status").map_or("", |v| v),
                        ),

                    _ => { }
                },

                _ => { }
            }
        }
    }

    Ok(())
}
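
Since the codec returns the JSON body untouched, the untyped HashMap loop above can be swapped for typed deserialization with serde derive (already a dependency of the crate). Not part of the commit: a minimal sketch with a hypothetical BasicEvent struct covering a few of the headers the example reads; in practice the payload string would come from data.payload rather than the stand-in literal.

use serde::Deserialize;

// Hypothetical typed view of a few of the event headers read above;
// headers absent from a given event simply deserialize to None.
#[derive(Debug, Deserialize)]
struct BasicEvent {
    #[serde(rename = "Event-Name")]
    event_name: String,
    #[serde(rename = "Event-Date-GMT")]
    date_gmt: Option<String>,
    #[serde(rename = "Channel-Name")]
    channel_name: Option<String>,
}

fn parse_event(payload: &str) -> Result<BasicEvent, serde_json::Error> {
    serde_json::from_str(payload)
}

fn main() -> Result<(), serde_json::Error> {
    // Stand-in payload; in the example above this would be data.payload from the codec.
    let payload = r#"{"Event-Name":"HEARTBEAT","Event-Date-GMT":"Mon, 01 Jan 2024 00:00:00 GMT"}"#;
    let event = parse_event(payload)?;
    println!("{} at {:?} on {:?}", event.event_name, event.date_gmt, event.channel_name);
    Ok(())
}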

src/codec.rs Normal file (+127)
@@ -0,0 +1,127 @@
use tokio_util::bytes::BytesMut;
use std::collections::HashMap;
use tokio_util::codec::Decoder;
use thiserror::Error;

#[derive(Debug, Default)]
pub struct EslCodec {
    offset: Option<usize>,
    length: Option<usize>,
}

impl EslCodec {
    pub fn new() -> Self {
        Self::default()
    }
}


#[derive(Debug)]
pub struct EslPacket {
    pub headers: HashMap<String, String>,
    pub payload: Option<String>,
}

#[derive(Debug, Error)]
pub enum EslCodecError {
    #[error("Failed to parse text as UTF-8")]
    MalformedUtf8,
    #[error("Failed to parse packet Content-Length field")]
    InvalidContentLength,
    #[error("Failed to parse headers, socket stream may not be aligned")]
    InvalidHeaders,
    #[error("IO error")]
    IoError(#[from] std::io::Error),
}

impl Decoder for EslCodec {
    type Item = EslPacket;
    type Error = EslCodecError;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {

        let delim_gap = b"\n\n".len();

        match self {

            // Packet found - detect the end of the headers
            Self { offset: None, .. } if src.windows(2).position(|each| each == b"\n\n").is_some() =>
            {
                let headers_end_ix = src.windows(2).position(|each| each == b"\n\n").unwrap(); // SAFETY: just guard-matched against this very condition
                let length = try_parse_content_length(&src)?;
                *self = Self { offset: Some(headers_end_ix), length };
                self.decode(src)
            },

            // Packet has no Content-Length - decode headers only
            Self { offset: Some(headers_end_index), length: None } => {

                let result = src.split_to(*headers_end_index);
                let headers = head_to_map(&result)?;

                // Move past the gap to read the next header
                let _ = src.split_to(delim_gap);

                *self = Self::default();
                Ok(Some(EslPacket {
                    headers,
                    payload: None,
                }))
            },

            // Packet has a Content-Length and the buffer holds at least that many payload bytes - decode headers, then body
            Self { offset: Some(offset), length: Some(length) } if *offset > 0 && src[*offset + delim_gap ..].len() >= *length => {

                let result = src.split_to(*length + *offset + delim_gap);

                let headers = &result[.. *offset];
                let payload = &result[*offset + delim_gap ..];

                let headers = head_to_map(&headers)?;

                *self = Self::default();
                Ok(Some(EslPacket {
                    headers,
                    payload: Some(String::from(std::str::from_utf8(payload).map_err(|_| Self::Error::MalformedUtf8)?)),
                }))
            },

            _ => Ok(None),
        }
    }
}


fn head_to_map(head: &[u8]) -> Result<HashMap<String, String>, EslCodecError> {

    let headers = std::str::from_utf8(&head).map_err(|_| EslCodecError::MalformedUtf8)?;

    let mut result = HashMap::new();

    for line in headers.lines() {
        let mut it = line.split(": ");
        let k = String::from(it.next().ok_or(EslCodecError::InvalidHeaders)?);
        let v = String::from(it.next().ok_or(EslCodecError::InvalidHeaders)?);
        let _ = result.insert(k, v);
    }

    Ok(result)
}


fn try_parse_content_length(src: &[u8]) -> Result<Option<usize>, EslCodecError> {
    if !src.starts_with(b"Content-Length: ") {
        return Ok(None);
    }

    let len = b"Content-Length: ".len();
    let pos = src.iter().position(|c| *c == b'\n').ok_or(EslCodecError::InvalidContentLength)?;
    let src = &src[len..pos];

    let str = std::str::from_utf8(&src).map_err(|_| EslCodecError::InvalidContentLength)?;
    let len = str.parse::<usize>().map_err(|_| EslCodecError::InvalidContentLength)?;

    Ok(Some(len))
}
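
The decoder can also be exercised directly, without a socket, which makes the framing behaviour easy to see. Not part of the commit: a minimal sketch that feeds one hand-built frame into decode(); the payload is made up, and Content-Length is kept first in the header block and in sync with the payload length, as try_parse_content_length expects.

use tokio_util::bytes::BytesMut;
use tokio_util::codec::Decoder;

use fs_esl_codec::EslCodec;

fn main() {
    // Hand-built frame: ESL headers, a blank line, then exactly
    // Content-Length bytes of (made-up) JSON payload.
    let payload = r#"{"Event-Name":"HEARTBEAT","Up-Time":"0 years, 0 days, 1 hour"}"#;
    let frame = format!(
        "Content-Length: {}\nContent-Type: text/event-json\n\n{}",
        payload.len(),
        payload
    );

    let mut codec = EslCodec::new();
    let mut buf = BytesMut::new();
    buf.extend_from_slice(frame.as_bytes());

    // decode() yields Ok(Some(packet)) once a whole frame is buffered,
    // and Ok(None) while more bytes are still needed.
    match codec.decode(&mut buf) {
        Ok(Some(packet)) => {
            assert_eq!(
                packet.headers.get("Content-Type").map(String::as_str),
                Some("text/event-json")
            );
            println!("payload: {}", packet.payload.unwrap_or_default());
        }
        other => println!("incomplete or failed frame: {:?}", other),
    }
}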

src/lib.rs Normal file (+3)
@@ -0,0 +1,3 @@
pub mod codec;

pub use codec::*;