We simply write the computed results to the CsvTableSink that Apache Flink ships with. The sink is defined as follows (imports are shown for completeness):
import java.io.File

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

def getCsvTableSink: TableSink[Row] = {
  val tempFile = File.createTempFile("csv_sink_", "tem")
  // Print the sink's file path so we can inspect the results later
  println("Sink path : " + tempFile)
  if (tempFile.exists()) {
    tempFile.delete()
  }
  new CsvTableSink(tempFile.getAbsolutePath).configure(
    Array[String]("region", "winStart", "winEnd", "pv"),
    Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}
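Here `configure` returns a copy of the sink bound to the given field names and types, so the registered table has the schema (region, winStart, winEnd, pv). Purely as an illustration (the region name, timestamps, and count below are hypothetical, not actual output of the program), each window fire appends one comma-separated row per region along the lines of:

  BeiJing,2018-05-14 12:00:00.0,2018-05-14 12:02:00.0,3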
2. Build the main program
The main program covers the definition of the execution environment, the registration of the Source and Sink, and the execution of the windowed aggregation query:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

def main(args: Array[String]): Unit = {
  // Streaming environment and its table environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)

  // Use event time
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Parallelism 1 keeps the output in a single file, which is easier to inspect
  env.setParallelism(1)

  val sourceTableName = "mySource"
  // Create the custom source
  val tableSource = new MyTableSource

  val sinkTableName = "csvSink"
  // Create the CSV sink
  val tableSink = getCsvTableSink

  // Register the source
  tEnv.registerTableSource(sourceTableName, tableSource)
  // Register the sink
  tEnv.registerTableSink(sinkTableName, tableSink)

  // Count page views (pv) per region over 2-minute tumbling event-time windows
  val result = tEnv.scan(sourceTableName)
    .window(Tumble over 2.minute on 'accessTime as 'w)
    .groupBy('w, 'region)
    .select('region, 'w.start, 'w.end, 'region.count as 'pv)

  result.insertInto(sinkTableName)
  env.execute()
}
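For comparison, the same 2-minute tumbling-window count can be expressed with Flink SQL's group-window functions. The following is our own sketch rather than code from the article; it assumes the same registered mySource table and a Flink version (1.6+) that provides sqlQuery:

val sqlResult = tEnv.sqlQuery(
  """
    |SELECT region,
    |       TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart,
    |       TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd,
    |       COUNT(region) AS pv
    |FROM mySource
    |GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region
  """.stripMargin)

// Writes to the same registered sink as the Table API version
sqlResult.insertInto(sinkTableName)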
3. Run the program and check the results