我正在尝试实现一个 Transformer 类

public class StreamSorterByTimeStampWithDelayTransformer < V >  
    implements Transformer< Long, V, KeyValue< Long, V > > 

类的构造函数为每个实例创建一个 StateStore,因此:

this.state_store_name = "state_store_" + this.hashCode(); 
 
KeyValueBytesStoreSupplier key_value_store_supplier = Stores.persistentKeyValueStore( state_store_name ); 
StoreBuilder< KeyValueStore< String, V > > key_value_store_builder =  
    Stores.keyValueStoreBuilder( key_value_store_supplier, Serdes.String(), v_instance.getSerde() ); 
stream_builder.addStateStore( key_value_store_builder ); 

Transformer init 方法因此引用了 StateStore:

public void init(ProcessorContext context) { 
    this.context = context; 
    this.key_value_store = (KeyValueStore< String, V >) context.getStateStore( state_store_name ); 
    // schedule a punctuate() method every "poll_ms" (wall clock time) 
    this.context.schedule( this.poll_ms, PunctuationType.WALL_CLOCK_TIME,  
            (timestamp) -> pushOutOldEnoughEntries() ); 
} 

我想我必须遗漏一个步骤,因为当调用 getStateStore 时,它 导致异常说:

Processor KSTREAM-TRANSFORM-0000000003 has no access to StateStore 

我遗漏了什么或做错了什么?

请您参考如下方法:

您需要将商店附加到变压器:

stream.transform(..., this.state_store_name); 


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!