目录

一、Flume是什么?+项目背景

1.Flume介绍

二、Flume基础架构

 1、核心组件

(1)Agent

(2)Source

(3)channel

(4)Sink

(5)Event

2、工作流程 

3、Flume的事务性

4、 Flume会丢失数据吗?

三、Flume安装+基本探索

1、从官网下载apache-flume-1.9.0-bin.tar.gz,放到服务器并解压至相关路径

2、 配置conf下的flume-env.sh

 3、实现一个官网例子

(1)官网如下

(2)我们修改为自己的执行语句 

(3)再另开一个窗口

(4)接着启动监控

(5)再接着启动一个界面,接着启动

四、开发Flume脚本

1、spooldir模式监控本地文件上传HDFS

(1)创建文件 spooldir-flume-hdfs.conf

(2)启动配置监控

(3)往我们的监控文件夹下cp一个文件

(4)查看我们的hdfs相应设置的目录

2、taildir模式监控本地文件上传HDFS

(1)创建文件 taildir-flume-hdfs.conf

(2)启动配置监控

(3)往我们的监控文件夹下cp一个文件

(4)查看相应HDFS

五、 编写Flume拦截器

1、flume自定义拦截器步骤

2.pom文件配置

3、编写拦截器代码

(1)代码如下:

(2)使用 package 打包

4、将生成的jar放入 Flume下的lib里

 5、编辑  taildir-hdfs.conf  文件

6、开启文件监控

7、传一个文件试一下

8、查看下HDFS

9、添加对不同类型的更改 新建文件 taildir-hdfs-all.conf

10、启动脚本监控

11、测试各类型是否可以正常加载

(1)页面类型

(2)action类型 

(3)login 类型

(4)exposure 类型

12、编写一个flume 启停脚本 如下

13、测试一下脚本

六、将数据加载到hive表

1、编写加载到hive的project库下表的脚本

2、执行脚本

3、查看 hive表中数据

一、Flume是什么?+项目背景

1.Flume介绍

Flume 基于流式架构是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。

本项目 Flume 实时读取服务器本地目录下生成的埋点数据,将数据实时写入到HDFS。

有的公司涉及几十甚至上百的的web服务器

操作流程可能如下:

二、Flume基础架构

 1、核心组件

(1)Agent

Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目标地。

主要有 3 个部分组成,Source、Channel、Sink。

(2)Source

Source 是负责接收数据到 Flume Agent 的组件。

主要包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。

(3)channel

Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。

注:一个channel可对应多个sink,但是一个sink只能对应一个channel。

Flume 自带两种 Channel:Memory Channel 和 File Channel。

Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。

File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

(4)Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储系统、或者被发送到另一个 Flume Agent。

Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr。

(5)Event

传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。

Event 由 Headers 和 Body 两部分组成,Headers 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组

2、工作流程 

(1)数据通过Source采集进入Flume,Flume以通过Agent以事件的形式将数据从源头到目的地。

(2)进入事件处理,Event为传输单元,由可选的header和载有数据的body(byte array)构成。

(3)在agent可对数据进行粗步拦截,排除某些不采集的文件,文件类型。

(4)Channel选择器,两种Channel Selector,一种是Replicating channel另一种是Multiplexing Channel,前者将source的event发往左右的channel,适用于数据多副本存储,后者将event发往指定的channel,适用于数据定向发送。

(5)将event写入对应channel列表

(6)SinkProcessor从Channel中拉数据

分 别 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor

DefaultSinkProcessor 对 应 的 是 单 个 的 Sink , LoadBalancingSinkProcessor 和FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以错误恢复的功能

(7)最后把数据Sink出去

3、Flume的事务性

Put事务:Source到Channel doPut:将数据从souce写入临时缓冲区putList doCommit:检查Channel内存队列是否足够合并 doRollback:channel 内存队列空间不足,则回滚数据

take事务:channel到sink doTake:将数据取到临时缓冲区takeList doCommit:如果数据全部发送成功,则清除临时缓冲区takeList doRollback:数据发送过程中如果出现异常,rollback 将临时缓冲区 takeList 中的数据归还给 channel 内存队列

4、 Flume会丢失数据吗?

Source到Channel是事务性的,

Channel到Sink也是事务性的,

这两个环节都不可能丢失数据。

唯一可能丢失数据的是Channel采用MemoryChannel,

(1)在agent宕机时候导致数据在内存中丢失;

(2)Channel存储数据已满,导致Source不再写入数据,造成未写入的数据丢失;

具体分析:

flume传输是否会丢失或重复数据?   这个问题需要分情况来看,需要结合具体使用的source、channel和sink来分析。

首先,分析source:

(1)exec source ,后面接 tail -f ,这个数据也是有可能丢的。

(2)TailDir source ,这个是不会丢数据的,它可以保证数据不丢失。

其次,分析sink:

(1)hdfs/kakfa sink,数据有可能重复,但是不会丢失。

一般生产过程中,都是使用 TailDir source 和 HDFS sink,所以数据会重复但是不会丢失。

最后,分析channel 要想数据不丢失的话,还是要用 File channel,而memory channel 在flume挂掉的时候还是有可能造成数据的丢失的。

三、Flume安装+基本探索

1、从官网下载apache-flume-1.9.0-bin.tar.gz,放到服务器并解压至相关路径

2、 配置conf下的flume-env.sh

将java路径添加进去

 3、实现一个官网例子

(1)官网如下

bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

(2)我们修改为自己的执行语句 

bin/flume-ng agent -n peizk -c conf -f conf/flume-conf.properties.template

在服务器执行

[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent -n peizk -c conf -f conf/flume-conf.properties.template

Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh

Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access

Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access

+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application -n peizk -f conf/flume-conf.properties.template

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

(3)再另开一个窗口

在flume目录下新建一个文件

[peizk@hadoop flume-1.9.0]$ vim example.conf

将我们官网代码放入

# example.conf: A single-node Flume configuration

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

# Describe the sink

a1.sinks.k1.type = logger

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

(4)接着启动监控

bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

 --conf/-c:表示配置文件存储在 conf/目录 --name/-n:表示给 agent 起名为 a1 --conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf 文件。-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error

(5)再接着启动一个界面,接着启动

[peizk@hadoop flume-1.9.0]$ telnet localhost 44444

Trying ::1...

telnet: connect to address ::1: Connection refused

Trying 127.0.0.1...

Connected to localhost.

Escape character is '^]'.

写一些数据进去

[peizk@hadoop flume-1.9.0]$ telnet localhost 44444

Trying ::1...

telnet: connect to address ::1: Connection refused

Trying 127.0.0.1...

Connected to localhost.

Escape character is '^]'.

hello peizk

OK

查看 第二个窗口

完成官网例子,通过flume监控4444端口,往里存什么,拿什么 

四、开发Flume脚本

1、spooldir模式监控本地文件上传HDFS

(1)创建文件 spooldir-flume-hdfs.conf

内容如下:

a2.sources = r2

a2.sinks = k2

a2.channels = c2

#配置source

a2.sources.r2.type = spooldir

a2.sources.r2.spoolDir = /home/peizk/app/flume-1.9.0/test_log

a2.sources.r2.fileSuffix = .COMPLETED

a2.sources.r2.fileHeader = true

#忽略所有以.tmp 结尾的文件,不上传

a2.sources.r2.ignorePattern = ([^ ]*\.tmp)

#配置sink

a2.sinks.k2.type = hdfs

a2.sinks.k2.hdfs.path = hdfs://hadoop:9000/user/flume/spooldir/%Y-%m-%d/

#上传文件的前缀

a2.sinks.k2.hdfs.filePrefix = app-

#是否按照时间滚动文件夹

a2.sinks.k2.hdfs.round = true

#多少时间单位创建一个新的文件夹

a2.sinks.k2.hdfs.roundValue = 1

#重新定义时间单位

a2.sinks.k2.hdfs.roundUnit = hour

#是否使用本地时间戳

a2.sinks.k2.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a2.sinks.k2.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a2.sinks.k2.hdfs.fileType = DataStream

#多久生成一个新的文件

a2.sinks.k2.hdfs.rollInterval = 60

#设置每个文件的滚动大小大概是 128M

a2.sinks.k2.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a2.sinks.k2.hdfs.rollCount = 0

#配置channel

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

#配置关联关系

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

(2)启动配置监控

[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/spooldir-flume-hdfs.conf

Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh

Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access

Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access

+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a2 --conf-file job/spooldir-flume-hdfs.conf

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

(3)往我们的监控文件夹下cp一个文件

[peizk@hadoop test_log]$ cp ../logs/flume.log ./

(4)查看我们的hdfs相应设置的目录

成功!

2、taildir模式监控本地文件上传HDFS

(1)创建文件 taildir-flume-hdfs.conf

内容如下:

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# 配置source

a3.sources.r3.type = TAILDIR

a3.sources.r3.positionFile = /home/peizk/app/flume-1.9.0/taildir.json

a3.sources.r3.filegroups = f1

a3.sources.r3.filegroups.f1 = /home/peizk/app/flume-1.9.0/test_log/.*page.*

a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# 配置sink

a3.sinks.k3.type = hdfs

a3.sinks.k3.hdfs.path = hdfs://hadoop:9000/user/flume/app-page/%Y-%m-%d

#上传文件的前缀

a3.sinks.k3.hdfs.filePrefix = page-

#是否按照时间滚动文件夹

a3.sinks.k3.hdfs.round = true

#多少时间单位创建一个新的文件夹

a3.sinks.k3.hdfs.roundValue = 1

#重新定义时间单位

a3.sinks.k3.hdfs.roundUnit = hour

#是否使用本地时间戳

a3.sinks.k3.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a3.sinks.k3.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a3.sinks.k3.hdfs.fileType = DataStream

#多久生成一个新的文件

a3.sinks.k3.hdfs.rollInterval = 60

#设置每个文件的滚动大小大概是 128M

a3.sinks.k3.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a3.sinks.k3.hdfs.rollCount = 0

#配置channel

#channe类型

a3.channels.c3.type = memory

#储存在channel中最大event数量

a3.channels.c3.capacity = 1000

#从channel中去给一个sink,每个事务中最大的事件数

a3.channels.c3.transactionCapacity = 100

#配置关联关系

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

(2)启动配置监控

[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-flume-hdfs.conf

Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh

Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access

Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access

+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a3 --conf-file job/taildir-flume-hdfs.conf

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

(3)往我们的监控文件夹下cp一个文件

[peizk@hadoop test_log]$ cp ~/testpage.log ./

(4)查看相应HDFS

五、 编写Flume拦截器

1、flume自定义拦截器步骤

1)继承接口:org.apache.flume.interceptor.Interceptor 2)实现接口中的4个抽象方法:初始化 initialize()、单个事件拦截 intercept(Event event)、批量事件拦截 intercept(List events)、关闭io流 close() 3)创建一个静态内部类Builder,并实现接口implements Interceptor.Builder。我们自定义的拦截器这个类,没有办法直接new,而是在flume的配置文件中进行配置,通过配置文件调用静态内部类,来间接地调用自定义的拦截器对象。

拦截器是拦截一条一条的事件、启动时会初始化,会存在数据流,initialize开启数据流、close关闭数据流,event包括header+body

2.pom文件配置

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.example

Flume

1.0

org.apache.maven.plugins

maven-compiler-plugin

6

6

maven-compiler-plugin

2.3.2

1.8

1.8

maven-assembly-plugin

jar-with-dependencies

make-assembly

package

single

org.apache.flume

flume-ng-core

1.9.0

provided

com.alibaba

fastjson

1.2.4

3、编写拦截器代码

(1)代码如下:

package sm.flume;

import com.alibaba.fastjson.JSONObject;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;

import java.util.List;

import java.util.Map;

public class LogInterceptor implements Interceptor {

public void initialize() {

}

public Event intercept(Event event) {

byte[] body = event.getBody();

String s = new String(body, Charset.forName("utf-8"));

JSONObject jsonObject = JSONObject.parseObject(s);

String timestamp = jsonObject.getString("ts");

String type = jsonObject.getString("type");

Map headers = event.getHeaders();

headers.put("timestamp",timestamp);

headers.put("type",type);

return event;

}

public List intercept(List events) {

for (Event event : events) {

intercept(event);

}

return events;

}

public void close() {

}

public static class Builder implements Interceptor.Builder{

@Override

public Interceptor build() {

return new LogInterceptor();

}

@Override

public void configure(Context context) {

}

}

}

(2)使用 package 打包

4、将生成的jar放入 Flume下的lib里

 5、编辑  taildir-hdfs.conf  文件

[peizk@hadoop job]$ vim taildir-hdfs.conf

内容为:

a4.sources = r1

a4.channels = c1

a4.sinks = k1

# 配置source

a4.sources.r1.type = TAILDIR

a4.sources.r1.positionFile = /home/peizk/app/flume-1.9.0/taildir_hdfs.json

a4.sources.r1.filegroups = f1

a4.sources.r1.filegroups.f1 = /home/peizk/app/flume-1.9.0/test_log/app.*

a4.sources.r1.ignorePattern = ([^ ]*\.tmp)

a4.sources.r1.interceptors = i1

a4.sources.r1.interceptors.i1.type = sm.flume.LogInterceptor$Builder

# 配置sink

a4.sinks.k1.type = hdfs

a4.sinks.k1.hdfs.path = hdfs://hadoop:9000/user/flume/app-page/%Y-%m-%d

#上传文件的前缀

a4.sinks.k1.hdfs.filePrefix = page-

#是否按照时间滚动文件夹

a4.sinks.k1.hdfs.round = true

#多少时间单位创建一个新的文件夹

a4.sinks.k1.hdfs.roundValue = 1

#重新定义时间单位

a4.sinks.k1.hdfs.roundUnit = hour

#是否使用本地时间戳

#a4.sinks.k1.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a4.sinks.k1.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a4.sinks.k1.hdfs.fileType = DataStream

#多久生成一个新的文件

a4.sinks.k1.hdfs.rollInterval = 60

#设置每个文件的滚动大小大概是 128M

a4.sinks.k1.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a4.sinks.k1.hdfs.rollCount = 0

#配置channel

a4.channels.c1.type = memory

a4.channels.c1.capacity = 1000

a4.channels.c1.transactionCapacity = 100

#配置关联关系

a4.sources.r1.channels = c1

a4.sinks.k1.channel = c1

6、开启文件监控

[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a4 --conf-file job/taildir-hdfs.conf

Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh

Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access

Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access

+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a4 --conf-file job/taildir-hdfs.conf

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

7、传一个文件试一下

[peizk@hadoop test_log]$ cp ../log_data/page_data/app_page_log_20220101.log ./

8、查看下HDFS

时间戳已改变,成功!

9、添加对不同类型的更改 新建文件 taildir-hdfs-all.conf

[peizk@hadoop job]$ vim taildir-hdfs-all.conf

内容如下:

a4.sources = r1

a4.channels = c1 c2 c3 c4

a4.sinks = k1 k2 k3 k4

# 配置source

a4.sources.r1.type = TAILDIR

a4.sources.r1.positionFile = /home/peizk/app/flume-1.9.0/taildir_hdfs_all.json

a4.sources.r1.filegroups = f1

a4.sources.r1.filegroups.f1 = /home/peizk/app/flume-1.9.0/test_log/app.*

a4.sources.r1.ignorePattern = ([^ ]*\.tmp)

a4.sources.r1.interceptors = i1

a4.sources.r1.interceptors.i1.type = sm.flume.LogInterceptor$Builder

a4.sources.r1.selector.type = multiplexing

a4.sources.r1.selector.header = type

a4.sources.r1.selector.mapping.page = c1

a4.sources.r1.selector.mapping.exposure = c2

a4.sources.r1.selector.mapping.actions = c3

a4.sources.r1.selector.mapping.login = c4

# 配置sink

a4.sinks.k1.type = hdfs

a4.sinks.k1.hdfs.path = hdfs://hadoop:9000/user/flume/app-page/%Y-%m-%d

#上传文件的前缀

a4.sinks.k1.hdfs.filePrefix = page-

#是否按照时间滚动文件夹

a4.sinks.k1.hdfs.round = true

#多少时间单位创建一个新的文件夹

a4.sinks.k1.hdfs.roundValue = 1

#重新定义时间单位

a4.sinks.k1.hdfs.roundUnit = hour

#是否使用本地时间戳

#a4.sinks.k1.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a4.sinks.k1.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a4.sinks.k1.hdfs.fileType = DataStream

#多久生成一个新的文件

a4.sinks.k1.hdfs.rollInterval = 60

#设置每个文件的滚动大小大概是 128M

a4.sinks.k1.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a4.sinks.k1.hdfs.rollCount = 0

a4.sinks.k2.type = hdfs

a4.sinks.k2.hdfs.path = hdfs://hadoop:9000/user/flume/app-exposure/%Y-%m-%d

#上传文件的前缀

a4.sinks.k2.hdfs.filePrefix = exposure-

#是否按照时间滚动文件夹

a4.sinks.k2.hdfs.round = true

#多少时间单位创建一个新的文件夹

a4.sinks.k2.hdfs.roundValue = 1

#重新定义时间单位

a4.sinks.k2.hdfs.roundUnit = hour

#是否使用本地时间戳

#a4.sinks.k2.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a4.sinks.k2.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a4.sinks.k2.hdfs.fileType = DataStream

#多久生成一个新的文件

a4.sinks.k2.hdfs.rollInterval = 60

#设置每个文件的滚动大小大概是 128M

a4.sinks.k2.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a4.sinks.k2.hdfs.rollCount = 0

a4.sinks.k3.type = hdfs

a4.sinks.k3.hdfs.path = hdfs://hadoop:9000/user/flume/app-actions/%Y-%m-%d

#上传文件的前缀

a4.sinks.k3.hdfs.filePrefix = actions-

#是否按照时间滚动文件夹

a4.sinks.k3.hdfs.round = true

#多少时间单位创建一个新的文件夹

a4.sinks.k3.hdfs.roundValue = 1

#重新定义时间单位

a4.sinks.k3.hdfs.roundUnit = hour

#是否使用本地时间戳

#a4.sinks.k2.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a4.sinks.k3.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a4.sinks.k3.hdfs.fileType = DataStream

#多久生成一个新的文件

a4.sinks.k3.hdfs.rollInterval = 60

#设置每个文件的滚动大小大概是 128M

a4.sinks.k3.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a4.sinks.k3.hdfs.rollCount = 0

a4.sinks.k4.type = hdfs

a4.sinks.k4.hdfs.path = hdfs://hadoop:9000/user/flume/app-login/%Y-%m-%d

#上传文件的前缀

a4.sinks.k4.hdfs.filePrefix = login-

#是否按照时间滚动文件夹

a4.sinks.k4.hdfs.round = true

#多少时间单位创建一个新的文件夹

a4.sinks.k4.hdfs.roundValue = 1

#重新定义时间单位

a4.sinks.k4.hdfs.roundUnit = hour

#是否使用本地时间戳

#a4.sinks.k2.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a4.sinks.k4.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a4.sinks.k4.hdfs.fileType = DataStream

#多久生成一个新的文件

a4.sinks.k4.hdfs.rollInterval = 60

#设置每个文件的滚动大小大概是 128M

a4.sinks.k4.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a4.sinks.k4.hdfs.rollCount = 0

#配置channel

a4.channels.c1.type = memory

a4.channels.c1.capacity = 1000

a4.channels.c1.transactionCapacity = 100

a4.channels.c2.type = memory

a4.channels.c2.capacity = 1000

a4.channels.c2.transactionCapacity = 100

a4.channels.c3.type = memory

a4.channels.c3.capacity = 1000

a4.channels.c3.transactionCapacity = 100

a4.channels.c4.type = memory

a4.channels.c4.capacity = 1000

a4.channels.c4.transactionCapacity = 100

#配置关联关系

a4.sources.r1.channels = c1 c2 c3 c4

a4.sinks.k1.channel = c1

a4.sinks.k2.channel = c2

a4.sinks.k3.channel = c3

a4.sinks.k4.channel = c4

10、启动脚本监控

[peizk@hadoop flume-1.9.0]$ bin/flume-ng agent --conf conf/ --name a4 --conf-file job/taildir-hdfs-all.conf

Info: Sourcing environment configuration script /home/peizk/app/flume-1.9.0/conf/flume-env.sh

Info: Including Hadoop libraries found via (/home/peizk/app/hadoop-3.1.3/bin/hadoop) for HDFS access

Info: Including Hive libraries found via (/home/peizk/app/hive-3.1.2) for Hive access

+ exec /home/peizk/app/jdk1.8.0_212/bin/java -Xmx20m -cp '/home/peizk/app/flume-1.9.0/conf:/home/peizk/app/flume-1.9.0/lib/*:/home/peizk/app/hadoop-3.1.3/etc/hadoop:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/hdfs/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/mapreduce/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/lib/*:/home/peizk/app/hadoop-3.1.3/share/hadoop/yarn/*:/home/peizk/app/hive-3.1.2/lib/*' -Djava.library.path=:/home/peizk/app/hadoop-3.1.3/lib/native org.apache.flume.node.Application --name a4 --conf-file job/taildir-hdfs-all.conf

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/peizk/app/flume-1.9.0/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

11、测试各类型是否可以正常加载

(1)页面类型

[peizk@hadoop test_log]$ cp ../log_data/page_data/app_page_log_20220101.log ./

(2)action类型 

[peizk@hadoop test_log]$ cp ../log_data/actions_data/app_actions_log_20220101.log ./

(3)login 类型

[peizk@hadoop test_log]$ cp ../log_data/login_data/app_login_log_20220101.log ./

(4)exposure 类型

[peizk@hadoop test_log]$ cp ../log_data/exposure_data/app_exposure_log_20220101.log ./

12、编写一个flume 启停脚本 如下

[peizk@hadoop bin]$ vim flum.sh

#! /bin/bash

case $1 in

"start"){

for i in hadoop

do

echo " --------启动 $i 采集flume-------"

ssh $i "nohup /home/peizk/app/flume-1.9.0/bin/flume-ng agent --conf-file /home/peizk/app/flume-1.9.0/job/taildir-hdfs-all.conf --name a3 -Dflume.root.logger=INFO,LOGFILE 2>&1 &"

done

};;

"stop"){

for i in hadoop

do

echo " --------停止 $i 采集flume-------"

ssh $i "ps -ef | grep taildir-hdfs-all.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "

done

};;

esac

13、测试一下脚本

六、将数据加载到hive表

1、编写加载到hive的project库下表的脚本

内容如下:

#!/bin/bash

# 如果传入日期则etl_date等于传入的日期,否则赋值前一天日期

if [ -n "$2" ] ;then

etl_date=$2

else

etl_date=`date -d "-1 day" +%F`

fi

case $1 in

"page")

echo "------------开始加载$1表${etl_date}分区数据----------------"

hive -e "load data inpath '/user/flume/app-page/${etl_date}/*' into table project.ods_log_page_incr partition(pt='${etl_date}')"

;;

"actions")

echo "------------开始加载$1表${etl_date}分区数据----------------"

hive -e "load data inpath '/user/flume/app-actions/${etl_date}/*' into table project.ods_log_actions_incr partition(pt='${etl_date}')"

;;

"login")

echo "------------开始加载$1表${etl_date}分区数据----------------"

hive -e "load data inpath '/user/flume/app-login/${etl_date}/*' into table project.ods_log_login_incr partition(pt='${etl_date}')"

;;

"exposure")

echo "------------开始加载$1表${etl_date}分区数据----------------"

hive -e "load data inpath '/user/flume/app-exposure/${etl_date}/*' into table project.ods_log_exposure_incr partition(pt='${etl_date}')"

;;

"all")

echo "------------开始加载全部日志表${etl_date}分区数据----------------"

hive -e "load data inpath '/user/flume/app-page/${etl_date}/*' into table project.ods_log_page_incr partition(pt='${etl_date}')"

hive -e "load data inpath '/user/flume/app-actions/${etl_date}/*' into table project.ods_log_actions_incr partition(pt='${etl_date}')"

hive -e "load data inpath '/user/flume/app-login/${etl_date}/*' into table project.ods_log_login_incr partition(pt='${etl_date}')"

hive -e "load data inpath '/user/flume/app-exposure/${etl_date}/*' into table project.ods_log_exposure_incr partition(pt='${etl_date}')"

esac

2、执行脚本

[peizk@hadoop job]$ sh load_hive.sh all 2022-01-01

------------开始加载全部日志表2022-01-01分区数据----------------

which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

Hive Session ID = 0d0cd9d8-c692-49d0-856b-2b3b12d0969f

Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true

Hive Session ID = 76e99fe1-f958-4264-b171-d7732719b551

Loading data to table project.ods_log_page_incr partition (pt=2022-01-01)

OK

Time taken: 1.962 seconds

which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

Hive Session ID = 2e8f8319-c819-4d0e-85d6-44bcd913fee5

Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true

Hive Session ID = 81624ba6-94cd-43ae-8938-1c9ac040bb70

Loading data to table project.ods_log_actions_incr partition (pt=2022-01-01)

OK

Time taken: 1.936 seconds

which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

Hive Session ID = 898ac37e-0919-4d31-9870-574d5c5fdf00

Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true

Hive Session ID = 43e6b9e3-4cb7-4590-a2e2-0e23731f28e8

Loading data to table project.ods_log_login_incr partition (pt=2022-01-01)

OK

Time taken: 1.952 seconds

which: no hbase in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/peizk/app/jdk1.8.0_212/bin:/home/peizk/app/hadoop-3.1.3/bin:/home/peizk/app/hadoop-3.1.3/sbin:/home/peizk/app/hive-3.1.2/bin:/home/peizk/app/scala-2.12.15/bin:/home/peizk/app/maven-3.8.5/bin:/home/peizk/app/spark-3.2.1/bin:/home/peizk/app/datax/bin:/root/bin)

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/home/peizk/app/hive-3.1.2/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/home/peizk/app/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

Hive Session ID = 810102d2-3d20-4bcd-a0c2-34006c2b7ea2

Logging initialized using configuration in jar:file:/home/peizk/app/hive-3.1.2/lib/hive-common-3.1.2.jar!/hive-log4j2.properties Async: true

Hive Session ID = 533d6173-a204-49c1-a654-a6d3ccd23c66

Loading data to table project.ods_log_exposure_incr partition (pt=2022-01-01)

OK

Time taken: 1.927 seconds

3、查看 hive表中数据

 导入成功!!!

推荐文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。