1. Background

Thanks to Alibaba's heavy promotion in China, Flink's popularity is easy to imagine, and there is a very clear trend of Flink moving toward SQL. Many large companies have already gone "Flink SQL-first", so how do we build a streaming compute platform of our own?


2. A First Look at Flink SQL and the Code

Connect to Kafka, process the data, and write it to MySQL:

package org.example;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlDemo {
    public static void main(String[] args) throws Exception {
        // Create the execution environment (Blink planner, streaming mode)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Map the Kafka topic to a temporary source table
        tableEnv.executeSql(
                "create table sensor_source (id string, name string) with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'test_info_test'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'properties.group.id' = 'testGroup'," +
                " 'scan.startup.mode' = 'earliest-offset'," +
                " 'format' = 'json')"
        );

        // Optional: a print sink table, handy for debugging
        String sql = "CREATE TABLE print_table (\n" +
                "  id STRING,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";

        // Map the MySQL table to a temporary sink table
        String mysql_sql = "CREATE TABLE mysql_sink (\n" +
                "  id STRING,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://ip:8081/kafka?serverTimezone=UTC',\n" +
                "  'table-name' = 'test_info',\n" +
                "  'username' = 'kafka',\n" +
                "  'password' = 'Bonc@123'\n" +
                ")";

        // Optional: a Kafka sink table
        String kafka_sink_sql =
                "create table kafka_sink (id string, name string) with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'test_info_2'," +
                " 'properties.bootstrap.servers' = 'localhost:9092'," +
                " 'format' = 'json')";

        tableEnv.executeSql(mysql_sql);
        //tableEnv.executeSql(kafka_sink_sql);
        //tableEnv.executeSql(sql);

        // The INSERT statement is what actually submits the streaming job
        //tableEnv.executeSql("insert into print_table select * from sensor_source");
        tableEnv.executeSql("insert into mysql_sink select * from sensor_source");
        //tableEnv.executeSql("insert into kafka_sink select * from sensor_source");
    }
}
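In this form each uncommented executeSql INSERT is submitted as its own job, which is why only one sink is active at a time. If the goal is to write to MySQL and Kafka in the same job, the INSERT statements can be grouped into a StatementSet (Flink 1.11+). A minimal sketch, reusing tableEnv and the tables defined above:

// Minimal sketch: group several INSERTs so they run as a single Flink job.
// Assumes tableEnv, sensor_source, mysql_sink and kafka_sink from the code above,
// plus: import org.apache.flink.table.api.StatementSet;
StatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsertSql("insert into mysql_sink select * from sensor_source");
statementSet.addInsertSql("insert into kafka_sink select * from sensor_source");
statementSet.execute();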

After running the job, the data shows up in MySQL.
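Note that the JDBC connector writes into an existing MySQL table, so test_info (with id and name columns) has to be created in the target database beforehand. To check that rows actually arrived, a plain JDBC query is enough; the sketch below is hypothetical and assumes the same connection details as the mysql_sink DDL above:

// Minimal verification sketch (plain JDBC, same URL/credentials as mysql_sink above).
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifySink {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://ip:8081/kafka?serverTimezone=UTC";
        try (Connection conn = DriverManager.getConnection(url, "kafka", "Bonc@123");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name FROM test_info")) {
            while (rs.next()) {
                System.out.println(rs.getString("id") + " -> " + rs.getString("name"));
            }
        }
    }
}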


3. A Flink Real-Time Compute Platform

Based on the code above, we can abstract this into a Flink real-time compute platform layer: every job looks like the demo, with only the SQL changing, as sketched below.
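One way to picture that layer (class and method names below are hypothetical, purely to illustrate the idea): the platform accepts the user's DDL and INSERT statements as plain strings, and a generic submitter builds the environment and executes them in order.

// Hypothetical sketch of the platform core: a generic SQL job submitter.
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlJobSubmitter {

    // Runs the source/sink DDL first, then the INSERT statements that start the job.
    public static void submit(List<String> ddlStatements, List<String> insertStatements) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        ddlStatements.forEach(tableEnv::executeSql);
        insertStatements.forEach(tableEnv::executeSql);
    }
}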


Source: 诸葛子房