当前位置:   article > 正文

java 缓冲流 刷新_java – 缓冲和刷新Apache Beam流数据

apache beam 缓存流

这是每个密钥和窗口

state和

timers的新功能的典型用例.

状态在a Beam blog post中描述,而对于计时器,您将不得不依赖于Javadoc.没关系javadoc所说的支持他们的跑步者,真实的状态可以在Beam的capability matrix中找到.

该模式非常类似于您所编写的模式,但是状态允许它与窗口以及捆绑包一起使用,因为它们在流式传输中可能非常小.由于必须以某种方式对state进行分区以保持并行性,因此您需要添加某种键.目前没有自动分片.

private static final class Function extends DoFn, Void> implements Serializable {

private static final long serialVersionUID = 2417984990958377700L;

private static final int LIMIT = 500;

@StateId("bufferedSize")

private final StateSpec> bufferedSizeSpec =

StateSpecs.value(VarIntCoder.of());

@StateId("buffered")

private final StateSpec> bufferedSpec =

StateSpecs.bag(StringUtf8Coder.of());

@TimerId("expiry")

private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement

public void processElement(

ProcessContext context,

BoundedWindow window,

@StateId("bufferedSize") ValueState bufferedSizeState,

@StateId("buffered") BagState bufferedState,

@TimerId("expiry") Timer expiryTimer) {

int size = firstNonNull(bufferedSizeState.read(), 0);

bufferedState.add(context.element().getValue());

size += 1;

bufferedSizeState.write(size);

expiryTimer.set(w.maxTimestamp().plus(allowedLateness));

if (size > LIMIT) {

flush(context, bufferedState, bufferedSizeState);

}

}

@OnTimer("expiry")

public void onExpiry(

OnTimerContext context,

@StateId("bufferedSize") ValueState bufferedSizeState,

@StateId("buffered") BagState bufferedState) {

flush(context, bufferedState, bufferedSizeState);

}

private void flush(

Context context,

BagState bufferedState,

ValueState bufferedSizeState) {

Iterable buffered = bufferedState.read();

// build batch request from buffered

...

// clear things

bufferedState.clear();

bufferedSizeState.clear();

}

}

在这里做几点说明:

> State取代了你的DoFn的实例变量,因为

实例变量在窗口之间没有内聚力.

>缓冲区和大小只是根据需要进行初始化

@StartBundle.

> BagState支持“盲”写入,因此不需要

任何读 – 修改 – 写,只需提交相同的新元素

当你输出时的方式.

>在同一时间重复设置一个计时器就好了;

它应该主要是一个noop.

> @OnTimer(“expiry”)取代@FinishBundle,因为

完成一个包不是每个窗口的东西,而是一个神器

跑步者如何执行您的管道.

所有这一切,如果你正在写一个外部系统,也许你会想要重新启用窗口并重新窗口进入全局窗口,然后才进行写入,其中写入的方式取决于窗口,因为“外部世界是全球窗口“.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/969881
推荐阅读
相关标签
  

闽ICP备14008679号