FLINKSIDDHIADDON学习笔记

SIDDHI 是一款功能强大的CEP 引擎,具有自己的DSL,丰富的模式匹配功能和可扩展性, 感谢陈浩同学提供了SIDDHI和FLINK的整合功能 https://github.com/haoch/flink-siddhi 本文主要介绍了这个ADDON的一些实现思路

创新互联是专业的秀屿网站建设公司,秀屿接单;提供成都网站设计、做网站、成都外贸网站建设公司,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行秀屿网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!

  1. 将FLINK STREAM 转化为 SIDDHI STREAM 定义

  用法: SiddhiCEP.registerStream(streamName, FlinkDataStream, fieldNames)

  通过 FlinkDataStream.getType 获得流对象的类型定义.registerStream方法会构造一个 SiddhiStreamSchema 对象,根据流对象的类型定义,自动获取每个field对应的数据类型存在内部的fieldTypes数组中.

  SiddhiStreamSchema 内部会创建一个Siddhi StreamDefinition对象, StreamDefinition的attribute的定义根据fieldNames + fieldTypes 来添加.SiddhiTypeFactory.getAttributeType 负责将Flink 的数据类型映射为Siddhi的数据类型, 并可自动生成一段Define Stream的定义(见 SiddhiStreamSchema.getStreamDefinitionExpression 方法) define stream [streamName] ([fieldName 1] [fieldType 1], ...[fieldName n] [fieldType n])

  SiddhiStreamSchema 包括一个StreamSerializer: 用于将DataStream 中的对象转化为 Siddhi Stream 的输入(Object Array):
    如果流对象是一个简单类型 Atomic Type 直接将流对象放到 ARRAY中
    如果流对象是Tuple 类型,直接将Tuple 中前N个值放到ARRAY中
    如果流对象是Pojo或者CaseClass类型,直接根据每个fieldName 获取Class对应的属性放到ARRAY中

  1. 串联FLINK STREAM 和 SIDDHI STREAM

  SiddhiStream: 抽象的Stream基类

  convertDataStream 将原始的FLINK流转化为Tuple类型的流,Tuple的第一个元素为StreamId, 第二个元素为原来流中的数据,支持普通Stream 和 KeyedStream

  ExecutionSiddhiStream: 构建 SiddhiOperatorContext 并调用SiddhiStreamFactory.createDataStream 创建了集成Siddhi的 DataStream. DataStream的类型为Tuple的子类.SiddhiTypeFactory.getTupleTypeInformation: 其核心思路是通过Siddhi输出Stream的StreamDefinition获得其Attribute的定义,再通过 TypeInfoParser.parse构造Flink Tuple 类型的定义

  ExecutableStream 根据Siddhi query 创建ExecutionSiddhiStream对象
  SingleSiddhiStream, UnionSiddhiStream: ExecutableStream 的子类,支持Fluent Style的链式调用. UnionSiddhiStream 调用了DataStream.union 方法

  SiddhiStreamFactory.createDataStream 通过 FLINK DataStream的transform方法使用了自定义的StreamOperator: SiddhiStreamOperator. 在 AbstractSiddhiOperator 的 setup 方法中创建SiddhiManager 和 SiddhiAppRuntime 并注册了 InputHandler 和 OutputCallback (StreamOutputHandler)

  SiddhiStreamOperator.processElement 需要处理两种场景:
    Flink TimeCharacteristic = ProcessingTime: 先调用StreamSerializer将数据转化为Object Array, 再直接调用InputHandler.send将数据发送给Siddhi处理
    Flink TimeCharacteristic = EventTime: 缓存接收到的StreamRecord 到内部的priorityQueue中,直到收到Watermark, 将priorityQueue中小于watermark的StreamRecord一次发送给Siddhi处理

  StreamOutputHandler:根据Output的TypeInfo将Siddhi Event 转化为 Flink StreamRecord. 再转发到SiddhiStreamOperator的Output

  1. CHECKPOINT

  SiddhiStreamOperator中保留了两种State信息,一种是priorityQueue中保存的由于watermark未发送给Siddhi的消息. 另一种是Siddhi本身的State, 通过SiddhiAppRuntime.snapshot() 获得


分享名称:FLINKSIDDHIADDON学习笔记
分享网址:http://scyanting.com/article/iheice.html