计算机科学中的两个难题:缓存失效和缓存雪崩
计算机科学中有两个经典难题:缓存失效、命名问题和差一错误。这个常被归功于Phil Karlton的经典笑话,突显了软件开发人员面临的一个非常真实且持久的挑战。我们不断努力构建更快、响应更迅速的系统,而缓存是实现这一目标的基本策略。但是,虽然缓存提供了显著的性能提升,但它引入了一个复杂的新问题:如何确保缓存的数据始终新鲜和准确?这个挑战被称为缓存失效,如果处理不当,可能导致向用户提供过时数据,或者在最坏情况下,触发称为缓存雪崩的灾难性连锁反应。
在本文中,我们将尝试使用Debezium平台在Valkey中解决这些问题。
什么是缓存失效和缓存雪崩?
在进一步探讨这些问题之前,我们必须理解"缓存失效"和"缓存雪崩"是什么。
Valkey的一个常见用例是数据库查询缓存,您将数据库查询结果存储在Valkey中,以提高请求的处理时间并减少数据库系统的负载。但是由于Valkey是与数据库分离的系统,它如何知道其缓存的查询结果何时更新并开始提供新数据?这就是缓存失效:当对数据集进行更改时,例如对数据库执行UPDATE语句,我们需要一种方法来使Valkey中存储的数据失效,以确保反映更新的数据。
如果缓存未失效,则存在向用户显示过时信息的风险,这可能导致混淆甚至隐私问题。
缓存失效通常通过为缓存数据设置过期日期来处理。当数据不存在于缓存中(缓存未命中)时,要么因为条目已过期并从Valkey中删除,要么它最初就不存在于Valkey中,应用程序将从数据库层获取它,并将其存储在缓存中以供将来使用。
但是,如果同时出现太多缓存未命中,要么因为多个缓存条目同时过期,要么因为太多会话请求相同的过期条目,将导致数据库负载急剧增加。在最坏的情况下,这将导致性能下降或崩溃,因为每个连接都将尝试从数据库更新缺失的缓存条目。这个问题被称为缓存雪崩。
使用变更数据捕获解决问题
变更数据捕获(CDC)是捕获数据更改的过程/设计模式,例如在MySQL数据库中执行INSERT、UPDATE和DELETE语句。然后可以将这些更改应用于其他数据存储,如数据仓库和数据湖,实现实时数据处理并提供时间敏感的洞察。
CDC也可用于更新缓存,这是本文讨论的用例。
使用Debezium平台设置从MySQL到Valkey的CDC管道
从Debezium文档中:Debezium是一组分布式服务,用于捕获数据库中的更改,以便您的应用程序可以看到这些更改并对其作出响应。Debezium在每个数据库表的更改事件流中记录所有行级更改,应用程序只需读取这些流即可按发生顺序查看更改事件。
Debezium是一个流行的开源CDC解决方案。它支持从广泛使用的数据库系统(如MySQL、PostgreSQL、MongoDB等)捕获数据。Debezium提供debezium-api模块,允许我们在Java项目中轻松配置Debezium连接器。
对于演示,我们将设置一个小的Java程序,将更改从MySQL流式传输到Valkey作为JSON对象。
依赖项
要开始演示项目,我们需要在系统上安装一些东西:
- OpenJDK:对于本文,我使用的是JDK版本17
- Apache Maven:用于管理Java项目依赖项
- Docker:用于部署MySQL和Valkey实例
对于此演示,我们需要使用Maven为Debezium、MySQL和Valkey添加依赖项。这需要将以下内容添加到应用程序的POM中,其中${version.debezium}是您使用的Debezium平台版本,或者是包含Debezium版本字符串的Maven属性,对我来说是3.3.0.Alpha2 - 撰写本文时的最新版本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.valkey</groupId>
<artifactId>valkey-java</artifactId>
<version>LATEST</version>
</dependency>
|
在代码中定义MySQL连接
我们将首先定义MySQL连接器的配置,该连接器连接到在localhost:3306上运行的实例,用户为’mysqluser’。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.flush.interval.ms", "60000");
// 定义数据库连接
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "<your actual password>");
props.setProperty("database.server.id", "85744");
// 定义连接器元数据存储
props.setProperty("topic.prefix", "my-app-connector");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "/tmp/schemahistory.dat");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
// 禁用ChangeEvent中的模式部分以获得更小的消息
props.setProperty("value.converter.schemas.enable", "false");
props.setProperty("key.converter.schemas.enable", "false");
|
当连接器运行时,它从源读取信息并定期记录"偏移量",这些偏移量定义了它已处理了多少信息。如果进程重新启动,它可以从停止的地方继续,防止重复消息,如果不小心处理,可能会影响数据完整性。
Debezium MySQL连接器读取服务器的二进制日志,其中包括对数据库进行的所有数据更改和模式更改。由于对数据的所有更改都是根据记录更改时所属表的模式结构化的,因此连接器需要跟踪所有模式更改,以便它可以正确解码更改事件。连接器记录模式信息,以便在连接器重新启动并从最后记录的偏移量恢复读取时,它确切地知道该偏移量处的数据库模式是什么样子。
在此演示中,我们将偏移量信息和数据库模式历史记录都存储为系统上的本地文件,偏移量存储在/tmp/offsets.dat,模式历史记录存储在/tmp/schemahistory.dat。
最后,对于CDC引擎在不同数据库系统之间自动准确同步数据,它需要了解它正在同步的数据的元数据/模式的几件事,即列/字段的数据类型是什么,列应该有多大等。因此,需要有一个模式供引擎识别数据库的结构,或者它最近是否已更改,以便尽可能准确地复制数据。但是在像将更改流式传输到非RDBMS数据存储的情况下,我们不需要那些模式,因此可以从事件中禁用/删除它们,以获得更小的消息和更快的处理速度。
将更改事件打印到控制台
指定配置后,我们可以创建DebeziumEngine的实例。此对象将每10毫秒轮询MySQL服务器一次,并将每个捕获的ChangeEvent打印到控制台。
1
2
3
4
5
6
7
8
|
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class, Json.class)
.using(props)
.notifying(r -> {
System.out.println(r.value());
})
.build();
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(engine, 0, 10, TimeUnit.MILLISECONDS);
|
控制台的输出将类似于这样:
1
2
3
4
5
|
Sep 15, 2025 11:33:07 AM com.github.shyiko.mysql.binlog.BinaryLogClient requestBinaryLogStreamMysql
INFO: Requesting streaming from position filename: binlog.000002, position: 3258
Sep 15, 2025 11:33:07 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to localhost:3306 at binlog.000002/3258 (sid:85744, cid:26)
{"before":null,"after":{"id":1,"value":"hello world"},"source":{"version":"3.3.0.Alpha2","connector":"mysql","name":"my-app-connector","ts_ms":1757910721000,"snapshot":"false","db":"test","sequence":null,"ts_us":1757910721000000,"ts_ns":1757910721000000000,"table":"t","server_id":1,"gtid":null,"file":"binlog.000002","pos":3467,"row":0,"thread":16,"query":null},"transaction":null,"op":"c","ts_ms":1757910787191,"ts_us":1757910787191237,"ts_ns":1757910787191237000}
|
使用JSON.SET将更改事件写入Valkey
虽然JSON已成为Redis的内置数据类型,但对于Valkey来说还不是这样。因此,客户端库不会为我们提供JSON命令。但是我们可以使用ProtocolCommand接口快速实现它。
1
2
3
4
5
6
7
8
9
10
11
|
public enum Command implements ProtocolCommand {
JSON_SET("JSON.SET");
private final byte[] raw;
Command(String str) {
raw = str.getBytes();
}
@Override
public byte[] getRaw() {
return raw;
}
}
|
然后我们可以将更改事件作为JSON对象写入Valkey:
1
2
3
4
5
6
7
8
9
10
11
|
String valkeyHostname = "localhost";
int valkeyPort = 6379;
JedisPool jedisPool = new JedisPool(valkeyHostname, valkeyPort);
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class, Json.class)
.using(props)
.notifying(r -> {
jedisPool.getResource().sendCommand(Command.JSON_SET, r.key(), ".", r.value());
})
.build();
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(engine, 0, 10, TimeUnit.MILLISECONDS);
|
在写入Valkey之前修改更改事件
查看JSON.SET命令的输出,我们可以看到虽然更改事件确实出现在Valkey上,但数据不是很有用:键没有告诉我们它属于什么表/键模式,值包含不必要的信息(例如,使用键的应用程序很可能不需要知道binlog详细信息)。
1
2
3
4
|
127.0.0.1:6379> keys *
1) "{"id":1}"
127.0.0.1:6379> JSON.GET "{"id":1}"
"{"before":null,"after":{"id":1,"value":"hello world"},"source":{"version":"3.3.0.Alpha2","connector":"mysql","name":"my-app-connector","ts_ms":1757911355000,"snapshot":"false","db":"test","sequence":null,"ts_us":1757911355000000,"ts_ns":1757911355000000000,"table":"t","server_id":1,"gtid":null,"file":"binlog.000002","pos":5588,"row":0,"thread":16,"query":null},"transaction":null,"op":"c","ts_ms":1757911355791,"ts_us":1757911355791396,"ts_ns":1757911355791396000}"
|
如果我们需要在将ChangeEvent记录写入缓存之前对其进行更高级的处理(例如,将记录键转换为<table-name>:<primary-key-value>等格式,或从记录中删除更改事件元数据),我们可以使用io.debezium.engine.DebeziumEngine.ChangeConsumer<R>来实现。
在以下示例中,我们将转换记录,以便:
- ChangeEvent键将采用
<table-name>:<id>格式
- 从ChangeEvent值中删除所有元数据
- 如果我们正在处理的事件是DELETE语句(操作类型:“d”),则从缓存中删除键
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public class ValkeyChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
@Override
public void handleBatch(
List<ChangeEvent<String, String>> list,
DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> recordCommitter
) throws InterruptedException {
for (ChangeEvent<String, String> event : list) {
try {
JSONObject recV = new JSONObject(event.value());
String opType = recV.getString("op");
String key;
if (opType.equalsIgnoreCase("d")) {
key = String.valueOf(recV.getJSONObject("before").get("id"));
} else {
key = String.valueOf(recV.getJSONObject("after").get("id"));
}
String tableName = recV.getJSONObject("source").getString("table");
String valkeyK = String.format("%s:%s", tableName, key);
if (opType.equalsIgnoreCase("d")) {
jedisPool.getResource().del(valkeyK);
} else {
String valkeyV = recV.getJSONObject("after").toString();
jedisPool.getResource().sendCommand(Command.JSON_SET, valkeyK, ".", valkeyV);
}
recordCommitter.markProcessed(event);
} catch (Exception e) {
e.printStackTrace();
}
}
recordCommitter.markBatchFinished();
}
}
|
然后我们可以通过将ValkeyChangeConsumer的实例传递给notifying API来使用它
1
2
3
4
|
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class, Json.class)
.using(props)
.notifying(new ValkeyChangeConsumer())
.build();
|
更改事件现在在Valkey上呈现得更好
1
2
3
4
5
|
127.0.0.1:6379> keys *
1) "t:1"
127.0.0.1:6379> JSON.GET t:1
"{"id":1,"value":"hello world"}"
|
将所有内容整合在一起
Java程序的源代码可在Percona Lab GitHub帐户上找到:https://github.com/Percona-Lab/valkey-cdc-debezium
首先,我们将部署MySQL和Valkey实例。
- 对于MySQL,我们将创建用户mysqluser
- 对于Valkey,我们将使用valkey-bundle Docker镜像,其中包括valkey-json、valkey-search和valkey-ldap模块。
1
2
3
4
|
docker run --name mysql -e MYSQL_ROOT_PASSWORD=<your actual password> -p 3306:3306 -d percona/percona-server:8.4
docker exec -ti mysql mysql -uroot -pmy-secret-pw -e "CREATE USER 'mysqluser'@'%' IDENTIFIED BY '<your actual password>';"
docker exec -ti mysql mysql -uroot -pmy-secret-pw -e "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'mysqluser'@'%';"
docker run --name valkey -p 6379:6379 -d valkey/valkey-bundle:9.0-rc1-alpine
|
下载Java程序源代码并编译它。在编译之前,请记得更新数据库密码配置键。
1
2
3
4
|
wget https://github.com/Percona-Lab/valkey-cdc-debezium/archive/refs/heads/main.zip
unzip main.zip
cd valkey-cdc-debezium-main
mvn clean package
|
运行程序
1
|
java -jar target/dbewithvalkey-1.0-SNAPSHOT.jar
|
总结
这种变更数据捕获(CDC)与Valkey的集成为管理缓存失效和雪崩问题提供了显著的好处。通过利用Debezium Engine将数据库更改实时流式传输到Valkey,应用程序可以确保其缓存数据始终是最新的,降低了提供过时信息的风险和缓存雪崩发生的风险。
进一步阅读
- Debezium Engine文档:https://debezium.io/documentation/reference/stable/development/engine.html