目录

前提条件

基本准备

批处理API实现WordCount

流处理API实现WordCount

数据源是文件

数据源是socket文本流

打包

提交到集群运行

命令行提交作业

Web UI提交作业

上传代码到gitee

前提条件

Windows安装好jdk8、Maven3、IDEA

Linux安装好Flink集群,可参考:CentOS7安装flink1.17完全分布式  

基本准备

创建项目

使用IDEA创建一个新的Maven项目,项目名称,例如:flinkdemo

添加依赖

在项目的pom.xml文件中添加Flink的依赖。

1.17.1

org.apache.flink

flink-streaming-java

${flink.version}

org.apache.flink

flink-clients

${flink.version}

刷新依赖

刷新依赖后,能看到相关依赖如下

刷新依赖过程需要等待一些时间来下载相关依赖。

如果依赖下载慢,可以设置阿里云仓库镜像:

 1.设置maven的settings.xml

在上面一行添加阿里云仓库镜像

alimaven

aliyun maven

http://maven.aliyun.com/nexus/content/groups/public/

central

2.IDEA设置maven

数据准备

在工程的根目录下,新建一个data文件夹

并在data文件夹下创建文本文件words.txt

内容如下

hello world

hello java

hello flink

新建包

右键src/main下的java,新建Package

填写包名org.example,包名与groupId的内容一致。

批处理API实现WordCount

在org.exmaple下新建wc包及BatchWordCount类

填写wc.BatchWordCount

效果如下

BatchWordCount.java代码如下:

package org.example.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.api.java.operators.AggregateOperator;

import org.apache.flink.api.java.operators.DataSource;

import org.apache.flink.api.java.operators.FlatMapOperator;

import org.apache.flink.api.java.operators.UnsortedGrouping;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.util.Collector;

public class BatchWordCount {

public static void main(String[] args) throws Exception {

// 1. 创建执行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 2. 从文件读取数据 按行读取

DataSource lineDS = env.readTextFile("data/words.txt");

// 3. 转换数据格式

FlatMapOperator> wordAndOne = lineDS.flatMap(new FlatMapFunction>() {

@Override

public void flatMap(String line, Collector> out) throws Exception {

String[] words = line.split(" ");

for (String word : words) {

out.collect(Tuple2.of(word,1L));

}

}

});

// 4. 按照 word 进行分组

UnsortedGrouping> wordAndOneUG = wordAndOne.groupBy(0);

// 5. 分组内聚合统计

AggregateOperator> sum = wordAndOneUG.sum(1);

// 6. 打印结果

sum.print();

}

}

运行程序,查看结果

注意,以上代码的实现方式是基于DataSet API的,是批处理API。而Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。从Flink 1.12开始,官方推荐直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

$ flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

流处理API实现WordCount

数据源是文件

在org.example.wc包下新建Java类StreamWordCount,代码如下:

package org.example.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

public class StreamWordCount {

public static void main(String[] args) throws Exception {

// 1. 创建流式执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. 读取文件

DataStreamSource lineStream = env.readTextFile("input/words.txt");

// 3. 转换、分组、求和,得到统计结果

SingleOutputStreamOperator> sum = lineStream.flatMap(new FlatMapFunction>() {

@Override

public void flatMap(String line, Collector> out) throws Exception {

String[] words = line.split(" ");

for (String word : words) {

out.collect(Tuple2.of(word, 1L));

}

}

}).keyBy(data -> data.f0)

.sum(1);

// 4. 打印

sum.print();

// 5. 执行

env.execute();

}

}

运行结果

与批处理程序BatchWordCount的区别:

创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。 转换处理之后,得到的数据对象类型不同。 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。 代码末尾需要调用env的execute方法,开始执行任务。

数据源是socket文本流

流处理的输入数据通常是流数据,将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取socket文本流的方法socketTextStream。

在org.example.wc包下新建Java类SocketStreamWordCount,代码如下:

package org.example.wc;

import org.apache.flink.api.common.typeinfo.Types;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;

public class SocketStreamWordCount {

public static void main(String[] args) throws Exception {

// 1. 创建流式执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 2. 读取文本流:node2表示发送端主机名(根据实际情况修改)、7777表示端口号

DataStreamSource lineStream = env.socketTextStream("node2", 7777);

// 3. 转换、分组、求和,得到统计结果

SingleOutputStreamOperator> sum = lineStream.flatMap((String line, Collector> out) -> {

String[] words = line.split(" ");

for (String word : words) {

out.collect(Tuple2.of(word, 1L));

}

}).returns(Types.TUPLE(Types.STRING, Types.LONG))

.keyBy(data -> data.f0)

.sum(1);

// 4. 打印

sum.print();

// 5. 执行

env.execute();

}

}

进入node2终端,如果没有nc命令,需要先安装nc命令,安装nc命令如下:

[hadoop@node2 ~]$ sudo yum install nc -y

开启nc监听

[hadoop@node2 ~]$ nc -lk 7777

IDEA中,运行SocketStreamWordCount程序。

往7777端口发送数据,例如发送hello world

控制台输出

继续往7777端口发送数据,例如发送hello flink

控制台输出

停止SocketStreamWordCount程序。

按Ctrl+c停止nc命令。

打包

这里的打包是将写好的程序打成jar包。

点击IDEA右侧的Maven,按住Ctrl键同时选中clean和package(第一次打包可以只选中package),点击执行打包。

打包成功后,看到如下输出信息,生成的jar包在项目的target目录下

提交到集群运行

把jar包提交到flink集群运行有两种方式:

1.通过命令行提交作业   

2.通过Web UI提交作业

命令行提交作业

将jar包上传Linux

启动flink集群

[hadoop@node2 ~]$ start-cluster.sh

Starting cluster.

Starting standalonesession daemon on host node2.

Starting taskexecutor daemon on host node2.

Starting taskexecutor daemon on host node3.

Starting taskexecutor daemon on host node4.

开启nc监听

[hadoop@node2 ~]$ nc -lk 7777

命令提交作业

开启另一个node2终端,使用flink run命令提交作业到flink集群

[hadoop@node2 ~]$ flink run -m node2:8081 -c org.example.wc.SocketStreamWordCount flinkdemo-1.0-SNAPSHOT.jar

-m指定提交到的JobManager,-c指定程序入口类。

发送测试数据

在nc监听终端,往7777端口发送数据

查看结果

Web UI查看结果

浏览器访问

node2:8081

看到正在运行的作业如下

查看结果

继续发送测试数据

在nc终端继续发送数据

Web UI刷新结果

命令行查看结果

打开新的node2终端,查看结果

[hadoop@node2 ~]$ cd $FLINK_HOME/log

[hadoop@node2 log]$ ls

flink-hadoop-client-node2.log                 flink-hadoop-standalonesession-0-node2.out

flink-hadoop-standalonesession-0-node2.log   flink-hadoop-taskexecutor-0-node2.log

flink-hadoop-standalonesession-0-node2.log.1 flink-hadoop-taskexecutor-0-node2.log.1

flink-hadoop-standalonesession-0-node2.log.2 flink-hadoop-taskexecutor-0-node2.log.2

flink-hadoop-standalonesession-0-node2.log.3 flink-hadoop-taskexecutor-0-node2.log.3

flink-hadoop-standalonesession-0-node2.log.4 flink-hadoop-taskexecutor-0-node2.log.4

flink-hadoop-standalonesession-0-node2.log.5 flink-hadoop-taskexecutor-0-node2.out

[hadoop@node2 log]$ cat flink-hadoop-taskexecutor-0-node2.out

(hello,1)

(flink,1)

(hello,2)

(world,1)

取消flink作业

点击Cancel Job取消作业 

停止nc监听

按Ctrl+c停止nc命令

Web UI提交作业

开启nc监听

开启nc监听发送数据

[hadoop@node2 ~]$ nc -lk 7777

Web UI提交作业

浏览器访问

node2:8081

点击Submit New Job

点击Add New

选择flink作业jar包所在路径

点击jar包名称

填写相关内容,点击Submit提交作业

Entry Class填写运行的主类,例如:org.example.wc.SocketStreamWordCount

Parallesim填写作业的并行度,例如:1

提交后,在Running Jobs里看到运行的作业

发送测试数据

往7777端口发送数据

查看结果

继续发送测试数据

刷新结果

取消作业

停止nc监听

按住Ctrl+c停止nc命令

关闭flink集群

[hadoop@node2 ~]$ stop-cluster.sh

Stopping taskexecutor daemon (pid: 2283) on host node2.

Stopping taskexecutor daemon (pid: 1827) on host node3.

Stopping taskexecutor daemon (pid: 1829) on host node4.

Stopping standalonesession daemon (pid: 1929) on host node2.

上传代码到gitee

登录gitee

https://gitee.com/

注意:如果还没有gitee账号,需要先注册;如果之前没有设置过SSH公钥,需要先设置SSH公钥。

创建仓库

提交代码

使用IDEA提交代码

提示有警告,忽略警告,继续提交

提交成功后,IDEA显示如下

刷新浏览器查看gitee界面,看到代码已上传成功

完成!enjoy it!

推荐链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。