命令模块接收服务端库从客户端连接中读取的帧结构,尝试按照 Redis 支持的命令格式进行解析,从而得到 Redis 命令。成功解析得到命令后,按照 Redis 命令的语义执行动作,读写存储模块并向连接中写入帧结构作为命令的响应。
命令枚举及命令上的通用动作
在 mini-redis 项目中,实现了 Redis 的 Get、Set 等几项命令。这些命令接收不同的参数,具有不同的语义,具体的实现上也有不同。所以将支持的命令定义为枚举类型,执行命令时根据命令枚举分发到不同的类型上。但为了使用上的简便性,将这些不同命令调用包装为统一的对外接口。
命令调用提供的统一对外接口定义如下。总体上共两个方法:from_frame 从帧数据中解析为一条命令;apply 方法消费命令并按命令语义进行执行,执行时可能需要读写存储库,向连接中写入帧结构,也可能需要接收服务关闭的消息以便在必要时释放资源。这些方法最终均转向各具体命令提供的实现。
1 2 3 4 5 6 7 8
| impl Command { fn from_frame(frame: Frame) -> crate::Result<Command> { todo!() } async fn apply(self, db: &Db, dst: &mut Connection, shutdown: &mut Shutdown) -> crate::Result<()> { todo!() } }
|
命令实现
每种命令中具有各自不同的实现逻辑。下面对支持的典型命令 Get 和 Subscribe 进行分析。
Get
Get 命令中存储 String 类型的键,并提供下面的方法。其中包括创建方法 new 、从解析器中继需解析命令的 parse_frames 方法。执行 Get 命令,从数据存储中取得键对应的值并将响应写回连接中的 apply 方法。用于客户端的命令帧构造方法 into_frame 。
创建方法实现如下。Get 命令中所需的参数仅有字符串类型的 key 。创建时传入即可。
1 2 3 4 5 6 7 8 9 10
| pub struct Get { key: String, } impl { fn new(key: impl ToString) -> Get { Get { key: key.to_string(), } } }
|
解析命令的 parse_frames 方法实现如下。由于调用 parse_frames 方法前,已经读取了命令中的第一个参数,因此方法中读取的第一个元素即为键值对存储所需的键。使用这一参数即可构造完整的 Get 命令。
1 2 3 4
| pub fn parse_frames(parse: &mut Parse) -> crate::Result<Get> { let key = parse.next_string()?; Ok(Get { key }) }
|
Get 命令的执行方法 apply 如下。Get 命令执行时需要从存储模块中获取对应的值。当值存在时就作为批量字符串的帧类型返回,不存在时直接返回 Null 帧。这些帧需要写回到连接中。这里使用了 tracing 中的 instrument 宏来记录方法的执行记录。
1 2 3 4 5 6 7 8 9 10 11 12 13
| #[instrument(skip(self, db, dst))] pub async fn apply(self, db: &Db, dst: &mut Connection) -> crate::Result<()> { let response = if let Some(value) = db.get(&self.key) { Frame::Bulk(value) } else { Frame::Null }; debug!(?response); dst.write_frame(&response).await?; Ok(()) }
|
命令构造方法 into_frame 实现如下。按 Redis 协议,Get 命令作为数组类型的帧发送,内部由 get 字符串帧和表示键的字符串帧组成。
1 2 3 4 5 6 7
| pub fn into_frame(self) -> Frame { let mut frame = Frame::array(); frame.push_bulk(Bytes::from("get".as_bytes())); frame.push_bulk(Bytes::from(self.key.into_bytes())); frame }
|
Subscribe
Subscribe 命令存储需要订阅的通道,命令执行时的应用逻辑 apply 函数如下所示。执行命令的通用函数中 shutdown 参数便用于 Subscribe 命令。当客户端连接执行 Subscribe 命令后,可用的命令范围便会收窄,只能执行 Subscribe 或 UnSubscribe 命令,同时在收到消息后发送给客户端。因此,在进入 apply 函数后,剩下的工作都在函数内部进行,在连接处理结束前都不会退出。其中使用 StreamMap 维护订阅的消息通道情况。对于命令中给出的需要加入订阅的通道名,使用单独的函数执行加入订阅的逻辑。完成订阅后,收到关闭信号、收到消息发布、收到新命令都是连接变动的触发点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| #[instrument(skip(self, db, dst, shutdown))] pub async fn apply(mut self, db: &Db, dst: &mut Connection, shutdown: &mut Shutdown) -> crate::Result<()> { let mut subscriptions = StreamMap::new(); loop { for channel_name in self.channels.drain(..) { subscribe_to_channel(channel_name, &mut subscriptions, db, dst).await?; } select! { Some((channel_name, msg)) = subscriptions.next() => { dst.write_frame(&make_message_from_frame(channel_name, msg)).await?; } res = dst.read_frame() => { let frame = match res? { Some(frame) => frame, None => return Ok(()), }; handle_command(frame, &mut self.channels, &mut subscriptions, dst).await?; } _ = shutdown.recv() => { return Ok(()); } } } }
|
处理新增订阅的方法 subscribe_to_channel 方法如下。执行订阅时,调用存储模块即可产生一个接收器。命令模块中将接收器转换进行简单转换后即可加入连接中已订阅列表。并在通道中收到消息时得到通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| async fn subscribe_to_channel( channel_name: String, subscriptions: &mut StreamMap<String, Messages>, db: &Db, dst: &mut Connection, ) -> crate::Result<()> { let mut rx = db.subscribe(channel_name.clone()); let rx = Box::pin(async_stream::stream! { loop { match rx.recv().await { Ok(msg) => yield msg, Err(broadcast::error::RecvError::Lagged(_)) => {}, Err(_) => break, } } }); subscriptions.insert(channel_name.clone(), rx); let response = make_subscribe_frame(channel_name, subscriptions.len()); dst.write_frame(&response).await?; Ok(()) }
|
处理从客户端接收的新命令的方法 handle_command 如下。根据从连接中解析得到的命令类型,执行不同的动作。收到 Subscribe 命令时将传入的通道加入订阅命令列表,下一轮检查时就能加入连接管关注的通道中。收到 UbSubscribe 命令则会将通道从订阅中移除。其他命令则不可用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| async fn handle_command( frame: Frame, subscribe_to: &mut Vec<String>, subscriptions: &mut StreamMap<String, Messages>, dst: &mut Connection, ) -> crate::Result<()> { match Command::from_frame(frame)? { Command::Subscribe(subscribe) => { subscribe_to.extend(subscribe.channels.into_iter()); } Command::UnSubscribe(mut unsubscribe) => { if unsubscribe.channels.is_empty() { unsubscribe.channels = subscriptions.keys() .map(|channel_name| channel_name.to_string()) .collect(); } for channel_name in unsubscribe.channels { subscriptions.remove(&channel_name); let response = make_unsubscribe_frame(channel_name, subscriptions.len()); dst.write_frame(&response).await?; } } command => { let cmd = Unknown::new(command.get_name()); cmd.apply(dst).await?; } } Ok(()) }
|