我正在为一个事件溯源应用程序建模,遇到一个概念上的疑问,我将使用一个典型的购物领域来展示它:

假设一个客户主题接收以下类型的事件:

CustomerCreated id = x, name= xxx, address = xxx 
CustomerUpdated id = x, name = xxx 
CustomerUpdated id = x, address = xxx 

请注意,更新事件不一定会更改/通知所有客户字段。

我正在使用 KTable 实现这个主题并使用它的存储来运行交互式查询:

KTable<Integer, Customer> customers = builder.table(Topics.CUSTOMER.keySerde(), Topics.CUSTOMER.valueSerde(), Topics.CUSTOMER_STORE.name()); 

假设会有很多客户,我想使用一个紧凑的客户主题。这不适用于恢复,因为压缩主题采用中间消息,在我的情况下,该消息不能包含客户的全部信息(它可能是包含部分信息的更新事件)。

根据 KStreamBuilder.table 的 javadoc,创建的 KTable 存储不是更改日志,因此它是从原始主题恢复的。

The resulting KTable will be materialized in a local KeyValueStore with the given storeName. However, no internal changelog topic is created since the original input topic can be used for recovery  

在我的例子中,我如何才能为客户提供一个紧凑的主题,同时拥有一个根据该主题创建的商店,该商店可以使用客户的完整信息进行恢复?

请您参考如下方法:

正如您正确指出的那样,您的输入主题无法压缩,因为每个更新记录都被解释为对前一个记录的覆盖,因此必须是“完整”更新(更改日志主题不支持“部分”更新)。

将主题读取为 KTable 遵循相同的语义,并将通过“放入”操作将主题具体化为键值存储(逻辑删除作为删除执行)。

如果您想使用 Kafka Streams 进行部分更新,您可以通过将输入主题读取为 KStream 来使用聚合:

KTable table = builder.stream(...).groupByKey().aggregate(...); 

这允许您使用可以执行部分​​更新的自定义 Aggregator。对于每条输入记录,您将获得旧/当前状态和当前输入记录(即可能的部分更新),Aggregator 返回新(更新)状态。这为您提供了最大的灵 active ,您可以根据需要更新状态。

对于这种情况,不需要压缩输入主题。结果 KTable 将由更新日志主题支持,该主题包含带有状态完整副本的更新记录。此更改日志主题将自动配置日志压缩,因此永远不会丢失其状态。

您还可以将生成的更改日志主题写入应配置日志压缩的输出主题:

table.toStream().to(...); 

您可能希望通过参数 Materialized 在聚合步骤中禁用缓存。有关详细信息,请参阅文档:https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!