1 安装kyuubi

1.1 二进制包下载

当前最新版本:1.8.0

wget http://mirrors.ustc.edu.cn/apache/kyuubi/kyuubi-1.8.0/apache-kyuubi-1.8.0-bin.tgz

解压缩到指定目录:

tar -zxvf apache-kyuubi-1.8.0-bin.tgz -C ~/softwares

准备环境:

cd $KYUUBI_HOME/conf

cp kyuubi-env.sh.template kyuubi-env.sh

cp kyuubi-defaults.conf.template kyuubi-defaults.conf

将kyuubi地址设置为localhost,如果不打开该注释,那么使用localhost是无法连接的,需要填写主机的ip地址

kyuubi.fronted.bind.host localhost

1.2 源码编译

官方文档地址:https://kyuubi.readthedocs.io/en/v1.7.1-rc0/develop_tools/distribution.html

这里以编译master分支的源码为例,如果你想编译其他分支请自行选择:

git clone https://github.com/apache/kyuubi.git

cd kyuubi

./build/dist --tgz --spark-provided --flink-provided --hive-provided

编译完成后,可以在kyuubi目录下看到apache-kyuubi-${version}-bin.tgz

2 配置引擎

2.1 flink引擎

进入kyuubi配置文件目录

cd $KYUUBI_HOME/conf

编辑kyuubi-defaults.conf,设置参数,设置flink作为执行引擎

kyuubi.engine.type FLINK_SQL

(这一步是可选的,因为可以在jdbc url中指定该配置动态改变引擎)

在kyuubi-env.sh配置FLINK_HOME和FLINK_HADOOP_CLASSPATH,要求flink版本大于1.14,

FLINK_HADOOP_CLASSPATH官方文档里没有说明要写,但是如果不写会无法连接使用flink engine,具体的版本号可自行修改为自己的hadoop平台版本号

export FLINK_HOME=/home/xcchen/software/flink/flink-1.15.3

export FLINK_HADOOP_CLASSPATH=${HADOOP_HOME}/share/hadoop/client/hadoop-client-runtime-3.3.0.jar:${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.3.0.jar

2.2 spark引擎

在kyuubi-env.sh配置SPARK_HOME,要求spark版本大于3.0.0

# 调整为自己的spark安装路径

export SPARK_HOME=/home/xcchen/softwares/spark/spark-3.1.1

3 启动kyuubi

cd $KYUUBI_HOME

bin/kyuubi start

4 Flink测试使用

为了测试方便,这里使用flink standalone集群运行任务

cd $FLINK_HOME && bin/start-cluster.sh

使用kyuubi自带的beeline客户端工具

cd $KYUUBI_HOME

bin/beeline -u 'jdbc:hive2://localhost:10009/' -n xcchen

如果你没有在配置文件中设置flink引擎

cd $KYUUBI_HOME

bin/beeline -u 'jdbc:hive2://localhost:10009/?kyuubi.engine.type=FLINK_SQL' -n xcchen

4.1 提交最基础的SQL任务

将下面三条sql一条条的执行

CREATE TABLE orders (

order_number BIGINT,

price DECIMAL(32,2),

buyer ROW,

order_time TIMESTAMP(3)

) WITH (

'connector' = 'datagen',

'rows-per-second' = '1'

);

CREATE TABLE target (

order_number BIGINT,

price DECIMAL(32,2),

buyer ROW,

order_time TIMESTAMP(3)

) WITH (

'connector' = 'print'

);

INSERT INTO target SELECT * FROM orders;

4.2 交互式查询

select * from orders;

CREATE TABLE orders_b (

order_number BIGINT,

price DECIMAL(32,2),

buyer ROW,

order_time TIMESTAMP(3)

) WITH (

'connector' = 'datagen',

'rows-per-second' = '1',

'number-of-rows' = '20'

);

select * from orders_b;

如果表是有界的,那么就可以打印出结果

如果表是无界的,那么会一直阻塞,等待任务结束才会打印结果

4.3 对接使用hive metastore

使用create catalog和use catalog语法进行使用,需要将hive连接器放在$FLINK_HOME/lib目录下

无法实现默认加载hive metastore

4.4 flink yarn session

上述的例子都是将任务跑在flink standalone集群上的,这不是一个常规的用法。接下来介绍如何将flink任务运行在yarn-session集群上

创建flink yarn session集群

export HADOOP_CLASSPATH=`hadoop classpath`

cd $FLINK_HOME

bin/yarn-session.sh -jm 1024 --detached

创建成功后,可以在命令行看到以下输出,集群id为application_1678943591760_0001

2023-03-16 13:14:15,912 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - The Flink YARN session clus

ter has been started in detached mode. In order to stop Flink gracefully, use the following command:

$ echo "stop" | ./bin/yarn-session.sh -id application_1678943591760_0001

If this should not be possible, then you can also kill Flink via YARN's web interface or via:

$ yarn application -kill application_1678943591760_0001

Note that killing Flink might not clean up all job artifacts and temporary files.

部署好session集群后,我们使用以下命令测试下往session集群提交任务:

cd $FLINK_HOME

./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

配置flink-conf.yaml,添加以下内容(或者使用jdbc连接参数指定这些信息)

execution.target: yarn-session

# Yarn Session Cluster application id.

yarn.application.id: application_1678943591760_0001

配置kyuubi-env.sh,添加下面两个配置:

export HADOOP_CONF_DIR=xxx

export FLINK_HADOOP_CLASSPATH=`hadoop classpath`

连接kyuubi

未配置flink-conf.yaml时,通过jdbc url参数指定配置:

bin/beeline -u 'jdbc:hive2://localhost:10009/;#kyuubi.engine.type=FLINK_SQL;yarn.application.id=application_1679304354185_0001;execution.target=yarn-session' -n xcchen

配置了flink-conf.yaml,则无需传递参数直接使用

4.5 flink yarn application

注意,kyuubi flink yarn application仅支持kyuubi 1.8版本,且flink版本要大于1.15

bin/beeline -u 'jdbc:hive2://localhost:10009/;#kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application' -n xcchen

application模式会在yarn集群上开启一个不会退出的flink集群环境,该模式的TaskManager和session模式一样都是动态分配,没有任务就会释放,有任务要运行就会自行拉起执行任务。

该模式和session模式的区别是,session在yarn上共享集群的,无论你在调用kyuubi连接时使用什么用户都不会在yarn集群上重新创建集群;而application模式在不同用户之间是不会共享集群的,用户a在使用时会拉起集群,用户b在使用时会新拉起一个集群

5 Spark测试使用

为了测试方便,这里使用spark standalone集群

cd $SPARK_HOME

sbin/start-all.sh

还需要开启hdfs

start-dfs.sh

使用kyuubi自带的beeline客户端工具

cd $KYUUBI_HOME

bin/beeline -u 'jdbc:hive2://localhost:10009/' -n xcchen

5.1 提交最基础的SQL任务

准备数据文件

echo 'id,name\n1,zhangsan\n2,lisi' > /tmp/spark-kyuubi-source-test.csv

-- source table

create temporary view s using csv options (path 'file:/tmp/spark-kyuubi-source-test.csv',header "true");

insert overwrite directory using csv options (path 'file:/tmp/spark-kyuubi-target') select * from s;

5.2 交互式查询

基础查询:

-- basic query

select timestamp '2023-03-03';

读取文件:

-- select from csv table

select * from s;

0: jdbc:hive2://localhost:10009/> select * from s;

+-----+-----------+

| id | name |

+-----+-----------+

| 1 | zhangsan |

| 2 | lisi |

+-----+-----------+

2 rows selected (0.072 seconds)

5.3 提交带有依赖的任务

准备mysql数据

docker exec -it mysql bash

mysql -uroot -p'xxx'

mysql中建表建库

create database test default character set utf8mb4;

create user 'kyuubi'@'%' identified by 'Kyuubi123~';

grant all privileges on test.* to 'kyuubi'@'%' with grant option;

flush privileges;

use test;

create table t1 (id int primary key,name varchar(20));

insert into t1 values (1,'zhangsan');

进入beeline交互式命令行:

add jar '/home/xcchen/softwares/kyuubi/mysql-connector-java-8.0.22.jar';

create temporary view m_test

using org.apache.spark.sql.jdbc

options (url 'jdbc:mysql://localhost:3306/test',dbTable 't1',user 'kyuubi',password 'Kyuubi123~');

select * from m_test;

添加的jar包是租户级别的,退出命令行后,重新打开命令行建立jdbc连接的时候不需要再添加依赖jar包

5.4 对接使用hive metastore

启动hive metastore

hive --service metastore

配置方式:

在jdbc url中填写hms地址,这是最简单的方式:

bin/beeline -u 'jdbc:hive2://localhost:10009/;#hive.metastore.uris=thrift://localhost:9083' -n xcchen

修改$KYUUBI_HOME/conf/kyuubi-defaults.conf,添加spark.hive.metastore.uris配置项

默认的spark元数据信息只保存在会话级别,当我们在第一个命令行窗口开启会话a并且创建一系列表后,新开一个窗口开启会话b时,使用show tables会发现没有表存在。说明默认的spark元数据信息是会话级别的,如果使用了metastore那就另说

6 jdbc使用kyuubi测试

参考git项目地址:http://192.168.118.128:81/xcchen/kyuubi-jdbc-examples#

7 认证

kyuubi的认证只是针对使用kyuubi本身,作用域仅仅是kyuubi这个组件!

kyuubi支持多种验证方式

7.1 JDBC认证

下面我们以mysql来进行测试

拷贝mysql驱动到$KYUUBI_HOME/jars目录

cp mysql-connector-java-8.0.22.jar $KYUUBI_HOME/jars

准备mysql表

create database test default character set utf8mb4;

create table kyuubi_user (`user` varchar(50) primary key, password varchar(50));

insert into kyuubi_user values ('xcchen',md5(concat('1qazZSE$','xcchen')));

配置kyuubi-defaults.conf,设置jdbc认证参数:

kyuubi.authentication=JDBC

kyuubi.authentication.jdbc.driver.class = com.mysql.jdbc.Driver

kyuubi.authentication.jdbc.url = jdbc:mysql://localhost:3306/test

kyuubi.authentication.jdbc.user = root

kyuubi.authentication.jdbc.password = root

kyuubi.authentication.jdbc.query = SELECT 1 FROM kyuubi_user WHERE user=${user} AND password=MD5(CONCAT('1qazZSE$',${password}))

使用beeline连接时填写用户名和密码(其中-n代表用户名,-p代码密码):

bin/beeline -u 'jdbc:hive2://localhost:10009/?kyuubi.engine.type=FLINK_SQL' -n xcchen -p xcchen

如果用户名密码未填写正确,报错信息如下:

Connecting to jdbc:hive2://localhost:10009/?kyuubi.engine.type=FLINK_SQL

Unknown HS2 problem when communicating with Thrift server.

Error: Could not open client transport with JDBC Uri: jdbc:hive2://localhost:10009/?kyuubi.engine.type=FLINK_SQL: Peer indicated failure: Error validating the login (state=08S01,code=0)

Beeline version 1.6.1-incubating by Apache Kyuubi (Incubating)

Tips:如果认证配置不生效,请仔细检查query语句和你的mysql表是否对应上了,md5的salt值是否使用正确.

7.2 kerberos认证(推荐的方式)

8 授权

多租户

flink任务没有这个概念,就算使用相同的用户,他们的环境也是隔离的

spark任务相同用户,他们的会话也是不一样的

好文推荐

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。