mini-redis源码分析-存储篇

概述篇对mini-redis的模块结构进行了介绍,并构建了具有日志能力的基本项目结构。存储篇将对mini-redis的数据存储进行分析,并在存储上实现其需要对外提供的功能。

提供的功能

在分析存储模块的实现前,我们首先对存储模块所需提供的功能进行分析。mini-redis 在整体上提供了两套功能,带有过期能力的KV存储、发布-订阅功能。

在KV存储方面,需要提供根据键获取值的 get 功能和提供键、值以及可选的有效期的 set 功能。同时,需要在请求-响应之外提供按照过期时间清理过期数据的能力。

在发布-订阅方面,需要提供 subscribe 功能用于订阅特定通道,从而在给定的通道上发布消息时进行接收;还需要提供 publish 功能用于向给定的通道上发布消息。

由于不同客户端的处理任务需要共享相同的存储,存储模块需要能够在不同任务间共享,并保证线程安全。

暴露的结构体和关联方法

基于对存储模块的需求分析,下面对存储模块的结构进行设计。

DbDropGuard

为了明确所有权,我们构造一个 DbDropGuard 作为存储模块的所有权的最终者,收到连接时从中衍生数据存储的引用传递给连接处理任务。构造 DbDropGuard 时,就构造真正的数据存储。DbDropGuard 被释放时,表示数据存储结构可以被释放。基于这些特点,需要为 DbDropGuard 提供 new 方法执行内部的构造任务,并实现 Drop trait 在其中对数据存储执行必要的清理工作。为了便于从 DbDropGuard 生成供连接处理任务使用的数据存储引用,提供一个 db 方法生成数据存储引用。为了能够生成这个数据存储的引用,DbDropGuard 中就需要存储这份数据存储的引用。由此我们得到下面的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#[derive(Debug)]
pub(crate) struct DbDropGuard {
db: Db, // 数据存储的引用
}
impl DbDropGuard {
pub(crate) fn new() -> DbDropGaurd {
todo!() // 创建数据存储
}
pub(crate) fn db(&self) -> Db {
self.db.clone() // 复制存储的引用供连接处理任务使用
}
}
impl Drop for DbDropGuard {
fn drop(&mut self) {
todo!() // 执行必要的清理工作
}
}

Db

DbDropGuard 作为出数据存储的持有者,由相对外层的服务器任务持有。而从中产生的存储引用才是移动到连接处理任务中最多使用的结构。为了能够在多个任务中移动,同时方便地产生新的引用,直接使用原子引用 Arc 来引用数据存储,并派生 Clone trait。这里将该结构体命名为 Db ,并在该结构体上实现连接处理任务需要用到的各种功能函数。get / set 功能的接口都非常符合直觉。发布-订阅功能方面,这里直接使用 tokio 提供的广播通道来实现发布订阅的功能,因此订阅时返回一个通道接收器。订阅后收到的消息都将在广播通道上发布,并由订阅的接收器接收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#[derive(Debug, Clone)]
pub(crate) struct Db {
shared: Arc<Shared>, // 到真实的数据存储的引用
}
// Db 上提供的功能接口
impl Db {
pub(crate) fn get(&self, key: &str) -> Option<Bytes> {
todo!()
}
pub(crate) fn set(&self, key: String, value: Bytes, expire: Option<Duration>) {
todo!()
}
pub(crate) fn subscribe(&self, key: String) -> broadcast::Receiver<Bytes> {
todo!()
}
pub(crate) fn publish(&self, key: &str, value: Bytes) -> usize {
todo!()
}
}

为了及时清理过期的键值对存储,在数据存储创建时需要为其生成一个关联的清理任务。这个清理任务也会持有数据存储的引用,因此需要在数据存储关闭时退出。为了实现这些工作,创建一个 new 函数实现数据存储和清理任务的创建,创建一个 shutdown_purge_task 函数通知清理任务退出。

1
2
3
4
5
6
7
8
9
// 方便维护正确性提供的便利接口
impl Db {
pub(crate) fn new() -> Db {
todo!()
}
fn shutdown_purge_task(&self) {
todo!()
}
}

内部结构体

暴露的结构体和方法确定后,就可以针对提供的功能设计内部存储结构了。

键值对存储种的值类型设计。mini-redis 中存储的值均为 Bytes 类型,因此在值类型中定义 data 字段用于存储其实际值。由于每个值都可能有关联的过期时间,将可选的过期时间点也一起存储在值类型中。

1
2
3
4
struct Entry {
data: Bytes,
expires_at: Option<Instant>
}

由于发布订阅结构通过 tokio 提供的广播通道实现,不需要针对发布订阅关系设计自己的存储类型。

基于值类型和需要提供的功能,设计一个不考虑线程安全的结构并不十分困难。mini-redis 目标不是在性能上击败 Redis ,因此在存储上可以简单设计。在键值对存储方面,直接采用具有键值对存储特性的 HashMap 即可。为了快速找到需要过期的键值对,使用具有优先级队列和集合特性的 BTreeMap 来存储过期项,从而根据过期时间找到下一个需要过期的项目和对应的键。因为有 Instant 完全重复的可能,在插入过期项时将对应的键也作为数据的一部分进行保存。在实现发布订阅关系时,mini-redis 使用 tokio 提供的广播通道,为存在订阅的每个通道创建对应的 tokio 通道。为了根据通道名快速找到对应的 tokio 广播通道,也维护一个 HashMap 。考虑到 tokio 广播通道的接收方由对应的客户端连接处理任务持有,只在向通道中发布消息时需要根据通道名找到 tokio 广播通道,HashMap 中只需要存储发送器即可。接收器只需在必要时从发送器重新生成。

1
2
3
4
5
6
struct State {
entries: HashMap<String , Entry>, // 键值对存储
expirations: BTreeMap<(Instant, String)>, // 待过期项目
pub_sub: HashMap<String, broadcast::Sender<Bytes>>, // 发布订阅关系
shutdown: bool, // 数据存储是否已关闭
}

State结构体能在不考虑线程安全的情况下正常工作,但为了能在多线程环境下工作,mini-redis 使用互斥锁来保护,并形成了 Shared 结构体。考虑到清理任务需要在必要时唤醒执行,这里为 Shared 添加了 Notify 用于唤醒清理任务。

1
2
3
4
struct Shared {
state: Mutex<State>, // 互斥锁保护的数据存储
background_task: Notify, // 清理任务唤醒
}

实现

基于定义的数据结构和关联方法,内部具体实现及相关注意点如下。

数据存储的创建和后台清理任务处理

Db::new 中需要完成两项工作,Db 结构体创建和后台清理任务启动。结构体的创建只需按照结构初始化内部字段即可。后台清理任务则通过 tokio::spawn 传入后台清理函数来启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
impl Db {
fn new() -> Db {
let shared = Arc::new(Shared {
state: Mutex::new(State {
entries: HashMap::new(),
expirations: BTreeSet::new(),
pub_sub: HashMap::new(),
shutdown: false,
}),
background_task: Notify::new(),
});
// 启动后台清理任务
tokio::spawn(purge_expired_tasks(shared.clone()));
Db { shared }
}
}

后台清理函数 purge_expired_tasks 作为独立的函数,循环扫描过期键值对进行清理。清理任务的整体退出条件就是数据存储关闭。而一轮清理完成后,则需要等待下一个可清理的时间到来或被外部唤醒来开启下一轮的清理动作。等待的过程通过 Shared 结构体中的 Notify 来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async fn purge_expired_tasks(shared: Arc<Shared>) {
// 循环执行清理任务直到数据存储关闭
while !shared.is_shutdown() {
// 执行清理任务并取得下一个需要清理的时间
if let Some(when) = shared.purge_expired_keys() {
tokio::select! {
// 到了下一个需要清理的时间,醒过来继续清理
_ = time::sleep_until(when) => {}
// 休眠后重新被唤醒,可能插入了新过期键值,也可能数据存储已关闭
_ = shared.background_task.notified() => {}
}
} else {
// 已经没有需要过期的键值对了,休眠到被重新唤醒
shared.background_task.notified().await;
}
}
debug!("Purge background task shut down");
}

purge_expired_tasks 只是作为清理任务的整体框架来工作,实质的清理任务则需要在 Shared 上实现的 purge_expired_keys 函数来真正执行。这一真正的清理工作在完成一轮后,需要返回下一轮唤醒时间来为提高整体清理工作的效率,减少无用的唤醒。其内部也需要对数据存储关闭的情况进行处理来尽快发现这种情况并结束清理任务。值得注意的是,这一函数中使用了一个引用类型的转换来帮助借用检查器通过对代码的借用检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
impl Shared {
fn is_shutdown(&self) -> bool {
self.state.lock().unwrap().shutdown
}
fn purge_expired_keys(&self) -> Option<Instant> {
let mut state = self.state.lock().unwrap();
if state.shutdown {
// 数据存储已关闭,不需要再处理过期键值对了
return None;
}

/// 这段转换是必须的,否则无法通过借用检查。这是因为套了一层 MutexGuard 后,借用检查其将 state 作整体检查
/// state.expirations.iter() state.entries.remove(key) 中都对 state 进行借用,且 remove 部分为
/// 可变借用。同时,state.expirations.iter() 执行完成后不能及时结束对 state 的借用。
/// 标注后,借用检查器将 state.expirations 和 state.entries 视为两个独立变量,不存在借用检查的问题。
let mut state = &mut *state;

let now = Instant::now();
while let Some(&(when, ref key)) = state.expirations.iter().next() {
// 已过期的键值对清理完成,返回下一次需要清理的时间
if when > now {
return Some(when);
}
// 清理已过期键
state.entries.remove(key);
state.expirations.remove(&(when, key.clone()));
}
None
}
}

结合清理任务的执行流程,可以知道要结束清理任务,需要将 State 内部的 shutdown 设为关闭。这一功能通过 Db::shutdown_purge_task 来实现。与设计 State 数据的所有工作一样,在执行前需要竞争互斥锁。除此之外的工作就很简单,只需将 shutdown 设为 true 即可。但是,设置完成后需要唤醒可能正在休眠状态的后台清理任务以发现这一变化。在此之前,我们手动释放取得的互斥锁来减少唤醒后却无法获得互斥锁的锁竞争情况。

1
2
3
4
5
6
7
8
9
10
11
impl Db {
fn shutdown_purge_task(&self) {
// 将数据存储状态标记为已关闭
let mut state = self.shared.state.lock().unwrap();
state.shutdown = true;
// 提前释放互斥锁,减少清理任务被唤醒后却无法获得互斥锁的锁竞争
// 将前面的代码放在代码块中也能实现同样的功能,但使用drop的语义更清晰
drop(state);
self.shared.background_task.notify_one();
}
}

键值对存取

键值对的存储与获取通过 Db::get 和 Db::set 函数实现。

其中,读取键值对的 Db::get 函数只涉及读操作,因只需要获取互斥锁后取出数据并返回即可。

1
2
3
4
5
6
impl Db {
pub(crate) fn get(&self, key: &str) -> Option<Bytes> {
let state = self.shared.state.lock().unwrap();
state.entries.get(key).map(|entry| entry.data.clone())
}
}

存储键值对的 Db::set 函数则需要考虑键值对存储本身、过期项记录并考虑后台清理任务的唤醒,因此相对复杂。其整体结构思路为:插入键值对存储,必要时先删除过期项再重新插入过期项,必要时释放互斥锁并唤醒后台清理任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
impl Db {
pub(crate) fn set(&self, key: String, value: Bytes, expire: Option<Duration>) {
let mut state = self.shared.state.lock().unwrap();
let mut notify = false;
let expires_at = expire.map(|duration| {
let when = Instant::now() + duration;
notify = state.next_expiration()
.map(|expiration| expiration > when) // 新加入的键值对比之前的更早过期,需要唤醒
.unwrap_or(true); // 之前没有要过期的,产生了新过期任务,需要唤醒
when
});
// 将新的键值对插入数据存储中,必要时清理老数据的过期项
let prev = state.entries.insert(key.clone(), Entry { data: value, expires_at });
if let Some(entry) = prev {
if let Some(expires_at) = entry.expires_at {
// 同一个键上替换了新数据,且老数据设置了过期时间,需要清理过期项
state.expirations.remove(&(expires_at, key.clone()));
}
}
// 设置了过期时间的情况下,需要将新过期项插入并在必要时唤醒后台清理任务及时清理
if let Some(expires_at) = expires_at {
state.expirations.insert((expires_at, key.clone()));
}
drop(state); // 唤醒清理任务前主动释放互斥锁以减少锁竞争
if notify {
self.shared.background_task.notify_one();
}
}
}
impl State {
fn next_expiration(&self) -> Option<Instant> {
self.expirations
.iter()
.next()
.map(|expiration| expiration.0)
}
}

发布-订阅

发布-订阅关系的维护使用了 tokio 提供的广播通道来完成,内部存储只需保存通道与发送器的关系即可。

无人订阅的通道中,发送任何消息都可以直接丢弃。因此我们在 subscribe 中创建这一对应关系结构。实现时需要在通道不存在时创建广播通道。返回值则可以通过存储的发送器来生成对应的接收器供客户端使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
impl Db {
pub(crate) fn subscribe(&self, key: String) -> broadcast::Receiver<Bytes> {
use std::collections::hash_map::Entry; // 覆盖 Entry 优先级
let mut state = self.shared.state.lock().unwrap();
match state.pub_sub.entry(key) {
Entry::Occupied(entry) => {
// 通道已存在,订阅并生成即可
entry.get().subscribe()
}
Entry::Vacant(entry) => {
// 通道不存在,需要先生成广播通道并将发送器存入发布订阅列表中
let (tx, rx) = broadcast::channel(1024);
entry.insert(tx);
rx
}
}
}
}

发送消息时,只需在通道哈希表中找出通道对应的发送器,并在发送器上发送传入的消息即可。

1
2
3
4
5
6
7
8
9
impl Db {
pub(crate) fn publish(&self, key: &str, value: Bytes) -> usize {
let state = self.shared.state.lock().unwrap();
state.pub_sub
.get(key)
.map(|tx| tx.send(value).unwrap_or(0))
.unwrap_or(0)
}
}

简单测试

键值对存储及过期机制

通过 tokio::spawn 生成模拟 get 和 set 的两个任务,在同一 key 下进行读取和写入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn key_value_get_set_expires() {
let db_holder = DbDropGuard::new();
let db_tx = db_holder.db();
let tx = tokio::spawn(async move {
for _ in 0..5 {
time::sleep(Duration::from_secs(1)).await;
let value = format!("{:?}", Instant::now());
db_tx.set("now".to_string(), Bytes::from(value.clone()), Some(Duration::from_secs(5)));
println!("set value: {}", value);
}
});
let db_rx = db_holder.db();
let rx = tokio::spawn(async move {
for _ in 0..15 {
time::sleep(Duration::from_secs(1)).await;
let get_value = db_rx.get("now");
let value = String::from_utf8(get_value.unwrap_or(Bytes::new()).to_vec()).unwrap_or("Error".to_string());
println!("get value: {}", value);
}
});
join!(tx, rx);
}
}

可以看到,数据的值可以在两个任务间通过 db 正常传递。

key_value_get_set_expires

发布-订阅机制

通过 tokio::spawn 生成模拟 publish 和 subscribe 的两个任务,在同一 key 下进行发布和订阅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
mod test {
#[tokio::test(flavor = "multi_thread")]
async fn key_value_pub_sub() {
let db_holder = DbDropGuard::new();
let db_pub = db_holder.db();
let tx = tokio::spawn(async move {
for _ in 0..5 {
time::sleep(Duration::from_secs(1)).await;
let value = format!("{:?}", Instant::now());
db_pub.publish("msg", Bytes::from(value.clone()));
println!("publish value: {}", value);
}
});
let db_sub = db_holder.db();
let rx = tokio::spawn(async move {
let mut receiver = db_sub.subscribe("msg".to_string());
for _ in 0..5 {
time::sleep(Duration::from_secs(1)).await;
let sub_value = receiver.recv().await;
let value = String::from_utf8(sub_value.unwrap_or(Bytes::new()).to_vec()).unwrap_or("Error".to_string());
println!("subscribe value: {}", value);
}
});
join!(tx, rx);
}
}

可以看到,消息能正常在发布者和订阅者之间传递。

key_value_pub_sub