文章目录

一. create table hints1. 语法2. 示例3. 注意

二. 实战:简化hive连接器参数设置三. select hints(ing)

SQL 提示(SQL Hints)是和 SQL 语句一起使用来改变执行计划的。本章介绍如何使用 SQL 提示来实现各种干预。

SQL 提示一般可以用于以下:

增强 planner:没有完美的 planner, SQL 提示让用户更好地控制执行;增加元数据(或者统计信息):如"已扫描的表索引"和"一些混洗键(shuffle keys)的倾斜信息"的一些统计数据对于查询来说是动态的,用提示来配置它们会非常方便,因为我们从 planner 获得的计划元数据通常不那么准确;算子(Operator)资源约束:在许多情况下,我们会为执行算子提供默认的资源配置,即最小并行度或托管内存(UDF 资源消耗)或特殊资源需求(GPU 或 SSD 磁盘)等,可以使用 SQL 提示非常灵活地为每个查询(非作业)配置资源。

 

一. create table hints

动态表选项允许动态地指定或覆盖表选项,不同于用 SQL DDL 或 连接 API 定义的静态表选项,这些选项可以在每个查询的每个表范围内灵活地指定。

因此,它非常适合用于交互式终端中的特定查询,例如,在 SQL-CLI 中,你可以通过添加动态选项/*+ OPTIONS('csv.ignore-parse-errors'='true') */来指定忽略 CSV 源的解析错误。

 

1. 语法

为了不破坏 SQL 兼容性,我们使用 Oracle 风格的 SQL hints 语法:

table_path /*+ OPTIONS(key=val [, key=val]*) */

key: string字符

val: string字符

 

2. 示例

CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);

CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);

-- `覆盖`查询语句中源表的选项

select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

-- 覆盖 join 中源表的选项

select * from

kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1

join

kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2

on t1.id = t2.id;

-- 覆盖插入语句中结果表的选项

insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;

 

3. 注意

create table hints 传递的连接器中catalog的相关参数,即create table with下参数,具体到源代码是:context.getCatalogTable().getOptions() 。

 

如果传参无效且在日志中看到参数已经设置成功,那

可能将context.getConfiguration()中的参数传递到with参数下,比如: hive连接器下:table.exec.hive.sink.statistic-auto-gather.enable 参数由DefaultDynamicTableContext的configuration来接收。此参数为flink sql的全局参数,此时可以通过set table.exec.hive.sink.statistic-auto-gather.enable=false 语法来设定参数。

 

二. 实战:简化hive连接器参数设置

对于hive连接器,Flink实现了通过catalog的方式来管理hive表,在使用hive表时需要使用hive相关语法,此时需要声明,hive dialect,如下:

CREATE CATALOG myhive WITH (

'type' = 'hive',

'default-database' = 'aaa',

'hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'

);

SET table.sql-dialect=hive;

-- 因为需要使用hive连接器中的写特性,所以需要create table ,此时sql语法为hive语法

CREATE TABLE hive_table (

user_id STRING,

order_amount DOUBLE

) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (

'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',

'sink.partition-commit.trigger'='partition-time',

'sink.partition-commit.delay'='1 h',

'sink.partition-commit.policy.kind'='metastore,success-file'

);

-- 对于某些框架例如chunjun,此处不能很好的适配:

--

SET table.sql-dialect=default;

CREATE TABLE kafka_table (

user_id STRING,

order_amount DOUBLE,

log_ts TIMESTAMP(3),

WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列声明 watermark。

) WITH (...);

-- streaming sql, insert into hive table

INSERT INTO TABLE myhive.aaa.hive_table

SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')

FROM kafka_table;

如下可以把写hive的一些行为通过sql hint方式,放到Flink sql语句中,如下整个Flink sql 会清爽很多。

CREATE CATALOG myhive WITH (

'type' = 'hive',

'default-database' = 'database_name',

'hive-conf-dir' = '/usr/bin/hadoop/software/hive/conf'

);

CREATE TABLE source_kafka (

`pv` string,

`uv` string,

`p_day_id` string

) WITH (

'connector' = 'kafka-x'

,'topic' = 'hive_kafka'

,'properties.bootstrap.servers' = 'xxx:9092'

,'properties.group.id' = 'luna_g'

,'scan.startup.mode' = 'earliest-offset'

,'json.timestamp-format.standard' = 'SQL'

,'json.ignore-parse-errors' = 'true'

,'format' = 'json'

,'scan.parallelism' = '1'

);

insert into

myhive.database_name.table_name /*+ OPTIONS('partition.time-extractor.timestamp-pattern'='$p_day_id:00:00','sink.partition-commit.policy.kind'='metastore,success-file','sink.partition-commit.success-file.name'='_SUCCESS_gao111') */

select * from source_kafka;

 

三. select hints(ing)

查询提示(Query Hints)用于为优化器修改执行计划提供建议,该修改只能在当前查询提示所在的查询块中生效(Query block, 什么是查询块)。 目前,Flink 查询提示只支持联接提示(Join Hints)。

具体见:官网

https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/sql/queries/hints/#%E6%9F%A5%E8%AF%A2%E6%8F%90%E7%A4%BA

 

精彩内容

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。