Flink水印延迟与窗口允许延迟的概念是什么
这篇文章主要讲解了“Flink水印延迟与窗口允许延迟的概念是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Flink水印延迟与窗口允许延迟的概念是什么”吧!
网站建设哪家好,找成都创新互联公司!专注于网页设计、网站建设、微信开发、成都小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了常山免费建站欢迎大家使用!
link 在开窗处理事件时间(Event Time) 数据时,可设置水印延迟以及设置窗口允许延迟(allowedLateness)以保证数据的完整性。这两者因都是设置延迟时间所以刚接触时容易混淆。本文接下将展开讨论分析“水印延迟”与“窗口允许延迟”概念及区别。
水印延迟(WaterMark)
(1) 水印
由于采用了事件时间,脱离了物理挂钟。窗口不知道什么时候需要关闭并进行计算,这个时候需要借助水印来解决该问题。当窗口遇到水位标识时就默认是窗口时间段内的数据都到齐了,可以触发窗口计算。
(2) 水印延迟
设置水印延迟时间的目的是让水印延迟到达,从而可以解决乱序问题。通过水印延迟到达让在延迟时间范围内到达的迟到数据可以加入到窗口计算中,保证了数据的完整性。当水印到达后就会触发窗口计算,在水印之后到达的迟到数据则会被丢弃。
窗口允许延迟(allowedLateness)
使用 StreamAPI 时,在进行开窗后可设置 allowedLateness 窗口延迟。官网中对其解释如下:
默认情况下,当水印到达窗口末端时,迟到元素将会被删除。但Flink允许为window operators指定允许的最大延迟。允许延迟指定元素在被删除之前延迟的时间,默认值为0。当元素在水印经过窗口末端后到达,且它的到达时间在窗口末端加上运行延迟的时间之内,其仍会被添加到窗口中。根据所使用的触发器,延迟但未被丢弃的元素可能会再次触发窗口计算。EventTimeTrigger就是这种情况。为了做到这一点,Flink保持窗口的状态,直到它们允许的延迟到期。一旦发生这种情况,Flink将删除窗口并删除其状态,正如窗口生命周期部分中所描述的那样。 |
简单理解:通常在水印到达之后迟到数据将会被删除,而窗口的延迟则是指数据在被删除之前的允许保留时间。也就是说,在水印达到之后迟到数据本该被删除,但是如果设置了窗口延迟,那么在水印之后到窗口延迟时间段内到达的迟到数据还是会被加入到窗口计算中,并再次触发窗口计算。
一个Demo 两个猜想
下面我用一个 Demo 和两个猜想来帮助大家加深理解这两个概念。
例子:接收 Kafka 数据,数据为 JSON 格式如:{"word":"a","count":1,"time":1604286564}。我们开一个 5 秒的 tumbling windows 滚动窗口,以 word 作为 key 在窗口内对 count 值进行累加。同时设置水印延迟 2 秒,窗口延迟 2 秒。代码如下:
public class MyExample { public static void main(String[] args) throws Exception { // 创建环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 设置时间特性为 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 水印策略,其需要注入Timestamp Assigner(描述了如何访问事件时间戳)和 Watermark Generator (事件流显示的超出正常范围的程度) WatermarkStrategywatermarkStrategy = WatermarkStrategy // forBoundedOutOfOrderness 属于(periodic周期性),周期生成器通常通过onEvent()观察传入的事件,然后在框架调用onPeriodicEmit()时发出水印。 . forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner () { @Override public long extractTimestamp(WC wc, long l) { return wc.getEventTime() * 1000; } }); // Kafka 配置 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "Kafka地址:9092"); properties.setProperty("group.id", "test"); // Flink 需要知道如何转换Kafka消息为Java对象(反序列化),默认提供了 KafkaDeserializationSchema(序列化需要自己编写)、JsonDeserializationSchema、AvroDeserializationSchema、TypeInformationSerializationSchema env.addSource(new FlinkKafkaConsumer<>("flinktest1", new JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest()) // map 构建 WC 对象 .map(new MapFunction () { @Override public WC map(ObjectNode jsonNode) throws Exception { JsonNode valueNode = jsonNode.get("value"); WC wc = new WC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong()); return wc; } }) // 设定水印策略 .assignTimestampsAndWatermarks(watermarkStrategy) .keyBy(WC::getWord) // 窗口设置,这里设置为滚动窗口 .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 设置窗口延迟 .allowedLateness(Time.seconds(2)) .reduce(new ReduceFunction () { @Override public WC reduce(WC wc, WC t1) throws Exception { return new WC(wc.getWord(), wc.getCount() + t1.getCount()); } }) .print(); env.execute(); } static class WC { public String word; public int count; public long eventTime; public long getEventTime() { return eventTime; } public void setEventTime(long eventTime) { this.eventTime = eventTime; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } public WC(String word, int count) { this.word = word; this.count = count; } public WC(String word, int count,long eventTime) { this.word = word; this.count = count; this.eventTime = eventTime; } @Override public String toString() { return "WC{" + "word='" + word + '\'' + ", count=" + count + '}'; } } }
猜想1:
水印延迟 2s 达到,所以会在第 5 + 2 = 7s 时认为 [ 0 ,5 ) 窗口的数据全部到齐,并触发窗口计算。
// 往 Kafka 中写入数据 {"word":"a","count":1,"time":1604286560} //2020-11-02 11:09:20 {"word":"a","count":1,"time":1604286561} //2020-11-02 11:09:21 {"word":"a","count":1,"time":1604286562} //2020-11-02 11:09:22 {"word":"a","count":1,"time":1604286566} //2020-11-02 11:09:26 {"word":"a","count":1,"time":1604286567} //2020-11-02 11:09:27 (触发了窗口计算)
控制台输出
分析:通过测试发现最后在第 7s 也就是 11:09:27 时触发了窗口计算,这符合了我们的猜想一。水印延迟 2s 达到,所以会在第 5 + 2 = 7s 时认为 [ 0 ,5 ) 窗口的数据全部到齐,并触发窗口计算。计算结果为3,这是因为只有最前面的3条数据属于 [0,5) 窗口计算范围之内。
猜想2:
设置了窗口延迟2秒,那么只要在水印之后到窗口允许延迟的时间范围内达到且属于 [ 0,5) 窗口的迟到数据会被加入到窗口中,且再次触发窗口运算:
// 继续往 Kafka 中写入数据 {"word":"a","count":1,"time":1604286568} //2020-11-02 11:09:28 时间到达了第 8 秒 {"word":"a","count":1,"time":1604286563} //2020-11-02 11:09:23 模拟一个在水印之后、在窗口允许延迟范围内、且属于[0,5) 窗口的迟到数据,该数据还是会触发并参与到[0,5) 窗口的计算
控制台输出新增了一行
// 我们再继续往 Kafka 中写入数据 {"word":"a","count":1,"time":1604286569} //2020-11-02 11:09:29 时间到达第9秒 {"word":"a","count":1,"time":1604286563} //2020-11-02 11:09:23 模拟一个在水印之后且超出窗口允许延迟范围、且属于[0,5) 窗口的迟到数据,该数据不会参与和触发[0,5)窗口计算
查看控制台并没有发现新的输出打印。
解析:水印因延迟在第 7s 到达之后会触发[0,5) 窗口计算,如果没有设置窗口延迟的情况下,水印之后迟到且属于 [0,5) 窗口的数据会被丢弃。上面我们实验设置窗口延迟 2s,实现的效果就是在水印之后,窗口允许延迟时间之内(7 + 2 = 9s 之间),迟到且属于 [0,5) 窗口的数据还是会触发一次窗口计算,并参与到窗口计算中。而在 9s 之后,也就是超过窗口允许延时时间,那么迟到且属于[0,5)的数据就会被丢弃。
感谢各位的阅读,以上就是“Flink水印延迟与窗口允许延迟的概念是什么”的内容了,经过本文的学习后,相信大家对Flink水印延迟与窗口允许延迟的概念是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!
文章名称:Flink水印延迟与窗口允许延迟的概念是什么
链接URL:http://scyanting.com/article/gshscd.html