欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 时评 > flink-jdbc-driver

flink-jdbc-driver

2025/9/14 19:52:35 来源:https://blog.csdn.net/qq_36250202/article/details/142789143  浏览:    关键词:flink-jdbc-driver

        Flink JDBC 驱动程序是一个 Java 库,使客户端能够通过 SQL 网关将 Flink SQL 发送到 Flink 集群。

     首先启动:1.flink集群,随意任何集群。

     2.启动flink-sql-gateway:

sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost

验证sql- gateway

curl http://localhost:8083/v1/info
  1. Maven 依赖项

    Flink JDBC 驱动是一个通过 JDBC API 访问 Flink 集群的库。有关 Java 中 JDBC 的一般用法,请参见 JDBC 教程。

  2. 在项目的pom.xml中添加以下依赖项,或者下载 flink-jdbc-driver-bundle-{VERSION}.jar 并将其添加到你的 Classpath 中。

       

 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-jdbc-driver-bundle</artifactId><version>${flink.version}</version></dependency>
  1. 使用特定 URL 连接到 Java 代码中的 Flink SQL 网关。
  2. 执行您想要的任何语句。

     

public class Main {public static void main(String[] args) throws SQLException {Properties properties = new Properties();properties.setProperty("table.exec.sink.not-null-enforcer","DROP");DataSource dataSource = new FlinkDataSource("jdbc:flink://ip:8083", properties);try (Connection connection = dataSource.getConnection()) {try (Statement statement = connection.createStatement()) {/*statement.execute("CREATE TABLE student (\n" +"  id VARCHAR(255),\n" +"  sid VARCHAR(255),\n" +"  name VARCHAR(255),\n" +"  PRIMARY KEY (id) NOT ENFORCED" +") WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://ip:3306/test?characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true',\n" +"   'username' = 'test',\n" +"   'password' = 'test@123',\n " +"   'table-name' = 'student' " +")");statement.execute("CREATE TABLE students (\n" +"  id VARCHAR(255),\n" +"  sid VARCHAR(255),\n" +"  name VARCHAR(255),\n" +"  PRIMARY KEY (id) NOT ENFORCED" +") WITH (\n" +"   'connector' = 'jdbc',\n" +"   'url' = 'jdbc:mysql://ip:3306/test?characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true',\n" +"   'username' = 'test',\n" +"   'password' = 'test@123',\n " +"   'table-name' = 'students' " +")");*/statement.execute("CREATE TABLE students (\n" +"  name STRING,\n" +"  sid BIGINT,\n" +"  PRIMARY KEY (sid) NOT ENFORCED" +") WITH (\n" +"   'connector' = 'paimon',\n" +"   'path' = 'hdfs://ip:8072/paimon/hive/olap_mz.db/students'\n" +")");//statement.execute("INSERT INTO students VALUES ('1', 's1', 'Hi'), ('2', 's2','Hello')");try (ResultSet rs = statement.executeQuery("SELECT st.sid,count(st.sid) as coun FROM students st group by st.sid")) {while (rs.next()) {System.out.println(rs.getLong(1) +","+rs.getLong(2) );}statement.close();}connection.close();}}}
}

注释的是mysql的,运行的是paimon的。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com