flume总结

一、 Flume 简介

案例场景1:

公司使用python爬虫,从网上爬取了3T的数据存储到本地磁盘中,而且每天都会增加。那么这样会产生一个问题,磁盘会面临容量瓶颈,存不下了!!那么怎么办呢?

解决方案一:使用hadoop 的put 命令或者书写mapreduce,来把数据剪切到HDFS中。

这个方案的好处就是利用了hadoop的hdfs可以存储大量数据的好处,弊端就是,假设爬虫实时爬取的数据很大,那么在转移数据到hdfs之前,马上就达到了瓶颈,那么就会出现问题。而且使用这种方案,一次性转移大量的数据,可能会使得hadoop崩溃等等。

Flume\ 就是能够解决这个问题,实时的进行数据采集保存到hdfs***。***

1) Flume 提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,

Flume 只能在 Unix 环境下运行。

2) Flume 基于流式架构,容错性强,也很灵活简单。

3) Flume、Kafka 用来实时进行数据收集,Spark、Storm 用来实时处理数据,使用MR或者Hive处理离线数据,impala 用来实时查询。

Flume***和* *Kafka**都是可以实时进行数据采集,那么他们两个的区别是什么呢?flume适合操作存储在**磁盘上的实时数据**,kafka适合操作保存在**内存中的实时数据***

二、Flume 角色

image-20200510093005794

2.1、Source

用于采集数据,Source 是产生数据流的地方,同时 Source 会将产生的数据流传输到 Channel,

这个有点类似于 Java IO 部分的 Channel。

2.2、Channel

用于桥接 Sources 和 Sinks,类似于一个队列。

2.3、Sink

从 Channel 收集数据,将数据写到目标源(可以是下一个 Source,也可以是 HDFS 或者 HBase)。

2.4、Event

传输单元,Flume 数据传输的基本单元,以事件的形式将数据从源头送至目的地。

三、Flume 传输过程

source 监控某个文件或数据流,数据源产生新的数据,拿到该数据后,将数据封装在一个

Event 中,并 put 到 channel 后 commit 提交,channel 队列先进先出,sink 去 channel 队列中

拉取数据,然后写入到 HDFS 中。

四、Flume 部署及使用

4.1、文件配置

flume-env.sh 涉及修改项:

export JAVA_HOME=/home/admin/modules/jdk1.8.0_121

4.1.1 关于文件夹中的cdh版本的flume

image-20200510093406019

这个版本和apache官方发行的flume有什么区别呢?

image-20200510093419367

Clodera发布的cdh版本,里面包含了关于大数据一整套常用的资源。例如上面的cdh就包含了

image-20200510093433629

他这么做的目的就是为了适配各个***hadoop**常用框架之间的兼容性,能够保证这里面的版本使用起来没有问题***

但是如果单独使用apache发行的flume,那么可能会和其他hadoop框架发生不兼容问题,需要自己去解决。例如flume1.7 跟 hadoop3.0 就会不兼容

4.2、案例

4.2.1、案例一:监控端口数据

目标:Flume 监控一端 Console,另一端 Console 发送消息,使被监控端实时显示。

分步实现:

1) 安装 telnet 工具 (rpm 需要事先下载,在有网的环境下可以通过yum下载)

$ sudo rpm -ivh xinetd-2.3.14-40.el6.x86_64.rpm

$ sudo rpm -ivh telnet-0.17-48.el6.x86_64.rpm

$ sudo rpm -ivh telnet-server-0.17-48.el6.x86_64.rpm

2) 创建 Flume Agent 配置文件 flume-telnet.conf

具体配置意义推荐查看官方文档:第一部门,定义agent相关信息,source和sink以及channels的名称(不能够重复)

第二部分:定义source

第三部门:定义sink的输出位置

第四部门:定义channel

第五部分:source绑定channel和sink绑定channel。

a1.sources.r1.channels 可以看到这里是复数channel,表示source可以绑定多个channel。然后一个channel只能够绑定一个sink,只能够有一个输出

al 表示的是 整个agent的别名 – 必须是唯一的\一因为你可能同时启动了多个flume任务,那么如果agent的id不是唯一,就会出现数据紊乱

r1 和 k1 分别是 source和sink的id,只要在当前agent内保证不相同即可,没有类似***agent**的**id**全局唯一的要求***

\# Name the components on thisagent a1.sources = r1
a1.sinks = k1
a1.channels = c1
\# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
\# Describe the sink
a1.sinks.k1.type = logger
\# Use a channel which buffers events in memory a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
\# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3) 判断 44444 端口是否被占用

​ $ netstat -tunlp | grep 44444

4) 先开启 flume 先听端口(启动后会挂在哪里,需要打开另一个窗口发送数据)

$ bin/flume-ng agent –conf conf/ –name a1 –conf-file job/flume-telnet.conf

-Dflume.root.logger==INFO,console

5) 使用 telnet 工具向本机的 44444 端口发送内容

​ $ telnet localhost 44444

4.2.2、案例二:实时读取本地文件到 HDFS

目标:实时监控 hive 日志,并上传到 HDFS 中

分步实现:

1) 拷贝 Hadoop 相关 jar Flume lib 目录下(要学会根据自己的目录和版本查找 jar 包**)**

$ cp share/hadoop/common/lib/hadoop-auth-2.5.0-cdh5.3.6.jar ./lib/ $ cp share/hadoop/common/lib/commons-configuration-1.6.jar ./lib/

$ cp share/hadoop/mapreduce1/lib/hadoop-hdfs-2.5.0-cdh5.3.6.jar ./lib/ $ cp share/hadoop/common/hadoop-common-2.5.0-cdh5.3.6.jar ./lib/

$ cp ./share/hadoop/hdfs/lib/htrace-core-3.1.0-incubating.jar ./lib/

$ cp ./share/hadoop/hdfs/lib/commons-io-2.4.jar ./lib/

尖叫提示:标红的 jar 为 1.99 版本 flume 必须引用的 jar

2) 创建 flume-hdfs.conf 文件

# Name the components on this agent a2.sources = r2

a2.sinks = k2

a2.channels = c2

# Describe/configure the source

a2.sources.r2.type = exec

a2.sources.r2.command = tail -F /home/admin/modules/apache-hive-1.2.2-bin/hive.log

a2.sources.r2.shell = /bin/bash –c //执行上述command的命令前缀是/bin/bash –c

# Describe the sink a2.sinks.k2.type = hdfs

a2.sinks.k2.hdfs.path = hdfs://linux01:8020/flume/%Y%m%d/%H

#上传文件的前缀

a2.sinks.k2.hdfs.filePrefix = logs-

#是否按照时间滚动文件夹

a2.sinks.k2.hdfs.round = true

#多少时间单位创建一个新的文件夹

a2.sinks.k2.hdfs.roundValue = 1

#重新定义时间单位

a2.sinks.k2.hdfs.roundUnit = hour

#是否使用本地时间戳

a2.sinks.k2.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a2.sinks.k2.hdfs.batchSize = 1000

#设置文件类型,可支持压缩

a2.sinks.k2.hdfs.fileType = DataStream

#多久生成一个新的文件

a2.sinks.k2.hdfs.rollInterval = 600

#设置每个文件的滚动大小

a2.sinks.k2.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a2.sinks.k2.hdfs.rollCount = 0

#最小冗余数

a2.sinks.k2.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2

为什么要指定 a2.sources.r2.shell 因为我们知道我们在执行linux命令的时候,例如cat xxx.txt

实际上完整的命令应该是 /bin/bash –c cat xxx.txt. 因为linux 系统已经帮我们配置好了所以不需要显式输入。但是我们知道可能每台linux 的环境变量配置都不一样,那么所以需要显式指定bash地址

3) 执行监控配置

​ $ bin/flume-ng agent –conf conf/ –name a2 –conf-file job/flume-hdfs.conf

4.2.3、案例三:实时读取目录文件到 HDFS

目标:使用 flume 监听整个目录的文件

分步实现

1) 创建配置文件 flume-dir.conf

a3.sources = r3

a3.sinks = k3

a3.channels = c3

# Describe/configure the source a3.sources.r3.type = spooldir

a3.sources.r3.spoolDir = /home/admin/modules/apache-flume-1.7.0-bin/upload

a3.sources.r3.fileSuffix = .COMPLETED //上传完后 upload目录下的文件后缀加上.COMPLETED

a3.sources.r3.fileHeader = true

#忽略所有以.tmp 结尾的文件,不上传

a3.sources.r3.ignorePattern = ([^ ]*.tmp)

# Describe the sink a3.sinks.k3.type = hdfs

a3.sinks.k3.hdfs.path = hdfs://linux01:8020/flume/upload/%Y%m%d/%H

#上传文件的前缀

a3.sinks.k3.hdfs.filePrefix = upload-

#是否按照时间滚动文件夹

a3.sinks.k3.hdfs.round = true

#多少时间单位创建一个新的文件夹

a3.sinks.k3.hdfs.roundValue = 1

#重新定义时间单位

a3.sinks.k3.hdfs.roundUnit = hour

#是否使用本地时间戳

a3.sinks.k3.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a3.sinks.k3.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a3.sinks.k3.hdfs.fileType = DataStream

#多久生成一个新的文件

a3.sinks.k3.hdfs.rollInterval = 600

#设置每个文件的滚动大小大概是 128M

a3.sinks.k3.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a3.sinks.k3.hdfs.rollCount = 0

#最小冗余数

a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

2) 执行测试:执行如下脚本后,请向 upload 文件夹中添加文件试试

​ $ bin/flume-ng agent –conf conf/ –name a3 –conf-file job/flume-dir.conf

尖叫提示: 在使用 Spooling Directory Source 时

1) 不要在监控目录中创建并持续修改文件

2) 上传完成的文件会以.COMPLETED 结尾

3) 被监控文件夹每 500 毫秒扫描一次文件变动

4.2.4、案例四:Flume 与 Flume 之间数据传递:单 Flume 多 Channel、Sink,

image-20200510093945863

目标:使用 flume-1 监控文件变动,flume-1 将变动内容传递给 flume-2,flume-2 负责存储到HDFS。同时 flume-1 将变动内容传递给 flume-3,flume-3 负责输出到。local filesystem。

分步实现:

1) 创建 flume-1.conf,用于监控 hive.log 文件的变动,同时产生两个 channel 和两个 sink 分别输送给 flume-2 和 flume3:

\# Name the components on this agent a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
\# 将数据流复制给多个 channel
a1.sources.r1.selector.type = replicating
\# Describe/configure the source a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/admin/modules/apache-hive-1.2.2-bin/hive.log
a1.sources.r1.shell = /bin/bash -c
\# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux01
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux01
a1.sinks.k2.port = 4142
\# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.chanels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channel.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
\# Bind the source and sink to the channel a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

2) 创建 flume-2.conf,用于接收 flume-1 的 event,同时产生 1 个 channel 和 1 个 sink,将数据输送给 hdfs:

# Name the components on this agent a2.sources = r1

a2.sinks = k1

a2.channels = c1

# Describe/configure the source

a2.sources.r1.type = avro

a2.sources.r1.bind = linux01

a2.sources.r1.port = 4141

# Describe the sink a2.sinks.k1.type = hdfs

a2.sinks.k1.hdfs.path = hdfs://linux01:8020/flume2/%Y%m%d/%H

#上传文件的前缀

a2.sinks.k1.hdfs.filePrefix = flume2-

#是否按照时间滚动文件夹

a2.sinks.k1.hdfs.round = true

#多少时间单位创建一个新的文件夹

a2.sinks.k1.hdfs.roundValue = 1

#重新定义时间单位

a2.sinks.k1.hdfs.roundUnit = hour

#是否使用本地时间戳

a2.sinks.k1.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a2.sinks.k1.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a2.sinks.k1.hdfs.fileType = DataStream

#多久生成一个新的文件

a2.sinks.k1.hdfs.rollInterval = 600

#设置每个文件的滚动大小大概是 128M

a2.sinks.k1.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a2.sinks.k1.hdfs.rollCount = 0

#最小冗余数

a2.sinks.k1.hdfs.minBlockReplicas = 1

# Describe the channel

a2.channels.c1.type = memory

a2.channels.c1.capacity = 1000

a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel a2.sources.r1.channels = c1

a2.sinks.k1.channel = c1

3) 创建 flume-3.conf,用于接收 flume-1 的 event,同时产生 1 个 channel 和 1 个 sink,将数据输送给本地目录:

# Name the components on this agent a3.sources = r1

a3.sinks = k1

a3.channels = c1

# Describe/configure the source

a3.sources.r1.type = avro

a3.sources.r1.bind = linux01

a3.sources.r1.port = 4142

# Describe the sink

a3.sinks.k1.type = file_roll

a3.sinks.k1.sink.directory = /home/admin/Desktop/flume3

# Describe the channel

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel a3.sources.r1.channels = c1

a3.sinks.k1.channel = c1

尖叫提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录

4) 执行测试:分别开启对应 flume-job(依次启动 flume-1,flume-2,flume-3),同时产生文件变动并观察结果:

$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group-job1/flume-1.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group-job1/flume-2.conf
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group-job1/flume-3.conf

4.2.5、案例五:Flume 与 Flume 之间数据传递,多 Flume 汇总数据到单 Flume

image-20200510094447729

目标:flume-1 监控文件 hive.log,flume-2 监控某一个端口的数据流,flume-1 与 flume-2 将数据发送给 flume-3,flume3 将最终数据写入到 HDFS。

分步实现:

1) 创建 flume-1.conf,用于监控 hive.log 文件,同时 sink 数据到 flume-3:

# Name the components on this agent a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/admin/modules/apache-hive-1.2.2-bin/hive.log

a1.sources.r1.shell = /bin/bash -c

# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = linux01

a1.sinks.k1.port = 4141

# Describe the channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2) 创建 flume-2.conf,用于监控端口 44444 数据流,同时 sink 数据到 flume-3:

# Name the components on this agent a2.sources = r1

a2.sinks = k1

a2.channels = c1

# Describe/configure the source

a2.sources.r1.type = netcat

a2.sources.r1.bind = linux01

a2.sources.r1.port = 44444

# Describe the sink

a2.sinks.k1.type = avro

a2.sinks.k1.hostname = linux01

a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory a2.channels.c1.type = memory

a2.channels.c1.capacity = 1000

a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r1.channels = c1

a2.sinks.k1.channel = c1

3) 创建 flume-3.conf,用于接收 flume-1 与 flume-2 发送过来的数据流,最终合并后 sink 到

HDFS:

# Name the components on this

agent a3.sources = r1

a3.sinks = k1

a3.channels = c1

# Describe/configure the source

a3.sources.r1.type = avro

a3.sources.r1.bind = linux01

a3.sources.r1.port = 4141

# Describe the sink a3.sinks.k1.type = hdfs

a3.sinks.k1.hdfs.path = hdfs://linux01:8020/flume3/%Y%m%d/%H

#上传文件的前缀

a3.sinks.k1.hdfs.filePrefix = flume3-

#是否按照时间滚动文件夹

a3.sinks.k1.hdfs.round = true

#多少时间单位创建一个新的文件夹

a3.sinks.k1.hdfs.roundValue = 1

#重新定义时间单位

a3.sinks.k1.hdfs.roundUnit = hour

#是否使用本地时间戳

a3.sinks.k1.hdfs.useLocalTimeStamp = true

#积攒多少个 Event 才 flush 到 HDFS 一次

a3.sinks.k1.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a3.sinks.k1.hdfs.fileType = DataStream

#多久生成一个新的文件

a3.sinks.k1.hdfs.rollInterval = 600

#设置每个文件的滚动大小大概是 128M

a3.sinks.k1.hdfs.rollSize = 134217700

#文件的滚动与 Event 数量无关

a3.sinks.k1.hdfs.rollCount = 0

#最小冗余数

a3.sinks.k1.hdfs.minBlockReplicas = 1

# Describe the channel

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r1.channels = c1

a3.sinks.k1.channel = c1

4) 执行测试:分别开启对应 flume-job(依次启动 flume-1,flume-2,flume-3),同时产生文件变动并观察结果:

$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group-job2/flume-1.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group-job2/flume-2.conf
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group-job2/flume-3.conf

尖叫提示:测试时记得启动 hive 产生一些日志,同时使用 telnet 向 44444 端口发送内容,如:

$ bin/hive
$ telnet linux01 44444

l 数据追加到一个文件中***(**注意:数据的存入是无序的,也就是说在某个时间点**flume1**存入**flume3**的数据可能早于**flume2**,反之亦然**—**数据交叉存放**)**,如果设置了滚动时间等等其他参数,那么条件一到就会在**hdfs**生成新的文件来进行存储***

解决***flume**之间数据输入不按照顺序的问题,利用**kafka**的队列即可。**Flume**先把数据输入到**kafka**中再输入到**flume**然后进行保存***

五、Flume 监控之 Ganglia

5.1 Ganglia 的安装与部署

1) 安装 httpd 服务与 php

​ # yum -y install httpd php

2) 安装其他依赖

# yum -y install rrdtool perl-rrdtool rrdtool-devel

# yum -y install apr-devel

3) 安装 ganglia

# rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm

# yum -y install ganglia-gmetad //这个相当于ganglia的服务端

# yum -y install ganglia-web //服务端的信息展示web界面

# yum install -y ganglia-gmond //相当于客户端 – 可以启动多个客户端,数据会发布到服务端,可以通过这个特性实现分布式

4) 修改配置文件文件 ganglia.conf**:**

# vi /etc/httpd/conf.d/ganglia.conf

修改为:

#

# Ganglia monitoring system php web frontend – 配置文件是配置,允许那些主机访问ganglia服务器

#

Alias /ganglia /usr/share/ganglia





Order deny,allow



Deny from all



Allow from all # 表示允许所有主机访问 – 默认是只允许本机



# Allow from 127.0.0.1



# Allow from ::1



# Allow from .example.com

文件 gmetad.conf**: 主节点 - 需要注意的是 gmetad.conf不一定在/etc/ganglia/这个目录下因为上面我们是使用yum自动安装默认保存在这个位置**如果手动安装则以你设置路径为主

# vi /etc/ganglia/gmetad.conf

修改为:第二个参数表示,当前ganglia集群名称,以及主句所在服务器ip

data_source “linux” 192.168.216.20

文件 gmond.conf**:从节点 从节点向主节点发送信息的相关配置**

# vi /etc/ganglia/gmond.conf

修改为:

cluster {

name = “linux”

owner = “unspecified”

latlong = “unspecified”

url = “unspecified”

}

udp_send_channel {

#bind_hostname = yes # Highly recommended, soon to be default.

# This option tells gmond to use a source address

# that resolves to the machine’s hostname. Without

# this, the metrics may appear to come from any

# interface and the DNS names associated with

# those IPs will be used to create the RRDs.

# mcast_join = 239.2.11.71 #这个意思是使用广播的形式发送数据到ganglia集群-我们下面使用单播形式

host = 192.168.216.20

port = 8649

ttl = 1

}

udp_recv_channel {

# mcast_join = 239.2.11.71 port = 8649

bind = 192.168.216.20 retry_bind = true

# Size of the UDP buffer. If you are handling lots of metrics you really

# should bump it up to e.g. 10MB or even higher.

# buffer = 10485760

}

文件 config**:(如果不配置这个selinux linux的安全子系统,那么我们在第六步访问web时会提示权限不足)**

# vi /etc/selinux/config

修改为:

# This file controls the state of SELinux on the system.

# SELINUX= can take one of these three values:

# enforcing - SELinux security policy is enforced.

# permissive - SELinux prints warnings instead of enforcing.

# disabled - No SELinux policy is loaded. SELINUX=disabled

# SELINUXTYPE= can take one of these two values:

# targeted - Targeted processes are protected,

# mls - Multi Level Security protection. SELINUXTYPE=targeted

尖叫提示:selinux 本次生效关闭必须重启,如果此时不想重启,可以临时生效之:

​ $ sudo setenforce 0

不配置此项时***,**访问**gangliaweb**报错***

5) 启动 ganglia

$ sudo service httpd start

$ sudo service gmetad start

$ sudo service gmond start

6) 打开网页浏览 ganglia 页面

http://192.168.216.20/ganglia

尖叫提示:如果完成以上操作依然出现权限不足错误,请修改/var/lib/ganglia 目录的权限:

​ $ sudo chmod -R 777 /var/lib/ganglia

5.2 操作 Flume 测试监控

监控类型 ganglia – 默认值

汇报信息的ip和端口 – 就是刚才在主节点配置的

1) 修改 flume-env.sh 配置:

export JAVA_OPTS=”-Dflume.monitoring.type=ganglia

-Dflume.monitoring.hosts=192.168.216.20:8649

-Xms100m

-Xmx200m”

2) 启动 flume 任务

​ $ bin/flume-ng agent \

–conf conf/ \

–name a1 \

–conf-file job/group-job0/flume-telnet.conf \

-Dflume.root.logger==INFO,console \

-Dflume.monitoring.type=ganglia \

-Dflume.monitoring.hosts=192.168.216.20:8649

后两个参数其实可以去掉,因为刚才我们在flume-env.sh 已经配置**这里的配置原因是想覆盖配置文件里面配置的ip和端口号**以当前为准

3) 发送数据观察 ganglia 监测图

​ $ telnet localhost 44444

样式如图:

图例说明:

字段(图表名称) 字段含义
EventPutAttemptCount source 尝试写入 channel 的事件总数量
EventPutSuccessCount 成功写入 channel 且提交的事件总数量
EventTakeAttemptCount sink 尝试从 channel 拉取事件的总数量。这不
意味着每次事件都被返回,因为 sink 拉取的
时候 channel 可能没有任何数据。
EventTakeSuccessCount sink 成功读取的事件的总数量
StartTime channel 启动的时间(毫秒)
StopTime channel 停止的时间(毫秒)
ChannelSize 目前 channel 中事件的总数量
ChannelFillPercentage channel 占用百分比
ChannelCapacity channel 的容量

六、练习

目标:

1) flume-1 监控 hive.log 日志,flume-1 的数据传送给 flume-2,flume-2 将数据追加到本地文件,同时将数据传输到 flume-3。

2) flume-4 监控本地另一个自己创建的文件 any.txt,并将数据传送给 flume-3。

3) flume-3 将汇总数据写入到 HDFS。

请先画出结构图,再开始编写任务脚本。

如果你感觉文章对你又些许感悟,你可以支持我!!
-------------本文结束感谢您的阅读-------------