FlinkCEP事件处理

这篇文章将为大家详细讲解有关Flink CEP事件处理,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

成都创新互联公司是一家专注于做网站、成都做网站与策划设计,港南网站建设哪家好?成都创新互联公司做网站,专注于网站建设十载,网设计领域的专业建站公司;建站业务涵盖:港南等地区。港南做网站价格咨询:18980820575

什么是CEP

复杂事件处理,允许在无界数据流中检测出特定事件模型

单个模式

单个模式指一个模式,可以是一个单例也可以是循环模式。
模式都是单例的,可以通过量词转换成循环模式。每个模式可以有一个或多个条件来决定接受哪些事件。

量词
  1. pattern.oneOrMore():期望给定的事件出现一次或多次

  2. pattern.times(#oftimes):期望一个给定事件出现特定次数的模式

  3. pattern.times(#fromTimes, #toTimes):期望一个给定事件出现次数在一个最小值与最大值中间

  4. pattern.greedy():贪心算法,尽可能多匹配,还不能让模式组贪心

  5. pattern.optional():变为可选

示例:
// 期望出现4次
start.times(4);
// 期望出现0或者4次
start.times(4).optional();
// 期望出现2、3或者4次
start.times(2, 4);
// 期望出现2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).greedy();
// 期望出现0、2、3或者4次
start.times(2, 4).optional();
// 期望出现0、2、3或者4次,并且尽可能的重复次数多
start.times(2, 4).optional().greedy();
// 期望出现1到多次
start.oneOrMore();
// 期望出现1到多次,并且尽可能的重复次数多
start.oneOrMore().greedy();
// 期望出现0到多次
start.oneOrMore().optional();
// 期望出现0到多次,并且尽可能的重复次数多
start.oneOrMore().optional().greedy();
// 期望出现2到多次
start.timesOrMore(2);
// 期望出现2到多次,并且尽可能的重复次数多
start.timesOrMore(2).greedy();
// 期望出现0、2或多次
start.timesOrMore(2).optional();
// 期望出现0、2或多次,并且尽可能的重复次数多
start.timesOrMore(2).optional().greedy();
条件

判断事件属性的条件可以是以下方法

  1. pattern.where()

  2. pattern.or()

  3. pattern.until()
    这些方法入参可以是IterativeCondition或SimpleCondition

pattern.subtype方法限制接受事件类型是初始事件的子类型。

  1. 迭代条件IterativeCondition

  2. 简单条件SimpleCondition

  3. 组合条件.where().or()等

  4. 停止条件.until()

组合模式

FlinkCEP支持事件之间如下形式的连续策略

  1. 严格连续:期望所有匹配事件严格的一个接一个出现,中间没有任何不匹配事件

  2. 松散连续:忽略匹配的事件之间的不匹配的事件

  3. 不确定的松散连续:更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配

1. next() 指定严格连续
2. followedBy() 指定松散连续
3. followedByAny() 不确定松散连续
4. notNext() 如果不想后面直接连着一个特定事件
5. notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方。
ps: 模式序列不能以notFollowedBy()结尾
    一个NOT模式前面不能是可选的模式

定义模式一个有效时间约束:pattern.within()方法指定有效时间内发生。
模式序列只能有一个时间限制,如果限制多个时间在不同的模式上,会使用最小的时间限制。

循环模式默认是松散连续,如果合用严格连续,需使用consecutive()方法明确指定。如果想使用不确定松散连续,可以使用allowCombinations()方法
==示例:consecutive==

Pattern.begin("start").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().consecutive()
.followedBy("end1").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

输入:C D A1 A2 A3 D A4 B,会产生下面的输出:

如果施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B}

不施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}

==示例:allowCombinations==

Pattern.begin("start").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("c");
  }
})
.followedBy("middle").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("a");
  }
}).oneOrMore().allowCombinations()
.followedBy("end1").where(new SimpleCondition() {
  @Override
  public boolean filter(Event value) throws Exception {
    return value.getName().equals("b");
  }
});

输入:C D A1 A2 A3 D A4 B,会产生如下的输出:

如果使用不确定松散连续: {C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}

如果不使用:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}
模式组

定义一个模式序列作为begin,followedBy,followedByAny和next条件

关于“Flink CEP事件处理”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。


网页名称:FlinkCEP事件处理
网站链接:http://scyanting.com/article/jdpppi.html