与许多网络协议一样,Redis客户端和服务器端之间的通信可以使用帧结构作为基本的构成指令。mini-redis中命令模块在命令解析时就是从帧结构出发。本篇就对帧的解析进行分析和实现。
Redis客户端与服务端之间的通信,整体上属于请求-响应模型,但是引入了流水线、推送等一些特例情况。在Redis的请求和响应中具有一些固定的形态。在请求中,命令总是表示为批量字符串的的数组,响应则可以是帧结构中的各种类型。每种类型消息的第一个字节总是用来标识消息的类型。\r\n
作为协议中的终止符,用于划分不同部分。
下面是一些消息的格式或示例,在文章的后半部分需要对这些命令进行解析并封装为 Rust 中的数据结构。
1 2 3 4 5 6
| +OK\r\n -Error message\r\n :[<+|->]<value>\r\n $<length>\r\n<data>\r\n _\r\n *<number-of-elements>\r\n<element-1>...<element-n>
|
这些消息代表的类型分别为简单字符串、错误、整型、批量字符串、Null、数组。在 mini-redis 支持的几个命令中,利用这些帧结构即可实现支持的几个命令。
暴露的结构体和关联方法
帧解析部分需要提供的功能就是将从流中解析上面的几种类型并转换为 Rust 中的结构表示。接口中使用 Cursor 作为数据来源,可以实现从多种结构上进行解析,也便于进行测试。
Frame是对帧结构的统一抽象,根据不同帧类型的特点,得到下面的帧类型及其内部数据表示的定义。简单字符串、错误中都只能存储一个字符串,因此对应的 Simple、Error 类型内部都只有一个 String 存储,表示整型的 Integer 帧则存储一个 u64 作为整形的存储。批量字符串中存储的数据很可能是不可读的二进制串,因此使用 Bytes 进行存储。Null 类型由于不存储任何数据,因此不需要内部类型。数组类型正如其定义,数据为帧的数组,由于其长度并不固定,我们使用 Vec 进行存储即可。
1 2 3 4 5 6 7 8
| enum Frame { Simple(String), Error(String), Integer(u64), Bulk(Bytes), Null, Array(Vec<Frame>), }
|
从数据流中进行解析时,我们很可能无法判断帧类型,甚至是帧结构是否完整,因此将解析入口放在 Frame 这一结构上。对于帧结构是否完整也提供一个方法进行检查。由此引入了 check 和 parse 两个方法。
1 2 3 4 5 6 7 8
| impl Frame { fn check(src: &mut Cursor<&[u8]>) -> Result<(), Error> { todo!() } fn parse(src: &mut Cursor<&[u8]>) -> Result<Frame, Error> { todo!() } }
|
上面的验证和解析函数中,返回结果中的错误类型 Error 均为针对解析帧解析场景特殊定义的错误类型。这里使用具体的错误类型,而不是普通的错误类型综合考虑了几个因素。其一,进行帧解析时底层数据未全部到达的情况并不能简单理解为错误,很可能在收到新的数据后能够成功完成。其次,数据不完整的情况触发频次很高,使用特化的类型而不是通用的 dyn trait 可以取得更好的性能。
这一错误结构定义如下。
1 2 3 4
| enum Error { Incomplete, Other(crate::Error), }
|
针对从字节流中解析帧结构的场景,这些函数就足够了。但在其他模块中,实际上还会构造并使用数组结构的帧。因此这里为它们提供一些辅助函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| impl Frame { pub(crate) fn array() -> Frame { Frame::Array(vec![]) } pub(crate) fn push_bulk(&mut self, bytes: Bytes) { match self { Frame::Array(vec) => { vec.push(Frame::Bulk(bytes)) } _ => panic!("not an array frame") } } pub(crate) fn push_int(&mut self, value: u64) { match self { Frame::Array(vec) => { vec.push(Frame::Integer(value)) } _ => panic!("not an array frame") } } }
|
具体实现
在解析函数中,用于检查有效性的 check 函数和执行解析并返回结果的 parse 函数具有相似的结构。parse 函数实现如下所示,其中利用了 get_xxx 和 peek_xxx 两类函数作为对 cursor 的追追缠线操作动作。get_xxx 系列从 cursor 中读取需要的类型的数据,并将 cursor 游标推进到读取完成后的位置。peek_xxx 系列从 cursor 中读取数据,但保持游标位置,可以重新对其中的内容进行读取。如下面的代码所示,parse 函数结构上首先读取一个字节,并根据该字节按 Redis 协议转到不同的类型进行处理。另外,skip 函数根据参数将游标前进给定的步数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| fn parse(src: &mut Cursor<&[u8]>) -> Result<Frame, Error> { match get_u8(src)? { b'+' => { let line = get_line(src)?.to_vec(); let string = String::from_utf8(line)?; Ok(Frame::Simple(string)) } b'-' => { let line = get_line(src)?.to_vec(); let string = String::from_utf8(line)?; Ok(Frame::Error(string)) } b':' => { let len = get_decimal(src)?; Ok(Frame::Integer(len)) }, b'$' => { if b'-' == peek_u8(src)? { let line = get_line(src)?; if line != b"-1" { return Err("protocol error; invalid frame format".into()) } Ok(Frame::Null) } else { let len = get_decimal(src)?.try_into()?; let n = len + 2; if src.remaining() < n { return Err(Error::Incomplete); } let data = Bytes::copy_from_slice(&src.chunk()[..len]); skip(src, n)?; Ok(Frame::Bulk(data)) } } b'*' => { let len = get_decimal(src)?.try_into()?; let mut out = Vec::with_capacity(len); for _ in 0..len { let frame = Frame::parse(src)?; out.push(frame); } Ok(Frame::Array(out)) } actual => Err(format!("protocol error; invalid frame type byte `{}`", actual).into()) } }
|
get 系列解析函数
这些函数的实现典型情况如下面的 get_line 函数所示。在 get_line 函数中,读取游标位置并获取底层数据切片。从数据切片中检查是否出现行尾结束符,从而确定行结束位置以及行内容。这里,使用了生命周期泛型来明确输入输出数据的生存时间关系。这里需要使用生命周期泛型的原因是函数的输入和输出都使用了引用的形式,因此需要为编译器提供信息证明引用的存活关系。在这个具体的情境中,返回值的引用是从输入的引用中衍生的,因此返回值的生存时间不能超出输入的生存时间。因此使用相同的生命周期泛型注解 'a
告诉编译器它们具有相同的生存时间。
1 2 3 4 5 6 7 8 9 10 11 12 13
| fn get_line<'a>(src: &mut std::io::Cursor<&'a [u8]>) -> Result<&'a [u8], Error> { let start = src.position() as usize; let end = src.get_ref().len() - 1;
for i in start..end { if src.get_ref()[i] == b'\r' && src.get_ref()[i + 1] == b'\n' { src.set_position((i + 2) as u64); return Ok(&src.get_ref()[start..i]); } } Err(Error::Incomplete) }
|
peek 系列解析函数
这些函数的实现典型情况如下面的 peek_u8 函数所示。在 peek_u8 函数中,读取数据时使用的是 chunk 获取的引用,并从中获取 u8 的拷贝。没有使用任何会导致游标前进的函数。
1 2 3 4 5 6
| fn peek_u8(src: &mut Cursor<&[u8]>) -> Result<u8, Error> { if !src.has_remaining() { return Err(Error::Incomplete); } Ok(src.chunk()[0]) }
|
skip 函数
skip 函数将游标前进指定的步数。在特定场景下,对游标中的后续部分已读取完成,因此可以直接跳过。
1 2 3 4 5 6 7
| fn skip(src: &mut Cursor<&[u8]>, n: usize) -> Result<(), Error> { if src.remaining() < n { return Err(Error::Incomplete); } src.advance(n); Ok(()) }
|
错误转换
在 check 函数和 parse 函数中,使用了大量的 ?
语法糖将错误返回。而这些错误都需要转换为统一的错误类型,因此为可能返回的真实错误类型实现向 Error 转换的 From trait. 如下面的实现,将整数解析中返回的错误类型转换为 Error 类型。
1 2 3 4 5
| impl From<TryFromIntError> for Error { fn from(_src: TryFromIntError) -> Self { "protocol error; invalid frame format".into() } }
|
简单测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| #[cfg(test)] mod tests { use std::io::Cursor; use crate::frame::{Error, Frame};
#[test] fn parse_incomplete() { let mut cursor = Cursor::new(&b"+"[..]); match Frame::check(&mut cursor) { Ok(_) => unreachable!(), Err(Error::Other(_)) => unreachable!(), _ => () } }
#[test] fn parse_string() { let mut cursor = Cursor::new(&b"+123\r\n"[..]); assert_eq!(Frame::Simple("123".to_string()), Frame::parse(&mut cursor).unwrap()); }
#[test] fn parse_array() { let mut cursor = Cursor::new(&b"*2\r\n+123\r\n:666\r\n"[..]); let array = Frame::Array(vec![Frame::Simple("123".to_string()), Frame::Integer(666)]); assert_eq!(array, Frame::parse(&mut cursor).unwrap()); } }
|
参考资料
Redis serialization protocol specification | Docs