假设我有一个使用自定义对象列表的 API 操作。对于这些对象中的每一个,它都会调用一个创建 Mono 的服务方法。如何以惯用且非阻塞的方式从这些 Mono 对象创建 Flux?

我现在想出的是这个。我更改了方法名称以更好地反射(reflect)其预期目的。

fun myApiMethod(@RequestBody customObjs: List<CustomObject>): Flux<CustomObject> { 
 
    return Flux.create { sink -> 
        customObjs.forEach { 
 
            service.persistAndReturnMonoOfCustomObject(it).map { 
                sink.next(it) 
            } 
        } 
        sink.complete() 
    } 
} 

此外,我是否需要订阅通量才能真正让它返回一些东西?

请您参考如下方法:

我相信你可以使用 concat() 代替:

/** 
 * Concatenate all sources provided as a vararg, forwarding elements emitted by the 
 * sources downstream. 
 * <p> 
 * Concatenation is achieved by sequentially subscribing to the first source then 
 * waiting for it to complete before subscribing to the next, and so on until the 
 * last source completes. Any error interrupts the sequence immediately and is 
 * forwarded downstream. 
 * <p> 
 * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/concat.png" alt=""> 
 * <p> 
 * @param sources The {@link Publisher} of {@link Publisher} to concat 
 * @param <T> The type of values in both source and output sequences 
 * 
 * @return a new {@link Flux} concatenating all source sequences 
 */ 
@SafeVarargs 
public static <T> Flux<T> concat(Publisher<? extends T>... sources) { 

或者merge():

/** 
 * Merge data from {@link Publisher} sequences contained in an array / vararg 
 * into an interleaved merged sequence. Unlike {@link #concat(Publisher) concat}, 
 * sources are subscribed to eagerly. 
 * <p> 
 * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/merge.png" alt=""> 
 * <p> 
 * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with 
 * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source 
 * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to 
 * another source. 
 * 
 * @param sources the array of {@link Publisher} sources to merge 
 * @param <I> The source type of the data sequence 
 * 
 * @return a merged {@link Flux} 
 */ 
@SafeVarargs 
public static <I> Flux<I> merge(Publisher<? extends I>... sources) { 


评论关闭
IT序号网

微信公众号号:IT虾米 (左侧二维码扫一扫)欢迎添加!