Codec
As I've alluded to, the framed, chunked, or message-based APIs of WebSockets or HTTP/2, or even TLS for that matter, all end up being written to a byte stream. Similarly, data read from a byte stream ends up being buffered and then parsed into messages.
Because this is so common in many protocols (HTTP/2, TLS, Postgres, MySQL, WebSockets), I want to share a great utility provided by tokio-util called codec.
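To make the "buffered then parsed" step concrete, here is a minimal sketch using only the standard library. It assumes a hypothetical newline-delimited protocol; `drain_messages` and `feed_chunks` are illustrative names, not part of tokio-util. The point is that a byte stream may deliver data in arbitrary chunks, so a message is only handed out once its separator has arrived.

```rust
/// Pulls every complete newline-terminated message out of `buf`,
/// leaving any trailing partial message in place for the next read.
/// (Hypothetical helper, assuming b'\n' is the message separator.)
fn drain_messages(buf: &mut Vec<u8>) -> Vec<Vec<u8>> {
    let mut messages = Vec::new();
    while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
        // Remove the message plus its separator from the front of the buffer.
        let mut msg: Vec<u8> = buf.drain(..=pos).collect();
        msg.pop(); // drop the trailing b'\n'
        messages.push(msg);
    }
    messages
}

/// Simulates a byte stream that delivers data in arbitrary chunks.
/// Returns the parsed messages and whatever bytes are still buffered.
fn feed_chunks(chunks: &[&[u8]]) -> (Vec<Vec<u8>>, Vec<u8>) {
    let mut buf = Vec::new();
    let mut out = Vec::new();
    for chunk in chunks {
        // "Read" some bytes, then parse as many whole messages as possible.
        buf.extend_from_slice(chunk);
        out.extend(drain_messages(&mut buf));
    }
    (out, buf)
}
```

Feeding the chunks `hel`, `lo\nwor`, `ld\npart` through `feed_chunks` yields the messages `hello` and `world`, with `part` left in the buffer until more data arrives.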
Sending data
```rust
use bytes::BytesMut;
use tokio::{io::AsyncWriteExt, net::TcpStream};

let mut stream = TcpStream::connect(...).await?;

// create a new buffer
let mut buf = BytesMut::new();

// receive some message from a stream
while let Some(msg) = recv_msg().await {
    // encode the message into our buffer
    msg.encode_into(&mut buf);

    // split the buffer in two.
    // `msg_data` contains all data written from the previous encoding
    // `buf` will now be empty
    // this will not need to allocate
    let msg_data = buf.split();

    // turn this buffer into a shared buffer
    let msg_data = msg_data.freeze();

    // send the data chunk to the tcp stream.
    // `write_all` takes a byte slice and returns a `Result` we must check.
    stream.write_all(&msg_data).await?;
}
```
Receiving data
```rust
use bytes::BytesMut;
use tokio::{io::AsyncReadExt, net::TcpStream};

let mut stream = TcpStream::connect(...).await?;

// create a new buffer
let mut buf = BytesMut::new();

loop {
    // read some data into our buffer
    if stream.read_buf(&mut buf).await? == 0 {
        break;
    }

    // try find where our next message separator is
    while let Some(end_of_msg) = peek_msg(&buf) {
        // split the buffer in two.
        // `msg_data` contains all the data for a single message
        // `buf` will be advanced forward to not contain that message
        // this will not need to allocate
        let msg_data = buf.split_to(end_of_msg);

        // turn this buffer into a shared buffer
        let msg_data = msg_data.freeze();

        // parse the data and process
        let msg = parse_msg(msg_data)?;
        handle(msg).await;
    }
}
```
tokio_util::codec allows you to abstract the "encode"/"peek"/"parse" APIs into an Encoder and a Decoder trait. It then provides the types FramedWrite and FramedRead to convert from your messages/chunks/frames into a byte stream, and vice versa.
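To show the shape of that abstraction without pulling in any crates, here is a simplified sketch. `SimpleEncoder` and `SimpleDecoder` are stand-ins I made up for illustration: they use `Vec<u8>` and `Option` where the real tokio-util traits use `BytesMut` and `Result`. The `LengthPrefixed` codec (a 1-byte length followed by the payload) is likewise a hypothetical format chosen only because it is short.

```rust
/// Simplified stand-in for an encoder trait: append one item to `dst`.
trait SimpleEncoder<Item> {
    fn encode(&mut self, item: Item, dst: &mut Vec<u8>);
}

/// Simplified stand-in for a decoder trait: try to take one item from `src`.
/// `None` means "not enough bytes yet, call again after the next read".
trait SimpleDecoder {
    type Item;
    fn decode(&mut self, src: &mut Vec<u8>) -> Option<Self::Item>;
}

/// Hypothetical codec: each frame is a 1-byte length followed by that many bytes.
struct LengthPrefixed;

impl SimpleEncoder<Vec<u8>> for LengthPrefixed {
    fn encode(&mut self, item: Vec<u8>, dst: &mut Vec<u8>) {
        // A real codec would return an error instead of panicking here.
        assert!(item.len() <= 255, "payload longer than 255 bytes");
        dst.push(item.len() as u8);
        dst.extend_from_slice(&item);
    }
}

impl SimpleDecoder for LengthPrefixed {
    type Item = Vec<u8>;

    fn decode(&mut self, src: &mut Vec<u8>) -> Option<Vec<u8>> {
        // Peek at the length prefix without consuming anything.
        let len = *src.first()? as usize;
        if src.len() < 1 + len {
            return None; // frame is still incomplete, leave `src` untouched
        }
        // Consume the whole frame and drop the length byte.
        let frame: Vec<u8> = src.drain(..1 + len).skip(1).collect();
        Some(frame)
    }
}
```

The framed wrappers then only need to drive these two functions: call `encode` before each write, and call `decode` in a loop after each read until it reports that no complete frame is left.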
Sending data
```rust
use futures::SinkExt; // provides `send`
use tokio::net::TcpStream;
use tokio_util::codec::FramedWrite;

let stream = TcpStream::connect(...).await?;
let mut writer = FramedWrite::new(stream, MyCodec);

// receive some message from a stream
while let Some(msg) = recv_msg().await {
    writer.send(msg).await?;
}
```
Receiving data
```rust
use futures::TryStreamExt; // provides `try_next`
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;

let stream = TcpStream::connect(...).await?;
let mut reader = FramedRead::new(stream, MyCodec);

// receive some message from a stream
while let Some(msg) = reader.try_next().await? {
    handle(msg).await;
}
```
Of course, I've skipped over how MyCodec works, but it's just a couple of traits to implement. They define the functions like encode_into and parse_msg that I didn't define above, and they help tidy the core logic into a cleaner abstraction.
Let's see what a codec might look like for JSON Lines using serde and serde_json.
If you're not familiar, JSON Lines is a very simple modification to JSON:
{"foo": "bar"}
{"foo": "baz"}
Each entry is on a separate line, and there are no newlines within the encoding of an individual JSON entry.
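The "no newlines inside an entry" rule holds even when the data itself contains a newline, because JSON requires control characters inside strings to be escaped. The sketch below uses a hand-rolled escaper purely for illustration (`json_string` and `json_line` are hypothetical helpers; in practice serde_json does this for you), and shows that a raw newline in a value ends up as the two characters `\` and `n` on the wire.

```rust
/// Hand-rolled JSON string encoder, for illustration only.
/// Escapes quotes, backslashes, and all control characters.
fn json_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Any other control character becomes a \u00XX escape.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Encodes a single-key object as one JSON Lines entry,
/// terminated by the newline separator.
fn json_line(key: &str, value: &str) -> String {
    format!("{{{}:{}}}\n", json_string(key), json_string(value))
}
```

Because the only raw newline in the output is the trailing separator, a decoder can safely treat the first `\n` byte it sees as the end of an entry.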
Encoding
Encoding data is always easier than decoding, as you don't have to program so defensively. Here's how it might look.
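```rust
use std::marker::PhantomData;

use bytes::{BufMut, BytesMut};
use serde::Serialize;
use tokio_util::codec::Encoder;

struct JsonLinesCodec<S>(PhantomData<S>);

impl<S: Serialize> Encoder<S> for JsonLinesCodec<S> {
    type Error = std::io::Error;

    fn encode(&mut self, item: S, dst: &mut BytesMut) -> Result<(), Self::Error> {
        // json encode the item into the buffer.
        // `writer()` takes its receiver by value, so reborrow `dst`
        // explicitly to keep it usable afterwards.
        serde_json::to_writer((&mut *dst).writer(), &item)?;

        // add new-line separator
        dst.put_u8(b'\n');

        Ok(())
    }
}
```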
Decoding
Although I suggested that decoding can be more challenging, it's still reasonably easy.
```rust
use std::marker::PhantomData;

use bytes::BytesMut;
use serde::de::DeserializeOwned;
use tokio_util::codec::Decoder;

struct JsonLinesCodec<S>(PhantomData<S>);

impl<S: DeserializeOwned> Decoder for JsonLinesCodec<S> {
    type Item = S;
    type Error = std::io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let Some(new_line) = src.iter().position(|&b| b == b'\n') else {
            // not enough data ready yet
            return Ok(None);
        };

        // remove full line from the buffer
        let line_end = new_line + 1;
        let json_line = src.split_to(line_end).freeze();

        // parse the json line.
        let item = serde_json::from_slice(&json_line)?;

        Ok(Some(item))
    }
}
```