Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架,Flink 批流统一,同一套代码,可以跑流也可以跑批,同一个SQL,可以跑流也可以跑批,可连接到常用的存储系统,如kafka、hive、JDBC、HDFS、Redis
DataStream API
Table API & SQL
在 Flink 中,Table API 和 SQL 可以看作联结在一起的一套 API,这套 API 的核心概念是一个可以用作 Query 输入和输出的表 Table。在我们程序中,输入数据可以定义成一张表,然后对这张表进行查询得到一张新的表,最后还可以定义一张用于输出的表,负责将处理结果写入到外部系统。
我们可以看到,程序的整体处理流程与 DataStream API 非常相似,也可以分为读取数据源(Source)、转换(Transform)、输出数据(Sink)三部分。只不过这里的输入输出操作不需要额外定义,只需要将用于输入和输出的表 Table 定义出来,然后进行转换查询就可以了。
// 创建执行环境 TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 创建输入表
String sourceSql = "CREATE TABLE datagen_table (\n" +
" word STRING,\n" +
" frequency int\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1',\n" +
" 'fields.word.kind' = 'random',\n" +
" 'fields.word.length' = '1',\n" +
" 'fields.frequency.min' = '1',\n" +
" 'fields.frequency.max' = '9'\n" +
")";
tableEnv.executeSql(sourceSql);
// 创建输出表
String sinkSql = "CREATE TABLE print_table (\n" +
" word STRING,\n" +
" frequency INT\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";
tableEnv.executeSql(sinkSql);
// 执行计算并输出
String sql = "INSERT INTO print_table\n" +
"SELECT word, SUM(frequency) AS frequency\n" +
"FROM datagen_table\n" +
"GROUP BY word";
tableEnv.executeSql(sql);
Flink SQL 是 Flink 实时计算 为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。
Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。
📚参考资料
- 尚硅谷Flink教程 (confucianzuoyuan.github.io)
- (上)史上最全干货!Flink SQL 成神之路
- 8.1. Table API & SQL 综述 — Flink 原理与实践 (godaai.org)
❤️Sponsor
您的支持是我不断前进的动力,如果您感觉本文对您有所帮助的话,可以考虑打赏一下本文,用以维持本博客的运营费用,拒绝白嫖,从你我做起!🥰🥰🥰
支付宝 | 微信 |