1.定义

StreamExecutionEnvironment是Flink中用于定义和执行流处理程序的主要类。它提供了一系列函数和方法来配置流处理程序的执行环境(例如并行度、checkpoint、时间特性),并将其部署到Flink集群中运行。

2.主要功能和设计思路

提供编程接口StreamExecutionEnvironment提供了Java和Scala编程接口,使得开发者可以使用自己熟悉的编程语言来定义和执行流处理程序。

定义数据源StreamExecutionEnvironment提供了多种定义数据源的方法,包括从文件、socket、Kafka等读取数据,也可以通过自定义数据源来读取数据。

定义数据处理操作StreamExecutionEnvironment提供了多种数据处理操作,包括map、filter、reduce、keyBy、window等,可以根据需求进行灵活配置和组合。

定义数据输出StreamExecutionEnvironment提供了多种数据输出方法,包括将数据输出到文件、数据库、Kafka等,也可以通过自定义输出方法来输出数据。

定义并行度StreamExecutionEnvironment可以根据数据处理任务的特点和集群资源的情况来动态调整并行度,以提高程序的执行效率。

集成第三方库StreamExecutionEnvironment集成了多种第三方库,如Apache Kafka、Apache Cassandra、Elasticsearch等,方便开发者使用这些库来进行数据处理和存储。

支持流式迭代StreamExecutionEnvironment支持流式迭代,即在数据流中对数据进行多次迭代处理,以支持迭代算法和机器学习等应用场景。

支持事件时间处理StreamExecutionEnvironment支持事件时间处理,即在数据流中根据事件发生的时间进行数据处理和聚合,以支持实时数据分析和监控等应用场景。

3.使用示例

 

4.源代码核心方法剖析

     4.1方法说明

构造函数StreamExecutionEnvironment有多个构造函数,其中默认构造函数是私有的,不能从外部直接创建StreamExecutionEnvironment实例。另外一个常用的构造函数是fromXXX()系列方法,例如fromElements()、fromCollection()、fromParallelCollection(),用于创建DataStream的执行环境。

setParallelism()方法用于设置任务的并行度,即任务中并行执行的任务数。默认情况下,Flink会根据系统资源和任务的特性自动设置并行度,但用户也可以通过setParallelism()方法手动设置并行度。

enableCheckpointing()方法用于启用和配置checkpoint机制。checkpoint是一种容错机制,用于将任务执行过程中的中间结果保存到外部存储中,以便在任务失败时恢复任务执行进度。enableCheckpointing()方法可以设置checkpoint的间隔时间、超时时间、最大并发数等参数。

setStreamTimeCharacteristic()方法用于设置任务的时间特性,即任务如何处理事件时间和处理时间。Flink支持三种时间特性:ProcessingTime、EventTime和IngestionTime。ProcessingTime表示任务处理数据时使用的本地系统时间;EventTime表示数据本身携带的时间戳信息,用于实现基于事件时间的数据处理;IngestionTime表示数据进入系统的时间,用于实现基于系统时间的数据处理。

addSource()方法用于向任务中添加数据源。addSource()方法可以接受多种类型的数据源,例如Kafka、Socket、文件、集合等。

addSink()方法用于向任务中添加数据汇。addSink()方法可以接受多种类型的数据汇,例如Kafka、Socket、文件、集合等。

execute()方法用于启动流处理任务的执行。execute()方法会将所有的Transformation操作和数据源、数据汇等组件构建成一个StreamGraph,并将StreamGraph转换为JobGraph,最终提交给Flink集群执行。

4.2部分核心方法源码示例

  StreamExecutionEnvironment的代码比较复杂,这里只提供其中一部分的示例代码作为参考:

public class StreamExecutionEnvironment {

//默认的执行环境并行度

private final int defaultLocalParallelism;

//默认的执行环境

private final ExecutorService defaultExecutorService;

//配置文件

private final Configuration configuration;

//执行环境的ID

private final String executorId;

//用户自定义的类加载器

private final ClassLoader userClassLoader;

//数据源注册中心

private final SourceFunctionRegistry sourceFunctionRegistry;

//转换器注册中心

private final StreamOperatorFactory operatorFactory;

//配置信息

private final CheckpointConfig checkpointConfig;

//时间特性

private final TimeCharacteristic timeCharacteristic;

//状态后端

private final StateBackend stateBackend;

/**

* 构造方法,用于创建一个StreamExecutionEnvironment对象

*

* @param executorService 默认的执行环境

* @param configuration 配置文件

* @param userClassLoader 用户自定义的类加载器

* @param defaultLocalParallelism 默认的执行环境并行度

* @param executorId 执行环境的ID

*/

public StreamExecutionEnvironment(

ExecutorService executorService,

Configuration configuration,

ClassLoader userClassLoader,

int defaultLocalParallelism,

String executorId) {

this.defaultLocalParallelism = defaultLocalParallelism;

this.defaultExecutorService = executorService;

this.configuration = configuration == null ? new Configuration() : configuration;

this.executorId = executorId != null ? executorId : UUID.randomUUID().toString();

this.userClassLoader = userClassLoader == null ? getClass().getClassLoader() : userClassLoader;

this.sourceFunctionRegistry = new SourceFunctionRegistry();

this.operatorFactory = new StreamOperatorFactory<>();

this.checkpointConfig = new CheckpointConfig();

this.timeCharacteristic = TimeCharacteristic.ProcessingTime;

this.stateBackend = null;

}

/**

* 获取数据流处理的默认并行度

*

* @return 默认并行度

*/

public int getDefaultLocalParallelism() {

return defaultLocalParallelism;

}

/**

* 获取配置文件

*

* @return 配置文件

*/

public Configuration getConfiguration() {

return configuration;

}

/**

* 获取执行环境的ID

*

* @return 执行环境的ID

*/

public String getId() {

return executorId;

}

/**

* 获取用户自定义的类加载器

*

* @return 用户自定义的类加载器

*/

public ClassLoader getUserClassLoader() {

return userClassLoader;

}

/**

* 获取数据源注册中心

*

* @return 数据源注册中心

*/

public SourceFunctionRegistry getSourceFunctionRegistry() {

return sourceFunctionRegistry;

}

/**

* 获取转换器注册中心

*

* @return 转换器注册中心

*/

public StreamOperatorFactory getOperatorFactory() {

return operatorFactory;

}

/**

* 获取检查点配置

*

* @return 检查点配置

*/

public CheckpointConfig getCheckpointConfig() {

return checkpointConfig;

}

/**

* 获取时间特性

*

* @return 时间特性

*/

public TimeCharacteristic getTimeCharacteristic() {

return timeCharacteristic;

}

/**

* 获取状态后端

*

* @return 状态后端

*/

public StateBackend getStateBackend() {

return stateBackend;

}

/**

* 设置执行环境的默认并行度

*

* @param parallelism 并行度

*/

public void setDefaultLocalParallelism(int parallelism) {

configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, parallelism);

}

/**

* 获取一个DataStream对象

*

* @param source 数据源

* @param type 数据类型

* @param 数据类型

* @return DataStream对象

*/

public DataStream addSource(SourceFunction source, TypeInformation type) {

// 创建SourceTransformation对象,表示对数据源进行转换操作

SourceTransformation transform = new SourceTransformation<>(source, "Source", type, defaultLocalParallelism);

// 将SourceTransformation对象添加到转换器注册中心中

operatorFactory.addOperator(transform);

// 返回转换后的DataStream对象

return new DataStream<>(this, getNewNodeId(), transform.getOutputType());

}

/**

* 获取一个DataStream对象

*

* @param source 数据源

* @param 数据类型

* @return DataStream对象

*/

public DataStreamSource addSource(SourceFunction source) {

return addSource(source, TypeExtractor.createTypeInfo(SourceFunction.class, source.getClass(), 0));

}

/**

* 获取一个Table对象

*

* @return Table对象

*/

public TableEnvironment createTableEnvironment() {

// 创建TableEnvironment对象

return TableEnvironment.create(configuration, executorComponents);

}

// 其他操作的实现略

}

 

推荐链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。