【flume保姆级使用教程】

【flume保姆级使用教程】

文章目录

一、flume是什么?1、flume需要的系统环境

二、flume的部署1、获取flume安装包上传到虚拟机上2、将flume解压完之后,就可以使用了(前提是有java1.8或者java1.8以上的版本作支持)3、简单的使用一下flume

三、flume agent 组件(来自官方文档)1、Source => 获取数据源,以下只列出了个别配置选项供参考2、Channel => 传输管道,以下只列出了个别配置选项供参考3、Sink => 将数据发送到目的地,以下只列出了个别配置选项供参考

四、flume的几个实际应用示例1、采集单个文件 hadoop 的 namenode 日志数据,存入到 hdfs 的 /tmp/flume 目录2、采集端口数据,存入到 kafka 的 topic 中3、采集端口数据,存入到 hdfs 和 kafka 中(多路复用)

一、flume是什么?

Flume是一个分布式的、可靠的、可用的服务,用于高效地收集、聚合和移动大量的日志数据。 它具有基于流数据流的简单灵活的架构。 它具有健壮性和容错性,具有可调的可靠性机制和许多故障转移和恢复机制。 它使用一个简单的可扩展数据模型,允许在线分析应用程序。

一个flume agent(事务)分成了三部分,一个flume agent就是一个jvm 进程,是一个可以将数据从a发送到b的组件

source => 获取数据,数据来源是多样性的channel => 管道,用于传输数据,临时存储数据的sink => 下沉,发送数据到目标地,如将监控的某个文件的数据发送到 hdfs 中

1、flume需要的系统环境

Java 1.8及以上版本内存有足够的内存供源、通道或汇聚使用的配置使用磁盘空间有足够的磁盘空间供通道或汇聚使用的配置使用目录权限对代理使用的目录具有读/写权限

二、flume的部署

1、获取flume安装包上传到虚拟机上

flume官网网址(https://flume.apache.org/download.html)

2、将flume解压完之后,就可以使用了(前提是有java1.8或者java1.8以上的版本作支持)

# 将 flume 解压到 module 目录下

tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

将 flume 加入系统的环境变量中 vim /etc/profile 底下路径的flume目录跟上面不一样是因为将其 flume 的文件夹重新命名了

export FLUME_HOME=/opt/module/flume

export PATH=$FLUME_HOME/bin:$PATH

别忘了记得 source /etc/profile 使其环境变量生效 => flume-ng version 查看版本

3、简单的使用一下flume

首先在flume的根目录底下,创建一个task目录mkdir task=> 目的是更好的启动和管理写好的的配置文件这里以netcat起一个44444的端口作为测试,写一个flume的agent事件,以监听44444端口的数据,发送到终端,以日志的形式打印出来使用命令yum install nc -y在虚拟机中安装netcat服务,nc -h可查看 nc 的命令帮助flume的 agent 配置文件 => vim test.conf 在task目录底下创建

# 给这个agent的各个组件起个名字,a1指的是agent事件的代号是a1,也可以换成其他代号

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# 配置 a1 的 source 组件 => source 数据来源类型是 netcat 类型,监听 localhost的 端口数据

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444

# 配置 a1 的 sink 组件 => 使用的是日志打印输出

a1.sinks.k1.type = logger

# 配置 a1 的 channel 组件 => 这里使用的管道是 内存,注意的是 transactionCapacity 的值要比 capacity 的值小

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# 绑定 source 和 sink 使用的 channel 通道,需要注意的是 sink 的 channel 是只能有一个,所以命名上面 channel 不能加 s

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

先使用以下命令启动flume的agent,再使用nc localhost 44444连接44444端口,往这个端口发送数据,查看flume终端。在flume根目录底下执行此命令

# -n 指的是创建的agent的代码 a1 -c 指的是flume的配置文件 -f 指向的是agent的配置文件"test.conf", 这里采用了终端输出日志信息

flume-ng agent -n a1 -c conf/ -f task/test.conf -Dflume.root.logger=INFO,console

成功的截图: ↓↓↓↓↓

netcat 发送数据 flume 接受数据

三、flume agent 组件(来自官方文档)

1、Source => 获取数据源,以下只列出了个别配置选项供参考

Avro Source => 监听Avro端口并接收来自外部Avro客户端流的事件。如图为官网给出的相关配置选项,黑体为必需项 Example for agent named a1=> 给出的示例配置

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

Exec Source => 启动时运行给定的命令获取数据

Example for agent named a1=> 给出的示例配置

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /var/log/secure

a1.sources.r1.channels = c1

Spooling Directory Source => spool 跟 exec都可以实现监控文件的功能,spool与exec有点区别在于,spool更加可靠,不会因为flume的启动、停止而丢失数据

Example for agent named a1=> 给出的示例配置

a1.channels = ch-1

a1.sources = src-1

a1.sources.src-1.type = spooldir

a1.sources.src-1.channels = ch-1

a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool

a1.sources.src-1.fileHeader = true

2、Channel => 传输管道,以下只列出了个别配置选项供参考

Memory Channel => 内存通道,利用内存临时将数据存储起来,等待发送给接收者

Example for agent named a1=> 给出的示例配置

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 10000

a1.channels.c1.byteCapacityBufferPercentage = 20

a1.channels.c1.byteCapacity = 800000

JDBC Channel => 将事件数据存储在数据库中,支持持久存储

Example for agent named a1=> 给出的示例配置

a1.channels = c1

a1.channels.c1.type = jdbc

3、Sink => 将数据发送到目的地,以下只列出了个别配置选项供参考

HDFS Sink => 将事件数据写入 hdfs 中

Example for agent named a1=> 给出的示例配置

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S

a1.sinks.k1.hdfs.filePrefix = events-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

Hive Sink => 将事件数据写入到 hive 表中

Example for agent named a1=> 给出的示例配置

a1.channels = c1

a1.channels.c1.type = memory

a1.sinks = k1

a1.sinks.k1.type = hive

a1.sinks.k1.channel = c1

a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083

a1.sinks.k1.hive.database = logsdb

a1.sinks.k1.hive.table = weblogs

a1.sinks.k1.hive.partition = asia,%{country},%Y-%m-%d-%H-%M

a1.sinks.k1.useLocalTimeStamp = false

a1.sinks.k1.round = true

a1.sinks.k1.roundValue = 10

a1.sinks.k1.roundUnit = minute

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\t"

a1.sinks.k1.serializer.serdeSeparator = '\t'

a1.sinks.k1.serializer.fieldnames =id,,msg

Logger Sink => 将事件数据以 info 等级的日志显示

Example for agent named a1=> 给出的示例配置

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

Kafka Sink => 将事件数据发送给 kafka 的 topic 中

Example for agent named a1=> 给出的示例配置

a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = mytopic

a1.sinks.k1.kafka.bootstrap.servers = localhost:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.compression.type = snappy

四、flume的几个实际应用示例

1、采集单个文件 hadoop 的 namenode 日志数据,存入到 hdfs 的 /tmp/flume 目录

默认已经配置好 hadoop,进入logs目录,查看日志的最后几行,截图下来,vim hadoop-root-namenode-master.log,如图为namenode的日志截图,等会跟flume传输好的数据进行对比,看是否正确

编写 flume agent a1 配置 => vim /opt/module/flume/task/task_01.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /opt/module/hadoop/logs/hadoop-root-namenode-master.log

a1.sources.r1.shell = /bin/sh -c

a1.sources.r1.channels = c1

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://master:9000/tmp/flume

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.rollInterval = 0

a1.sinks.k1.hdfs.rollSize = 0

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.batchSize = 100

a1.sinks.k1.channel = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

在根目录下启动flume agent=> flume-ng agent -n a1 -c conf/ -f task/task_01.conf -Dflume.root.logger=INFO,console 执行此命令遇到如下报错信息,是因为flume的guava包版本太低了,复制一个hadoop的guava包过去flume底下就行,记得把原本flume/lib底下的guava包删掉。重新启动flume agent cp /opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flume/lib/

查看hdfs对应目录底下的信息 hdfs dfs -cat /tmp/flume/FlumeData.1731051715793.tmp | head -n 100 这里查看的是前100条信息 跟hadoop的namenode的日志相同,到此成功启动 agent

2、采集端口数据,存入到 kafka 的 topic 中

默认已经启动kafka,创建topic => test kafka-topics.sh --create --bootstrap-server master:9092 --topic test

编写 flume agent a2 配置 => vim /opt/mdule/flume/task/task_02.conf

a2.sources = r1

a2.sinks = k1

a2.channels = c1

a2.sources.r1.type = netcat

a2.sources.r1.port = 11111

a2.sources.r1.bind = localhost

a2.sources.r1.channels = c1

a2.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a2.sinks.k1.kafka.topic = test

a2.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092

a2.sinks.k1.kafka.flumeBatchSize = 20

a2.sinks.k1.kafka.producer.acks = 1

a2.sinks.k1.kafka.producer.linger.ms = 1

a2.sinks.k1.channel = c1

a2.channels.c1.type = memory

a2.channels.c1.capacity = 1000

a2.channels.c1.transactionCapacity = 100

在根目录下启动flume agent => flume-ng agent -n a2 -c conf/ -f task/task_02.conf -Dflume.root.logger=INFO,console 启动成功后给端口发送消息 nc localhost 11111 使用kafka的消费者,消费 topic test的数据 kafka-console-consumer.sh --bootstrap-server master:9092 --topic test --from-beginning kafka成功接受到来自flume传输的数据

3、采集端口数据,存入到 hdfs 和 kafka 中(多路复用)

默认已经启动 hadoop 和 kafka => 存入 kafka 的 topic test 和 hdfs 的 /tmp/flume 目录

创建kafka的topic test => kafka-topics.sh --create --bootstrap-server master:9092 --topic test

编写 flume agent a3 配置 => vim /opt/module/flume/task/task_03.conf

a3.sources = r1

a3.sinks = k1 k2

a3.channels = c1 c2

a3.sources.r1.type = netcat

a3.sources.r1.port = 11111

a3.sources.r1.bind = localhost

a3.sources.r1.selector.type = multiplexing

a3.sources.r1.channels = c1 c2

a3.sinks.k1.type = hdfs

a3.sinks.k1.hdfs.path = hdfs://master:9000/tmp/flume

a3.sinks.k1.hdfs.fileType = DataStream

a3.sinks.k1.hdfs.rollInterval = 0

a3.sinks.k1.hdfs.rollSize = 0

a3.sinks.k1.hdfs.rollCount = 0

a3.sinks.k1.hdfs.batchSize = 100

a3.sinks.k1.channel = c1

a3.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink

a3.sinks.k2.kafka.topic = test

a3.sinks.k2.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092

a3.sinks.k2.kafka.flumeBatchSize = 20

a3.sinks.k2.kafka.producer.acks = 1

a3.sinks.k2.kafka.producer.linger.ms = 1

a3.sinks.k2.channel = c2

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100

a3.channels.c2.type = memory

a3.channels.c2.capacity = 1000

a3.channels.c2.transactionCapacity = 100

在根目录下启动 flume agent a3 => flume-ng agent -n a3 -c conf/ -f task/task_03.conf -Dflume.root.logger=INFO,console 等待flume agent启动成功,在终端给对应的端口发送消息 nc localhost 11111 使用kafka消费者消费topic test的数据=>kafka-console-consumer.sh --bootstrap-server master:9092 --topic test --from-beginning 查看 hdfs 有无数据 => dfs dfs -cat /tmp/flume/FlumeData.1731057184740.tmp

多路复用成功实现,一个文件实现多路复用,也可以使用多个agent实现多路复用的功能

📚 相关推荐

发字取名 发字辈的名字大全
365不让提款

发字取名 发字辈的名字大全

📅 08-02 👁️ 9004
砗磲被列为极度濒危物种
365下载手机版

砗磲被列为极度濒危物种

📅 08-01 👁️ 6674
抛弃高价定位!十款3D显示器对比评测
365不让提款

抛弃高价定位!十款3D显示器对比评测

📅 07-24 👁️ 4522
GG修改器使用总结,附我功夫特牛修改实例
beat365手机下载

GG修改器使用总结,附我功夫特牛修改实例

📅 07-18 👁️ 9129
GTA5什么直升机最好 直升机速度排行一览
365下载手机版

GTA5什么直升机最好 直升机速度排行一览

📅 09-21 👁️ 7521
英国的国花:玫瑰花的传奇与象征
365下载手机版

英国的国花:玫瑰花的传奇与象征

📅 09-01 👁️ 8902
正常的空调压力高低压分别是多少
365不让提款

正常的空调压力高低压分别是多少

📅 07-25 👁️ 6681
青金石是什么?产地,价格,寓意,保养,鉴别,禁忌
面对百亿数据,Hbase为什么查询速度依然非常快?
beat365手机下载

面对百亿数据,Hbase为什么查询速度依然非常快?

📅 09-23 👁️ 7781