主页 > 互联网  > 

浅析数据采集工具Flume

浅析数据采集工具Flume

title: Flume系列


第一章 Flume基础理论 1.1 数据收集工具产生背景

Hadoop 业务的一般整体开发流程:

任何完整的大数据平台,一般都会包括以下的基本处理过程:

数据采集 数据 ETL 数据存储 数据计算/分析 数据展现

其中,数据采集是所有数据系统必不可少的,随着大数据越来越被重视,数据采集的挑战也变的尤为突出。这其中包括:

数据源多种多样 数据量大,变化快 如何保证数据采集的可靠性的性能 如何避免重复数据 如何保证数据的质量

我们今天就来看看当前可用的一些数据采集的产品,重点关注一些它们是如何做到高可靠, 高性能和高扩展。

总结: 数据的来源大体上包括:

1、业务数据 2、爬取的网络公开数据 3、购买数据 4、自行采集日志数据 1.1 Flume简介 Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

​ Flume 是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送 方,用于收集数据,同时,Flume 提供对数据的简单处理,并写到各种数据接收方的能力。

1、 Apache Flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统,和 Sqoop 同属于数据采集系统组件,但是 Sqoop 用来采集关系型数据库数据,而 Flume 用来采集流动型数据。

2、 Flume 名字来源于原始的近乎实时的日志数据采集工具,现在被广泛用于任何流事件数 据的采集,它支持从很多数据源聚合数据到 HDFS。

3、 一般的采集需求,通过对 flume 的简单配置即可实现。Flume 针对特殊场景也具备良好 的自定义扩展能力,因此,flume 可以适用于大部分的日常数据采集场景 。

4、 Flume 最初由 Cloudera 开发,在 2011 年贡献给了 Apache 基金会,2012 年变成了 Apache 的顶级项目。Flume OG(Original Generation)是 Flume 最初版本,后升级换代成 Flume NG(Next/New Generation)。

5、 Flume 的优势:可横向扩展、延展性、可靠性。

1.2 Flume版本

Flume 在 0.9.x and 1.x 之间有较大的架构调整: 1.x 版本之后的改称 Flume NG 0.9.x 版本称为 Flume OG,最后一个版本是 0.94,之后是由 Apache 进行了重构 N是New 和 O是Old

Flume1.7版本要求:

Flume OG Old/Original Generation Flume NG New/Next Generation

注意,上面是flume1.7的要求,其他版本要求可能会不一样!!

本文使用版本链接:http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html

官网链接:http://flume.apache.org/

Flume1.9 版本要求:

System Requirements

Java Runtime Environment - Java 1.8 or later Memory - Sufficient memory for configurations used by sources, channels or sinks Disk Space - Sufficient disk space for configurations used by channels or sinks Directory Permissions - Read/Write permissions for directories used by agent 第二章 Flume体系结构/核心组件 agent:能独立执行一个数据收集任务的JVM进程 source : agent中的一个用来跟数据源对接的服务 channel : agent内部的一个中转组件 sink : agent中的一个用来跟数据目的地对接的服务 event: 消息流转的一个载体/对象 header body 常见source的类型 Avro source :接收网络端口中的数据 exec source: 监听文件新增内容 tail -f spooldir source :监控文件夹的,如果这个文件夹里面的文件发送了变化,就可以采集 Taildir source: 多目录多文件实时监控 常见的channel的类型 memory : 内存中 , 快 , 但不安全 file : 相对来说安全些,但是效率低些 jdbc: 使用数据库进行数据的保存 常见的sink的类型 logger 做测试使用 HDFS 离线数据的sink 一般 Kafka 流式数据的sink 以上仅仅是常见的一些,官网中有完整的。 2.1 介绍

​ Flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事 件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。你可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久化日志或 者把事件推向另一个 Source。

​ Flume 以 agent 为最小的独立运行单位

​ 一个 agent 就是一个 JVM

​ 单 agent 由 Source、Sink 和 Channel 三大组件构成。

如下面官网图片

解释:

2.2 Flume三大核心组件

Event Event 是 Flume 数据传输的基本单元。 Flume 以事件的形式将数据从源头传送到最终的目的地。 Event 由可选的 header 和载有数据的一个 byte array 构成。 载有的数据对 flume 是不透明的。 Header 是容纳了 key-value 字符串对的无序集合,key 在集合内是唯一的。 Header 可以在上下文路由中使用扩展。

Client Client 是一个将原始 log 包装成 events 并且发送他们到一个或多个 agent 的实体 目的是从数据源系统中解耦 Flume 在 Flume 的拓扑结构中不是必须的

Agent 一个 Agent 包含 source,channel,sink 和其他组件。 它利用这些组件将 events 从一个节点传输到另一个节点或最终目的地。 Agent 是 flume 流的基础部分。 Flume为这些组件提供了配置,声明周期管理,监控支持。

Agent 之 Source Source 负责接收 event 或通过特殊机制产生 event,并将 events 批量的放到一个或多个 包含 event 驱动和轮询两种类型 不同类型的 Source 与系统集成的 Source:Syslog,Netcat,监测目录池 自动生成事件的 Source:Exec 用于 Agent 和 Agent 之间通信的 IPC source:avro,thrift Source 必须至少和一个 channel 关联

Agent 之 Channel Channel 位于 Source 和 Sink 之间,用于缓存进来的 event 当 sink 成功的将 event 发送到下一个的 channel 或最终目的,event 从 channel 删除 不同的 channel 提供的持久化水平也是不一样的 Memory Channel:volatile(不稳定的) File Channel:基于 WAL(预写式日志 Write-Ahead Logging)实现 JDBC Channel:基于嵌入式 database 实现 Channel 支持事务,提供较弱的顺序保证 可以和任何数量的 source 和 sink 工作

Agent 之 Sink Sink 负责将 event 传输到下一级或最终目的地,成功后将 event 从 channel 移除 不同类型的 sink ,比如 HDFS,HBase

2.3 Flume经典部署方案

1、单Agent采集数据

​ 由一个 agent 负责把从 web server 中收集数据到 HDFS 。

2、多Agent串联

​ 在收集数据的过程中,可以让多个 agent 串联起来,形成一条 event 数据线,进行传输,但 是注意的是:相邻两个 agent 的前一个 agent 的 sink 类型要和后一个 agent 的 source 类型一 致。

3、多Agent合并串联

​ 多个 agent 串联,并联成一个复杂的 数据收集架构。反映了 flume 的部署灵活。并且针对关键节点,还可以进行高可用配置。

4、多路复用

​ 一份数据流,可以被复制成多份数据流,交给多个不同组件进行处理。一般用于一边永久存储一边进行计算。

第三章 Flume安装及案例 3.1 安装部署 3.1.1 Flume1.7安装部署

1、将apache-flume-1.7.0-bin.tar.gz上传到hadoop0的/software目录下,并解压

[root@hadoop0 software]# tar -zxvf apache-flume-1.7.0-bin.tar.gz

2、重命名为flume

[root@hadoop0 software]# mv apache-flume-1.7.0-bin flume

3、修改flume-env.sh文件

[root@hadoop0 conf]# mv flume-env.sh.template flume-env.sh

然后vim flume-env.sh,修改jdk路径

export JAVA_HOME=/software/jdk 3.1.2 Flume1.9安装部署

1、将apache-flume-1.9.0-bin.tar.gz上传到hadoop10的/software目录下,并解压

[root@hadoop10 software]# tar -zxvf apache-flume-1.9.0-bin.tar.gz

2、重命名为flume

[root@hadoop10 software]# mv apache-flume-1.9.0-bin flume

3、修改flume-env.sh文件

[root@hadoop10 conf]# mv flume-env.sh.template flume-env.sh

然后vim flume-env.sh,修改jdk路径

export JAVA_HOME=/software/jdk

4、看看Flume版本

[root@hadoop10 bin]# flume-ng version Flume 1.9.0 Source code repository: git-wip-us.apache.org/repos/asf/flume.git Revision: d4fcab4f501d41597bc616921329a4339f73585e Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018 From source with checksum 35db629a3bda49d23e9b3690c80737f9 [root@hadoop10 bin]# pwd /software/flume/bin [root@hadoop10 bin]# 3.2 案例 3.2.1 监控端口数据(官方案例) 1、在flume的目录下面创建文件夹 [root@hadoop0 flume]# mkdir job [root@hadoop0 flume]# cd job 2、定义配置文件telnet-logger.conf [root@hadoop0 job]# vim telnet-logger.conf 添加内容如下: # example.conf: A single-node Flume configuration # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 3、先开启flume监听端口 退到flume目录 官方样例:bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console 实际操作: bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console 4、执行telnet localhost 44444 telnet localhost 44444 会先报找不到telnet [root@hadoop10 flume]# telnet localhost 44444 bash: telnet: command not found... [root@hadoop10 flume]# 然后执行yum -y install telnet 5、发送命令测试即可

针对于上述配置telnet-logger.conf文件的内容的解释:

# example.conf: A single-node Flume configuration # Name the components on this agent #a1: 表示的是agent的名字 a1.sources = r1 #r1 : 表示的是a1的输入源 a1.sinks = k1 #k1 : 表示的a1的输出目的地 a1.channels = c1 #c1 : 表示的a1的缓冲区 # Describe/configure the source #配置source a1.sources.r1.type = netcat #表示a1的输入源r1的类型是netcat类型 a1.sources.r1.bind = localhost #表示a1监听的主机 a1.sources.r1.port = 44444 #表示a1监听的端口号 # Describe the sink #描述sink a1.sinks.k1.type = logger #表示a1的输入目的地k1的类型是logger # Use a channel which buffers events in memory a1.channels.c1.type = memory #表示a1的channel的类型是memory类型 a1.channels.c1.capacity = 1000 #表示a1的channel总容量1000个event a1.channels.c1.transactionCapacity = 100 #表示a1的channel传输的时候收集到了100个event以后再去提交事务 # Bind the source and sink to the channel a1.sources.r1.channels = c1 #表示将r1和c1 连接起来 a1.sinks.k1.channel = c1 #表示将k1和c1 连接起来 3、先开启flume监听端口 退到flume目录 官方样例:bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console 实际操作:bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger.conf -Dflume.root.logger=INFO,console bin/flume-ng agent --conf conf/ --name a1 --conf-file job/telnet-logger2.conf -Dflume.root.logger=INFO,console 参数说明: --conf conf : 表示配置文件在conf目录 --name a1 : 表示给agent起名为a1 --conf-file job/telnet-logger.conf : flume本次启动所要读取的配置文件在job文件夹下面的telnet-logger.conf文件 -Dflume.root.logger=INFO,console : -D 表示flume运行时候的动态修改flume.root.logger参数值,并将日志打印到控制台,级别是INFO级别。 日志级别: log、info、warn、error 3.2.2 监控目录中的文件到HDFS 1、创建配置文件dir-hdfs.conf 在job目录下面 vim dir-hdfs.conf 添加下面的内容: a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = spooldir a3.sources.r3.spoolDir = /software/flume/upload a3.sources.r3.fileSuffix = .COMPLETED a3.sources.r3.fileHeader = true a3.sources.r3.ignorePattern = ([^ ]*\.tmp) # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/upload/%Y%m%d/%H a3.sinks.k3.hdfs.filePrefix = upload- a3.sinks.k3.hdfs.round = true a3.sinks.k3.hdfs.roundValue = 1 a3.sinks.k3.hdfs.roundUnit = hour a3.sinks.k3.hdfs.useLocalTimeStamp = true a3.sinks.k3.hdfs.batchSize = 100 a3.sinks.k3.hdfs.fileType = DataStream a3.sinks.k3.hdfs.rollInterval = 600 a3.sinks.k3.hdfs.rollSize = 134217700 a3.sinks.k3.hdfs.rollCount = 0 a3.sinks.k3.hdfs.minBlockReplicas = 1 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3 2、启动监控目录命令 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

针对于上述配置dir-hdfs.conf文件的内容的解释:

1、创建配置文件dir-hdfs.conf 在job目录下面 vim dir-hdfs.conf 添加下面的内容: a3.sources = r3 #定义source为r3 a3.sinks = k3 #定义sink为k3 a3.channels = c3 #定义channel为c3 # Describe/configure the source #配置source相关的信息 a3.sources.r3.type = spooldir #定义source的类型是spooldir类型 a3.sources.r3.spoolDir = /software/flume/upload #定义监控的具体的目录 a3.sources.r3.fileSuffix = .COMPLETED #文件上传完了之后的后缀 a3.sources.r3.fileHeader = true #是否有文件头 a3.sources.r3.ignorePattern = ([^ ]*\.tmp) #忽略以tmp结尾的文件,不进行上传 # Describe the sink #配置sink相关的信息 a3.sinks.k3.type = hdfs #定义sink的类型是hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/upload/%Y%m%d/%H #文件上传到hdfs的具体的目录 a3.sinks.k3.hdfs.filePrefix = upload- #文件上传到hdfs之后的前缀 a3.sinks.k3.hdfs.round = true #是否按照时间滚动生成文件 a3.sinks.k3.hdfs.roundValue = 1 #多长时间单位创建一个新的文件 a3.sinks.k3.hdfs.roundUnit = hour #时间单位 a3.sinks.k3.hdfs.useLocalTimeStamp = true #是否使用本地时间 a3.sinks.k3.hdfs.batchSize = 100 #积累多少个event才刷写到hdfs一次 a3.sinks.k3.hdfs.fileType = DataStream #文件类型 a3.sinks.k3.hdfs.rollInterval = 600 #多久生成新文件 a3.sinks.k3.hdfs.rollSize = 134217700 #多大生成新文件 a3.sinks.k3.hdfs.rollCount = 0 #多少event生成新文件 a3.sinks.k3.hdfs.minBlockReplicas = 1 #副本数 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3 2、启动监控目录命令 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/dir-hdfs.conf

在执行上面的命令过程中遇到的了一点点小问题:

...... SLF4J: See http:// .slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: com.google mon.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357) at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338) at org.apache.hadoop.conf.Configuration.setBoolean(Configuration.java:1679) at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:221) at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:572) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:412) at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) at java.lang.Thread.run(Thread.java:748)

解决方案:将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop的版本。可以通过重命名的方式注释掉即可(实现删除的效果)。

[root@hadoop10 lib]# mv guava-11.0.2.jar guava-11.0.2.jar.backup 3.2.3 监控文件到HDFS 1、创建一个自动化文件 [root@hadoop0 job]# vim mydateauto.sh 写入: #!/bin/bash while true do echo `date` sleep 1 done 然后运行测试: [root@hadoop0 job]# sh mydateauto.sh Wed Aug 19 18:34:19 CST 2020 Wed Aug 19 18:34:20 CST 2020 然后修改配置,将输出的日志追加到某个文件中 #!/bin/bash while true do echo `date` >> /software/flume/mydate.txt sleep 1 done 再次执行[root@hadoop0 job]# sh mydateauto.sh 就会在flume的文件夹下面生成了mydate.txt文件 通过tail -f mydate.txt 查看 再次执行sh mydateauto.sh 查看输出。 2、创建配置vim file-hdfs.conf # Name the components on this agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # Describe/configure the source a2.sources.r2.type = exec a2.sources.r2 mand = tail -F /software/flume/mydate.txt a2.sources.r2.shell = /bin/bash -c # Describe the sink a2.sinks.k2.type = hdfs a2.sinks.k2.hdfs.path = hdfs://hadoop10:8020/flume/%Y%m%d/%H a2.sinks.k2.hdfs.filePrefix = logs- a2.sinks.k2.hdfs.round = true a2.sinks.k2.hdfs.roundValue = 1 a2.sinks.k2.hdfs.roundUnit = hour a2.sinks.k2.hdfs.useLocalTimeStamp = true a2.sinks.k2.hdfs.batchSize = 1000 a2.sinks.k2.hdfs.fileType = DataStream a2.sinks.k2.hdfs.rollInterval = 600 a2.sinks.k2.hdfs.rollSize = 134217700 a2.sinks.k2.hdfs.rollCount = 0 a2.sinks.k2.hdfs.minBlockReplicas = 1 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2 3、启动 bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

针对于上述配置file-hdfs.conf文件的内容的解释:

# Name the components on this agent a2.sources = r2 #定义source为r2 a2.sinks = k2 #定义sink为k2 a2.channels = c2 #定义channel为c2 # Describe/configure the source a2.sources.r2.type = exec #定义source的类型是exec 可执行命令 a2.sources.r2 mand = tail -F /software/flume/mydate.txt #具体文件位置 a2.sources.r2.shell = /bin/bash -c #命令开头 # Describe the sink #sink相关配置 a2.sinks.k2.type = hdfs #定义sink的类型是hdfs a2.sinks.k2.hdfs.path = hdfs://hadoop10:8020/flume/%Y%m%d/%H #具体的位置 a2.sinks.k2.hdfs.filePrefix = logs- a2.sinks.k2.hdfs.round = true a2.sinks.k2.hdfs.roundValue = 1 a2.sinks.k2.hdfs.roundUnit = hour a2.sinks.k2.hdfs.useLocalTimeStamp = true a2.sinks.k2.hdfs.batchSize = 100 a2.sinks.k2.hdfs.fileType = DataStream a2.sinks.k2.hdfs.rollInterval = 600 #单位是秒!! a2.sinks.k2.hdfs.rollSize = 134217700 a2.sinks.k2.hdfs.rollCount = 0 a2.sinks.k2.hdfs.minBlockReplicas = 1 # Use a channel which buffers events in memory a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2 3、启动 bin/flume-ng agent --conf conf/ --name a2 --conf-file job/file-hdfs.conf

过程中遇到的一点点小问题:

18 Oct 2021 14:32:24,340 INFO [conf-file-poller-0] (org.apache.flume.sink.DefaultSinkFactory.create:42) - Creating instance of sink: k2, type: hdfs 18 Oct 2021 14:32:24,348 ERROR [conf-file-poller-0] (org.apache.flume.node.AbstractConfigurationProvider.loadSinks:469) - Sink k2 has been removed due to an error during configuration java.lang.InstantiationException: Incompatible sink and channel settings defined. sink's batch size is greater than the channels transaction capacity. Sink: k2, batch size = 1000, channel c2, transaction capacity = 100 at org.apache.flume.node.AbstractConfigurationProvider.checkSinkChannelCompatibility(AbstractConfigurationProvider.java:403) at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:462) at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:106) at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

解决方案:

问题原因:原因其实很明了了,就是字面的意思,channel 与 sink的设置不匹配,sink的batch size大于channel的transaction capacity 解决方案:将a2.sinks.k2.hdfs.batchSize设置为小于等于100 。 或者注释掉也可以。 3.2.4 多目录多文件实时监控(Taildir Source)

与前面使用到的Source的对比

Spooldir Source 用于同步新文件,但不适合对实时追加日志的文件进行监听并同步。 Exec source 用于监控一个实时追加的文件,不能实现断点续传; Taildir Source 用于监听多个实时追加的文件,并且能够实现断点续传。

操作案例:

1、在job下面创建 vim taildir-hdfs.conf a3.sources = r3 a3.sinks = k3 a3.channels = c3 # Describe/configure the source a3.sources.r3.type = TAILDIR a3.sources.r3.positionFile = /software/flume/taildir.json a3.sources.r3.filegroups = f1 f2 a3.sources.r3.filegroups.f1 = /software/flume/taildirtest/filedir/.*file.* a3.sources.r3.filegroups.f2 = /software/flume/taildirtest/logdir/.*log.* # Describe the sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop10:8020/flume/uploadtaildir/%Y%m%d/%H a3.sinks.k3.hdfs.filePrefix = upload- a3.sinks.k3.hdfs.round = true a3.sinks.k3.hdfs.roundValue = 1 a3.sinks.k3.hdfs.roundUnit = hour a3.sinks.k3.hdfs.useLocalTimeStamp = true a3.sinks.k3.hdfs.batchSize = 100 a3.sinks.k3.hdfs.fileType = DataStream a3.sinks.k3.hdfs.rollInterval = 600 a3.sinks.k3.hdfs.rollSize = 134217700 a3.sinks.k3.hdfs.rollCount = 0 a3.sinks.k3.hdfs.minBlockReplicas = 1 # Use a channel which buffers events in memory a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # Bind the source and sink to the channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3 2、创建文件文件夹,注意需要在启动之前创建监控的文件夹 [root@hadoop10 flume]# mkdir taildirtest [root@hadoop10 flume]# cd taildirtest/ [root@hadoop10 taildirtest]# ll total 0 [root@hadoop10 taildirtest]# mkdir filedir [root@hadoop10 taildirtest]# mkdir logdir [root@hadoop10 taildirtest]# ll total 0 drwxr-xr-x. 2 root root 6 Oct 18 16:44 filedir drwxr-xr-x. 2 root root 6 Oct 18 16:45 logdir [root@hadoop10 taildirtest]# vim file.txt [root@hadoop10 taildirtest]# vim log.txt [root@hadoop10 taildirtest]# ll total 8 drwxr-xr-x. 2 root root 6 Oct 18 16:44 filedir -rw-r--r--. 1 root root 35 Oct 18 16:45 file.txt drwxr-xr-x. 2 root root 6 Oct 18 16:45 logdir -rw-r--r--. 1 root root 35 Oct 18 16:46 log.txt 3、启动监控目录命令 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-hdfs.conf 4、测试 [root@hadoop10 taildirtest]# cp file.txt filedir/ [root@hadoop10 taildirtest]# cp log.txt logdir/ [root@hadoop10 taildirtest]# cd filedir/ [root@hadoop10 filedir]# echo hello1 >> file.txt [root@hadoop10 filedir]# cd ../logdir/ [root@hadoop10 logdir]# echo hello2 >> log.txt [root@hadoop10 logdir]#

声明:         文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。

By luoyepiaoxue2014

微博地址: http://weibo /luoyepiaoxue2014 点击打开链接

标签:

浅析数据采集工具Flume由讯客互联互联网栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“浅析数据采集工具Flume