我有一个事件流,我需要与 ktable/changelog 主题进行匹配,但匹配是通过对 ktable 条目的属性进行模式匹配来完成的。所以我无法根据 key 加入流,因为我还不知道哪个匹配。
例子:
表X:
{
[abc]: {id: 'abc', prop: 'some pattern'},
[efg]: {id: 'efg', prop: 'another pattern'}
}
流A:
{ id: 'xyz', match: 'some pattern'}
所以 流 A 应该转发类似
{match: 'abc'}
的内容
所以我基本上需要遍历 ktable 条目并通过此属性的模式匹配找到匹配的条目。
创建基于 ktable 的全局状态存储然后从处理器 API 访问它并迭代条目是否可行?
我还可以将 ktable 的所有条目聚合到 1 个集合中,然后加入一个“假”键?但这似乎也相当hacky。
或者我只是强制一些不是真正流的东西,而只是将它放入带有普通消费者 API 的 redis 缓存中,这也有点尴尬,因为我宁愿让 RocksDB 支持它。
编辑:我想这与 this question 有点相关
请您参考如下方法:
一个 GlobalKTable
不起作用,因为流全局表连接允许您从流中提取非键连接属性——但对表的查找仍然基于表键。
但是,您可以将表输入主题阅读为 KStream
,提取 join 属性,将其设置为键,并进行聚合返回集合(即 List、Set 等)。这样,您可以对键进行流表连接,然后是 flatMapValues()
(或 flatMap()
)将连接结果拆分为多个记录(取决于表集合中有多少记录)。
只要您的 join 属性没有太多重复项(对于表输入主题),因此表中的值侧集合不会变得太大,这应该可以正常工作。您需要提供自定义值-Serde 来(反)序列化集合数据。