fix: cleanup for 0.0.1 release
This commit is contained in:
parent
797b68b9e0
commit
89f300c0ed
6 changed files with 294 additions and 2 deletions
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
|
|
@ -0,0 +1,2 @@
|
||||||
|
/target
|
||||||
|
Cargo.lock
|
||||||
29
Cargo.toml
Normal file
29
Cargo.toml
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
[package]
|
||||||
|
name = "fs_esl_codec"
|
||||||
|
version = "0.0.1"
|
||||||
|
edition = "2021"
|
||||||
|
authors = ["semubico@protogen.engineering"]
|
||||||
|
description = "Freeswitch esl socket decoder for tokio Framed socket reading"
|
||||||
|
readme = "README.md"
|
||||||
|
license-file = "LICENSE"
|
||||||
|
repository = "https://git.protogen.engineering/semubico/freeswitch-esl/"
|
||||||
|
|
||||||
|
[lib]
|
||||||
|
crate-type = ["staticlib", "rlib"]
|
||||||
|
bench = false
|
||||||
|
|
||||||
|
[[example]]
|
||||||
|
name = "untyped"
|
||||||
|
path = "examples/untyped.rs"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
serde = { version = "^1.0.210", features = ["derive"] }
|
||||||
|
tokio-util = { version = "0.7.12", features = ["codec"] }
|
||||||
|
thiserror = "^1.0.64"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
anyhow = "1.0.89"
|
||||||
|
tokio = { version = "1.0", features = ["full"]}
|
||||||
|
futures = "^0.3.30"
|
||||||
|
serde_json = { version = "^1.0.128" }
|
||||||
|
|
||||||
|
|
@ -1,3 +1,8 @@
|
||||||
# freeswitch-esl
|
fs_esl_codec
|
||||||
|
---
|
||||||
|
|
||||||
|
This is a freeswitch esl codec for parsing event stream from freeswitch in inbound mode.
|
||||||
|
This crate only provides a codec. The authentication and the initial request for events in the appropriate format is to be done separately (see examples/untyped.rs for the most basic one).
|
||||||
|
The framing mechanism returns a packet consisting of ESL headers (not to be confused with the actual event headers) such as content-type, and a String buffer, that you can deserialize however you want using the parser appropriate for the type of events requested (events plain/events json/events xml).
|
||||||
|
This way you can put this codec on a reader of a .split(), or onto an another socket entirely.
|
||||||
|
|
||||||
Ad-hoc ESL ibrary in rust for Freeswitch
|
|
||||||
126
examples/untyped.rs
Normal file
126
examples/untyped.rs
Normal file
|
|
@ -0,0 +1,126 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio_util::codec::{FramedRead};
|
||||||
|
use futures::stream::StreamExt;
|
||||||
|
|
||||||
|
use fs_esl_codec::EslCodec;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
|
// Open socket
|
||||||
|
let listen = std::env::args().nth(1).expect("Expected SockAddr of server");
|
||||||
|
let mut stream = TcpStream::connect(&listen).await.unwrap();
|
||||||
|
|
||||||
|
// Authorise against an ESL
|
||||||
|
let pass = std::env::args().nth(2).expect("Expected ESL auth pass");
|
||||||
|
let message = format!("auth {}\n\n", &pass);
|
||||||
|
stream.write_all(message.as_bytes()).await.unwrap();
|
||||||
|
|
||||||
|
// Subscribe to all types of events in json format
|
||||||
|
let message = b"event json ALL\n\n";
|
||||||
|
stream.write_all(message).await.unwrap();
|
||||||
|
|
||||||
|
// Instantiate ESL parser
|
||||||
|
let in_codec = EslCodec::new();
|
||||||
|
|
||||||
|
// Create tokio framedreader using EslCodec
|
||||||
|
let mut framed_read = FramedRead::new(stream, in_codec);
|
||||||
|
|
||||||
|
// Read freeswitch-emitted messages one-by-one
|
||||||
|
while let Some(Ok(data)) = framed_read.next().await {
|
||||||
|
|
||||||
|
// Decode into a hashmap of string key-value pairs
|
||||||
|
if let Ok(event_data) = serde_json::from_str::<HashMap<String,String>>(&data.payload.unwrap_or_default()) {
|
||||||
|
|
||||||
|
match event_data.get("Event-Name").map(|v| v.as_ref()) {
|
||||||
|
|
||||||
|
// First inbound INVITE received
|
||||||
|
Some("CHANNEL_CREATE") if event_data.get("Call-Direction").map(|v| v.as_ref()) == Some("inbound") => {
|
||||||
|
println!("[{}] <{}> new incoming [{}] {}",
|
||||||
|
event_data.get("Event-Date-GMT").unwrap(),
|
||||||
|
event_data.get("Channel-Call-UUID").unwrap(),
|
||||||
|
event_data.get("Call-Direction").unwrap(),
|
||||||
|
event_data.get("Channel-Name").unwrap(),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
// leg b originated
|
||||||
|
Some("CHANNEL_OUTGOING") => {
|
||||||
|
println!("[{}] <{}> trying [{}] {} for {} {}",
|
||||||
|
event_data.get("Event-Date-GMT").unwrap(),
|
||||||
|
event_data.get("Channel-Call-UUID").unwrap(),
|
||||||
|
event_data.get("Call-Direction").unwrap(),
|
||||||
|
event_data.get("Channel-Name").unwrap(),
|
||||||
|
event_data.get("Caller-Caller-ID-Name").unwrap(),
|
||||||
|
event_data.get("Caller-Caller-ID-Number").unwrap(),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
|
||||||
|
// leg b answered
|
||||||
|
Some("CHANNEL_ANSWER") => {
|
||||||
|
println!("[{}] <{}> answered [{}] {} for {} {}",
|
||||||
|
event_data.get("Event-Date-GMT").unwrap(),
|
||||||
|
event_data.get("Channel-Call-UUID").unwrap(),
|
||||||
|
event_data.get("Call-Direction").unwrap(),
|
||||||
|
event_data.get("Channel-Name").unwrap(),
|
||||||
|
event_data.get("Caller-Caller-ID-Name").unwrap(),
|
||||||
|
event_data.get("Caller-Caller-ID-Number").unwrap(),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
// Leg a bridged to leg b
|
||||||
|
Some("CHANNEL_BRIDGE") => {
|
||||||
|
println!("[{}] <{}> bridge [{}] {} for {}{}",
|
||||||
|
event_data.get("Event-Date-GMT").unwrap(),
|
||||||
|
event_data.get("Channel-Call-UUID").unwrap(),
|
||||||
|
event_data.get("Call-Direction").unwrap(),
|
||||||
|
event_data.get("Channel-Name").unwrap(),
|
||||||
|
event_data.get("Caller-Caller-ID-Name").unwrap(),
|
||||||
|
event_data.get("Caller-Caller-ID-Number").unwrap(),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
// Leg b hangup
|
||||||
|
Some("CHANNEL_HANGUP_COMPLETE") => {
|
||||||
|
println!("[{}] <{}> hangup [{}] {}",
|
||||||
|
event_data.get("Event-Date-GMT").unwrap(),
|
||||||
|
event_data.get("Channel-Call-UUID").unwrap(),
|
||||||
|
event_data.get("Call-Direction").unwrap(),
|
||||||
|
event_data.get("Channel-Name").unwrap(),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
// Periodic stats updates (good for prometheus metrics or smth)
|
||||||
|
Some("HEARTBEAT") => {
|
||||||
|
println!("[{}] STAT {}",
|
||||||
|
event_data.get("Event-Date-GMT").unwrap(),
|
||||||
|
event_data.get("Up-Time").unwrap(),
|
||||||
|
);
|
||||||
|
},
|
||||||
|
|
||||||
|
// mod Sofia and other events
|
||||||
|
Some("CUSTOM") => match event_data.get("Event-Subclass").map(|v| v.as_str()) {
|
||||||
|
|
||||||
|
Some("sofia::gateway_state") =>
|
||||||
|
println!("[{}] Trunk {} (ping={}) changed state to {} with status {}",
|
||||||
|
event_data.get("Event-Date-GMT").unwrap(),
|
||||||
|
event_data.get("Gateway").unwrap(),
|
||||||
|
event_data.get("Ping-Status").unwrap(),
|
||||||
|
event_data.get("State").unwrap(),
|
||||||
|
event_data.get("Status").map_or("", |v| v)
|
||||||
|
),
|
||||||
|
|
||||||
|
_ => { }
|
||||||
|
|
||||||
|
},
|
||||||
|
|
||||||
|
_ => { }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
127
src/codec.rs
Normal file
127
src/codec.rs
Normal file
|
|
@ -0,0 +1,127 @@
|
||||||
|
use tokio_util::bytes::{BytesMut};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use tokio_util::codec::{Decoder};
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct EslCodec {
|
||||||
|
offset: Option<usize>,
|
||||||
|
length: Option<usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EslCodec {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct EslPacket {
|
||||||
|
pub headers: HashMap<String, String>,
|
||||||
|
pub payload: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum EslCodecError {
|
||||||
|
#[error("Failed to parse text as UTF-8")]
|
||||||
|
MalformedUtf8,
|
||||||
|
#[error("Failed to parse packet Content-Length field")]
|
||||||
|
InvalidContentLength,
|
||||||
|
#[error("Failed to parse headers, socket stream may not be aligned ")]
|
||||||
|
InvalidHeaders,
|
||||||
|
#[error("IO error")]
|
||||||
|
IoError(#[from] std::io::Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Decoder for EslCodec {
|
||||||
|
type Item = EslPacket;
|
||||||
|
type Error = EslCodecError;
|
||||||
|
|
||||||
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
|
||||||
|
|
||||||
|
let delim_gap = b"\n\n".len();
|
||||||
|
|
||||||
|
match self {
|
||||||
|
|
||||||
|
// packet found, detecting end of headers
|
||||||
|
Self { offset: None, .. } if src.windows(2).position(|each| each == b"\n\n").is_some() =>
|
||||||
|
{
|
||||||
|
let headers_end_ix = src.windows(2).position(|each| each == b"\n\n").unwrap(); // SAFETY: just guard-mathced against this very condition
|
||||||
|
let length = try_parse_content_length(&src)?;
|
||||||
|
*self = Self { offset: Some(headers_end_ix), length };
|
||||||
|
self.decode(src)
|
||||||
|
},
|
||||||
|
|
||||||
|
// Packet has no Content-Length - decoding headers only
|
||||||
|
Self { offset: Some(headers_end_index), length: None } => {
|
||||||
|
|
||||||
|
let result = src.split_to(*headers_end_index);
|
||||||
|
let headers = head_to_map(&result)?;
|
||||||
|
|
||||||
|
// Move past the gap to read the next header
|
||||||
|
let _ = src.split_to(delim_gap);
|
||||||
|
|
||||||
|
*self = Self::default();
|
||||||
|
Ok(Some(EslPacket {
|
||||||
|
headers: headers,
|
||||||
|
payload: None
|
||||||
|
}))
|
||||||
|
},
|
||||||
|
|
||||||
|
// Packet has Content-length, and current buffer holds no less than content-length bytes of payload content - decoding headers, then body
|
||||||
|
Self { offset: Some(offset), length: Some(length) } if *offset > 0 && src[*offset + delim_gap ..].len() >= *length => {
|
||||||
|
|
||||||
|
let result = src.split_to(*length + *offset + delim_gap);
|
||||||
|
|
||||||
|
let headers = &result[.. *offset];
|
||||||
|
let payload = &result[*offset + delim_gap ..];
|
||||||
|
|
||||||
|
let headers = head_to_map(&headers)?;
|
||||||
|
|
||||||
|
*self = Self::default();
|
||||||
|
Ok(Some(EslPacket {
|
||||||
|
headers: headers,
|
||||||
|
payload: Some(String::from(std::str::from_utf8(payload).map_err(|_| Self::Error::MalformedUtf8)?)),
|
||||||
|
}))
|
||||||
|
},
|
||||||
|
|
||||||
|
_ => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
fn head_to_map(head: &[u8]) -> Result<HashMap<String, String>, EslCodecError> {
|
||||||
|
|
||||||
|
let headers = std::str::from_utf8(&head).map_err(|_| EslCodecError::MalformedUtf8)?;
|
||||||
|
|
||||||
|
let mut result = HashMap::new();
|
||||||
|
|
||||||
|
for line in headers.lines() {
|
||||||
|
let mut it = line.split(": ");
|
||||||
|
let k = String::from(it.next().ok_or(EslCodecError::InvalidHeaders)?);
|
||||||
|
let v = String::from(it.next().ok_or(EslCodecError::InvalidHeaders)?);
|
||||||
|
let _ = result.insert(k, v);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
fn try_parse_content_length(src: &[u8]) -> Result<Option<usize>, EslCodecError> {
|
||||||
|
if !src.starts_with(b"Content-Length: ") {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let len = b"Content-Length: ".len();
|
||||||
|
let pos = src.iter().position(|c| *c == b'\n').ok_or(EslCodecError::InvalidContentLength)?;
|
||||||
|
let src = &src[len..pos];
|
||||||
|
|
||||||
|
let str = std::str::from_utf8(&src).map_err(|_| EslCodecError::InvalidContentLength)?;
|
||||||
|
let len = str.parse::<usize>().map_err(|_| EslCodecError::InvalidContentLength)?;
|
||||||
|
|
||||||
|
Ok(Some(len))
|
||||||
|
}
|
||||||
|
|
||||||
3
src/lib.rs
Normal file
3
src/lib.rs
Normal file
|
|
@ -0,0 +1,3 @@
|
||||||
|
pub mod codec;
|
||||||
|
|
||||||
|
pub use codec::*;
|
||||||
Loading…
Add table
Add a link
Reference in a new issue