文章目录

一、上传压缩包二、解压压缩包三、监控本地文件(file to kafka)3.1 编写配置文件3.2 自定义拦截器3.2.1 开发拦截器jar包(1)创建maven项目(2)开发拦截器类(3)开发pom文件(4)打成jar包上传到Flume

3.2.3 修改配置文件

3.3 创建Kafka Topic3.4 启动Flume3.5 停止Flume

四、监控Kafka(kafka to hdfs)3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.33.1 自定义拦截器3.2 编写配置文件3.3 启动Flume3.4 停止Flume

五、监控 ip+port(TODO)

一、上传压缩包

官网:https://flume.apache.org/

二、解压压缩包

[mall@mall software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

三、监控本地文件(file to kafka)

Flume是用java写的,所以需要确保JDK环境可用 需求描述:监控目录下多个文件写入Kafka TAILDIR SOURCE:本质是tail -F [file]命令,只能监控文件的新增和修改,不能处理历史文件。

3.1 编写配置文件

[mall@mall ~]$ cd /opt/module/apache-flume-1.9.0-bin/

[mall@mall apache-flume-1.9.0-bin]$ mkdir job

[mall@mall apache-flume-1.9.0-bin]$ cd job/

[mall@mall job]$ vim file_to_kafka.conf

内容:

# 0、配置agent:给source channel sink组件命名

a1.sources = r1

a1.channels = c1

# 1、配置source组件

a1.sources.r1.type = TAILDIR

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*

# 断点续传标记信息存储位置

a1.sources.r1.positionFile = /opt/module/apache-flume-1.9.0-bin/taildir_position.json

# 2、配置channel组件:event临时缓冲区

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.channels.c1.kafka.topic = topic_mall_applog

# 按照字符串类型传到kafka去

a1.channels.c1.parseAsFlumeEvent = false

# 3、配置source、channel、sink之间的连接关系

a1.sources.r1.channels = c1

3.2 自定义拦截器

作用:拦截events,经拦截器处理,输出处理后的events。 开发:创建maven项目,打成jar包形式上传到flume所在机器

3.2.1 开发拦截器jar包

(1)创建maven项目

(2)开发拦截器类

package com.songshuang.flume.interceptor;

import com.alibaba.fastjson.JSONException;

import com.alibaba.fastjson.JSONObject;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

import java.util.Iterator;

import java.util.List;

/**

* @date 2023/11/21 20:40

* 功能:剔除掉非json格式数据

*

* 1、实现接口

* 2、实现抽象方法

* 3、建造者模式:静态内部类

*/

public class ETLInterceptor implements Interceptor {

public void initialize() {

}

// 将log中event为非json格式数据置为null

public Event intercept(Event event) {

byte[] body = event.getBody();

// byte数组转为字符串

String log = new String(body, StandardCharsets.UTF_8);

boolean flag = false;

// 判断log是否是json格式

try {

JSONObject jsonObject = JSONObject.parseObject(log);

flag = true;

} catch (JSONException e) {

}

return flag ? event : null;

}

// 将log中event为null的删掉

public List intercept(List events) {

// 遍历events

Iterator iterator = events.iterator();

while (iterator.hasNext()) {

Event event = iterator.next();

if (intercept(event) == null) {

iterator.remove();

}

}

return events;

}

public void close() {

}

// 建造者模式

public static class Builder implements Interceptor.Builder {

@Override

public Interceptor build() {

return new ETLInterceptor();

}

@Override

public void configure(Context context) {

}

}

}

(3)开发pom文件

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.songshuang

flume_interceptor

1.0-SNAPSHOT

org.apache.flume

flume-ng-core

1.9.0

provided

com.alibaba

fastjson

1.2.62

maven-compiler-plugin

2.3.2

1.8

1.8

maven-assembly-plugin

jar-with-dependencies

make-assembly

package

single

(4)打成jar包上传到Flume

上传到 /opt/module/apache-flume-1.9.0-bin/lib 目录下

3.2.3 修改配置文件

[mall@mall job]$ vim file_to_kafka.conf

新增内容:

# 自定义拦截器

a1.sources.r1.interceptors = i1

# 指定自定义拦截器的建造者类名(入口)

a1.sources.r1.interceptors.i1.type = com.songshuang.flume.interceptor.ETLInterceptor$Builder

3.3 创建Kafka Topic

为什么要手动创建topic:flume自动创建的topic默认1个分区,每个分区1个副本。手动创建可以指定分区和副本数,可以有效利用Kafka集群资源。 –bootstrap-server参数作用:连接Kafka集群

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic topic_mall_applog

3.4 启动Flume

注意:放开Kafka集群所在机器9092端口,对Flume所在机器放开。 原因:Flume需要向Kafka集群写入数据,所以需要具有访问Kafka集群端口的权限。 – conf参数:配置文件存储所在目录 – name参数:agent名称,每个Flume配置文件就是一个agent。 – conf-file参数:flume本次启动读取的配置文件 nohup配合&:后台运行 &>/dev/null:将标准输出重定向到 /dev/null ,即丢弃所有输出 2>/dev/null:将标准错误输出重定向到 /dev/null ,即丢弃所有错误输出

[mall@mall ~]$ cd /opt/module/apache-flume-1.9.0-bin/

[mall@mall apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/file_to_kafka.conf &>/dev/null 2>/dev/null &

3.5 停止Flume

[mall@mall apache-flume-1.9.0-bin]$ ps -ef | grep file_to_kafka.conf

[mall@mall apache-flume-1.9.0-bin]$ kill 11001

四、监控Kafka(kafka to hdfs)

需求描述:监控Kafka,将数据写入HDFS 如果想要从头消费需要设置kafka.consumer.auto.offset.reset = earliest,默认从最新offset开始 注意:需要在HDFS所在机器部署FLume,需要调用HADOOP相关jar包。

3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

否则Flume向HDFS写数据时会失败!

[hadoop@hadoop104 ~]$ rm /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar

3.1 自定义拦截器

作用:按照kafka消息中的时间字段,决定消息存储到hdfs的哪个文件中。

代码:

package com.songshuang.flume.interceptor;

import com.alibaba.fastjson.JSONObject;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;

import java.util.List;

import java.util.Map;

/**

* @date 2023/11/22 16:52

* 作用:获取kafka中时间戳字段,放入event头中,flume写入hdfs时,从头部获取时间,作为该event放入hdfs的文件夹名称

*/

public class TimestampInterceptor implements Interceptor {

@Override

public void initialize() {

}

// 获取kafka时间戳字段,放入event的header

@Override

public Event intercept(Event event) {

byte[] body = event.getBody();

String log = new String(body, StandardCharsets.UTF_8);

JSONObject jsonObject = JSONObject.parseObject(log);

String ts = jsonObject.getString("ts");

Map headers = event.getHeaders();

headers.put("timestamp",ts); // event是引用变量类型,存储的是地址,header变了,自然event所对应地址上的值就变了

return event;

}

@Override

public List intercept(List events) {

for (Event event : events) {

intercept(event);

}

return events;

}

@Override

public void close() {

}

// 建造者模式

public static class Builder implements Interceptor.Builder {

@Override

public Interceptor build() {

return new TimestampInterceptor();

}

@Override

public void configure(Context context) {

}

}

}

3.2 编写配置文件

[hadoop@hadoop104 job]$ vim kafka_to_hdfs.conf

内容:

a1.sources.r1.kafka.consumer.group.id:消费者组名。 a1.channels.c1.type:file类型channel,缓冲数据放在磁盘中,而不是内存中。 a1.channels.c1.dataDirs:file channel缓冲内容落盘地址。 a1.channels.c1.checkpointDir:检查点存放位置,用于断点续传。 a1.sinks.k1.hdfs.fileType:指定文件的格式。包括:SequenceFile、DataStream、CompressedStream,当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值; a1.sinks.k1.hdfs.codeC:压缩编码。 a1.sinks.k1.hdfs.rollInterval:表示每隔多少秒,Flume就会将内部的缓冲区数据写入HDFS。默认值是30s。本质是rename .tmp文件。 a1.sinks.k1.hdfs.rollSize:表示当Flume的内部缓冲区达到指定字节数时,就会触发写入操作。单位是bytes。默认值是1024byte。 a1.sinks.k1.hdfs.rollCount:表示不论内部缓冲区的大小或时间,当写入的文件数量达到指定数量时,就会触发滚动操作。默认值是10。rollCount设为0表示关闭指定数量触发滚动的机制,是为了防止又出现文件数大小特别小且数量多的小文件情况;也就是不根据event(Kafka中的每条json消息)数量来滚动文件。

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# 配置source

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

a1.sources.r1.kafka.topics = topic_mall_applog

a1.sources.r1.kafka.consumer.group.id = consumer_group_flume

# 指定consumer从哪个offset开始消费,默认latest

# a1.sources.r1.kafka.consumer.auto.offset.reset = earliest

# 自定义拦截器

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = com.songshuang.flume.interceptor.TimestampInterceptor$Builder

# 配置sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = /warehouse/applog/gmall/tracking_log/%Y-%m-%d

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = gzip

# 文件滚动策略

a1.sinks.k1.hdfs.rollInterval = 20

a1.sinks.k1.hdfs.rollSize = 134217728

a1.sinks.k1.hdfs.rollCount = 0

# 配置channel

a1.channels.c1.type = file

a1.channels.c1.dataDirs = /opt/module/apache-flume-1.9.0-bin/data/kafka_to_hdfs

a1.channels.c1.checkpointDir = /opt/module/apache-flume-1.9.0-bin/checkpoint/kafka_to_hdfs

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

3.3 启动Flume

注意1:需要放开kafka端口,即9092端口,Flume要读Kafka。

[hadoop@hadoop104 job]$ cd /opt/module/apache-flume-1.9.0-bin/

[hadoop@hadoop104 apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/kafka_to_hdfs.conf &>/dev/null 2>/dev/null &

3.4 停止Flume

[hadoop@hadoop104 job]$ ps -ef | grep kafka_to_hdfs.conf

[hadoop@hadoop104 job]$ kill 21664

五、监控 ip+port(TODO)

相关链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。