加入收藏 | 设为首页 | 会员中心 | 我要投稿 宁波网 (https://www.ningbowang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(13) - Table API 概述

发布时间:2019-01-18 01:20:41 所属栏目:教程 来源:孙金城
导读:一、什么是Table API 在《Apache Flink 漫谈系列(08) - SQL概览》中我们概要的向大家介绍了什么是好SQL,SQL和Table API是Apache Flink中的同一层次的API抽象,如下图所示: Apache Flink 针对不同的用户场景提供了三层用户API,最下层ProcessFunction API

上面我们介绍了Apache Flink Table API核心算子的语义和具体示例,这部分将选取Bounded EventTime Tumble Window为例为大家编写一个完整的包括Source和Sink定义的Apache Flink Table API Job。假设有一张淘宝页面访问表(PageAccess_tab),有地域,用户ID和访问时间。我们需要按不同地域统计每2分钟的淘宝首页的访问量(PV)。具体数据如下:

Apache Flink 漫谈系列(13) - Table API 概述

1. Source 定义

自定义Apache Flink Stream Source需要实现StreamTableSource, StreamTableSource中通过StreamExecutionEnvironment 的addSource方法获取DataStream, 所以我们需要自定义一个 SourceFunction, 并且要支持产生WaterMark,也就是要实现DefinedRowtimeAttributes接口。

(1) Source Function定义

支持接收携带EventTime的数据集合,Either的数据结构,Right表示WaterMark和Left表示数据:

  1. class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) 
  2. extends SourceFunction[T] { 
  3. override def run(ctx: SourceContext[T]): Unit = { 
  4. dataWithTimestampList.foreach { 
  5. case Left(t) => ctx.collectWithTimestamp(t._2, t._1) 
  6. case Right(w) => ctx.emitWatermark(new Watermark(w)) 
  7. override def cancel(): Unit = ???} 

(2) 定义 StreamTableSource

我们自定义的Source要携带我们测试的数据,以及对应的WaterMark数据,具体如下:

  1. class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes { 
  2.  
  3. val fieldNames = Array("accessTime", "region", "userId") 
  4. val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING)) 
  5. val rowType = new RowTypeInfo( 
  6. Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], 
  7. fieldNames) 
  8.  
  9. // 页面访问表数据 rows with timestamps and watermarks 
  10. val data = Seq( 
  11. Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")), 
  12. Right(1510365660000L), 
  13. Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")), 
  14. Right(1510365660000L), 
  15. Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")), 
  16. Right(1510366200000L), 
  17. Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")), 
  18. Right(1510366260000L), 
  19. Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")), 
  20. Right(1510373400000L) 
  21.  
  22. override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = { 
  23. Collections.singletonList(new RowtimeAttributeDescriptor( 
  24. "accessTime", 
  25. new ExistingField("accessTime"), 
  26. PreserveWatermarks.INSTANCE)) 
  27.  
  28. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { 
  29. execEnv.addSource(new MySourceFunction[Row](data)).returns(rowType).setParallelism(1) 
  30.  
  31. override def getReturnType: TypeInformation[Row] = rowType 
  32.  
  33. override def getTableSchema: TableSchema = schema 
  34.  

(3) Sink 定义

(编辑:宁波网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读