Flume安装配置
1. 概述
1.1 flume定义
Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统
。Flume基于流式架构,灵活简单。
1.2 Flume组成架构
架构图:
架构详解:
组件介绍:
:one:Agent
Agent是一个JVM进程,它以事件的形式将数据从源头送至目的,是Flume数据传输的基本单元
。
Agent主要有3个部分组成,Source、Channel、Sink。
:two:Source
Source是负责接收数据到Flume Agent的组件
。Source组件可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec
、jms、spooling directory
、netcat、sequence generator、syslog、http、legacy。
:three:Channel
Channel是位于Source和Sink之间的缓冲区
。因此,Channel允许Source和Sink运作在不同的速率上。Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
Flume自带两种Channel:Memory Channel和File Channel。
Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用
。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
:four:Sink
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent。
Sink是完全事务性的
。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件。
Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。
:five:Event
传输单元,Flume数据传输的基本单元,以事件的形式将数据从源头送至目的地。
1.3 Flume拓扑结构
Flume的拓扑结构如图1-3、1-4、1-5和1-6所示:
图1-3 Flume Agent连接:
图1-4 单source,多channel、sink:
图1-5 Flume负载均衡:
图1-6 Flume Agent聚合
1.4 Flume Agent内部原理
2. 快速入门
2.1 Flume安装地址
Flume官网地址: http://flume.apache.org/
文档查看地址: http://flume.apache.org/FlumeUserGuide.html
下载地址: http://archive.apache.org/dist/flume/
2.2 安装部署
2.2.1 将所需包解压重命名
[root@master ~]# tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
[root@master ~]# mv /opt/module/apache-flume-1.9.0-bin/ /opt/module/apache-flume-1.9.0
2.2.2 添加环境变量
[root@master ~]# vi /etc/profile
添加以下内容:
#FLUME_HOME
export FLUME_HOME=/opt/module/apache-flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin
2.2.3 修改配置
修改flume-env.sh 指定JDK
[root@master ~]# cp $FLUME_HOME/conf/flume-env.sh.template $FLUME_HOME/conf/flume-env.sh
[root@master ~]# vi $FLUME_HOME/conf/flume-env.sh
修改如下:
2.2.4 解决guava冲突问题
将hadoop的guava包替换掉flume的guava包
[root@master ~]# rm -rf $FLUME_HOME/lib/guava-11.0.2.jar
[root@master ~]# cp $HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $FLUME_HOME/lib/
查看结果:
[root@master ~]# ll /opt/module/apache-flume-1.9.0/lib/ |grep guava
-rw-r--r-- 1 root root 2747878 Jan 14 07:09 guava-27.0-jre.jar
运行命令(配置文件需自己进行编写):
flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/namenode-hdfs.conf -Dflume.root.logger=INFO,console
命令解析:
-n/–name 指定Agent的名称。(必填)
-c/–conf 指定配置文件放在什么目录
-f/–conf-file指定配置文件,必须在–c参数定义的目录下。(必填)
-Dflume.root.logger=INFO,console 仅为 debug使用,请勿生产环境生搬硬套,否则大量的日志会返回到终端,将日志输入到控制台上
#端口测试 netcat_flume_console.comf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 9999
a1.sinks.k1.type =logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#使用flume将数据导入到hdfs中
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir 该目录下的文件会传输到HDFS
a1.sources.s1.spoolDir = /opt/module/flume/tmpdata
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/tmp/flume/logs
a1.channels.c1.type = memory
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
传输Hadoop日志(namenode)到hdfs:star:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#读取位置
#定义sourece类型为exec可执行命令的
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/module/hadoop/logs/hadoop-root-datanode-master.log
#存储位置
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://master:9000/tmp/flume
#使用在内存中缓冲事件的通道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#将source和sink绑定到channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
查看HDFS中/tmp/flume目录下生成的内容,将查看命令及结果(
至少5条结果
)
hdfs dfs -ls /tmp/flume
报错解决:
原因:guava冲突
解决:将/opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar 拷贝到 flume/lib 中,并删除或把自带的 guava-11.0.2.jar 改名
cp $HADOOP_HOME/share/hadoop/common/lib/guava-27.0-jre.jar $FLUME_HOME/lib
3. 企业开发案例
3.1 监控端口数据官方案例
3.1.1 案例需求:
首先,Flume监控本机44444端口,然后通过telnet工具向本机44444端口发送消息,最后Flume将监听的数据实时显示在控制台
3.1.2 需求分析
3.1.3 实现步骤:
1.安装telnet工具
[root@master ~]# yum install telnet -y
2.判断44444端口是否被占用
[root@master ~]# ss -tunlp |grep 44444
返回空值则表示没有被占用:
功能描述:ss命令是一个监控TCP/IP网络的非常有用的工具,它可以显示路由表、实际的网络连接以及每一个网络接口设备的状态信息。
3.创建Flume Agent配置文件flume-telnet-logger.conf
(1) 在flume目录下创建job文件夹并进入job文件夹
[root@master ~]# mkdir $FLUME_HOME/job
[root@master ~]# cd $FLUME_HOME/job
(2) 在job文件夹下创建Flume Agent配置文件flume-telnet-logger.conf
[root@master job]# vim flume-telnet-logger.conf
添加以下内容:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
注:配置文件来源于官方手册http://flume.apache.org/FlumeUserGuide.html
本地官方文档:/opt/module/apache-flume-1.9.0/docs/FlumeUserGuide.html
参数解释:
:star:注意:一个sources
可以对多个channels
,一个sinks
只能对接一个channel
4.开启flume监听端口
[root@master job]# flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/job/flume-telnet-logger.conf -Dflume.root.logger=INFO,console
这是用于启动Flume代理(agent)的命令,以下是每个参数的解释:
flume-ng
:Flume的命令行工具。agent
:指定要运行的Flume代理。-n a1
:指定代理的名称为a1
。这是代理的唯一标识符。-c $FLUME_HOME/conf
:指定Flume配置文件的目录。这里使用了环境变量$FLUME_HOME
,它应该设置为 Flume 的主目录。-f $FLUME_HOME/jobs/namenode-hdfs.conf
:指定Flume代理使用的配置文件的路径。这个配置文件描述了数据流的配置,例如来源、通道和目的地。-Dflume.root.logger=INFO,console
:设置Flume代理的日志级别和日志输出方式。在这个例子中,日志级别设置为INFO
,并将日志输出到控制台(console)。
运行结果如下:
5.使用telnet工具向本机的44444端口发送内容
[root@master ~]# telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
OK
this is bigdata
OK
6.在Flume监听页面观察接收数据情况
结果如下:
2024-01-14 07:49:43,343 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
2024-01-14 07:49:59,039 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 74 68 69 73 20 69 73 20 62 69 67 64 61 74 61 0D this is bigdata. }
3.2 实时读取本地文件到HDFS案例(缺少jar包待测试
)
3.2.1 案例需求:
实时监控Hive日志,并上传到HDFS中
3.2.2 需求分析:
3.2.3 实现步骤:
1.Flume要想将数据输出到HDFS,必须持有Hadoop相关jar包
将commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
拷贝到/opt/module/flume/lib文件夹下
2.创建flume-file-hdfs.conf文件
[root@master job]# vim flume-file-hdfs.conf
添加以下内容:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://master:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 600
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0
#最小冗余数
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
参数解释:
3.执行监控配置
[root@master apache-flume-1.9.0]# flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
4.开启Hadoop和Hive并操作Hive产生日志
[root@master hadoop]$ sbin/start-dfs.sh
[root@master hadoop]$ sbin/start-yarn.sh
[root@master hive]$ bin/hive
hive (default)>
5.在HDFS上查看文件。
3.3 实时读取目录文件到HDFS案例
3.3.1 案例需求:
使用Flume监听整个目录的文件
3.3.2 需求分析
3.3.3 实现步骤
1.创建配置文件flume-dir-hdfs.conf
[root@master job]# mkdir /opt/module/apache-flume-1.9.0/upload
[root@master job]# vim flume-dir-hdfs.conf
添加以下内容:
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/apache-flume-1.9.0/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://master:9000/apache-flume-1.9.0/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount = 0
#最小冗余数
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
参数详解:
2.启动监控文件夹命令
[root@master job]# flume-ng agent -n a3 -c $FLUME_HOME/conf -f $FLUME_HOME/job/flume-dir-hdfs.conf -Dflume.root.logger=INFO,console
说明
: 在使用Spooling Directory Source时
- 不要在监控目录中创建并持续修改文件
- 上传完成的文件会以.COMPLETED结尾
- 被监控文件夹每500毫秒扫描一次文件变动
3.向upload文件夹中添加文件
[root@master upload]# touch hj.txt
[root@master upload]# touch app.txt
[root@master upload]# touch hyp.txt
查看flume日志输出:
4.查看HDFS上的数据
5.等待1s,再次查询upload文件夹
[root@master upload]# ll
total 0
-rw-r--r-- 1 root root 0 Jan 14 08:28 app.txt.COMPLETED
-rw-r--r-- 1 root root 0 Jan 14 08:28 hj.txt.COMPLETED
-rw-r--r-- 1 root root 0 Jan 14 08:28 hyp.txt.COMPLETED
3.4 单数据源多出口案例(选择器)
单Source多Channel、Sink如下图所示:
3.4.1 案例需求:
使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem。
3.4.2 需求分析
3.4.3 实现步骤:
1.准备工作
在/opt/module/flume/job目录下创建group1文件夹
[root@master job]# mkdir $FLUME_HOME/job/group1
在/opt/module/datas/目录下创建flume3文件夹
mkdir -p /opt/module/datas/flume3
2.创建flume-file-flume.conf
配置1个接收日志文件的source和两个channel、两个sink,分别输送给flume-flume-hdfs和flume-flume-dir。
[root@master job]# cd group1/
[root@master group1]# vim flume-file-flume.conf
添加以下内容:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/apache-hive-3.1.2/logs/hiveServer2.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = master
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = master
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
注
:Avro是由Hadoop创始人Doug Cutting创建的一种语言无关的数据序列化和RPC框架。
注
:RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。
3.创建flume-flume-hdfs.conf
配置上级Flume输出的Source,输出是到HDFS的Sink。
[root@master group1]# vim flume-flume-hdfs.conf
添加以下内容:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = master
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://master:9000/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0
#最小冗余数
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
4.创建flume-flume-dir.conf
[root@master group1]# vim flume-flume-dir.conf
添加以下内容:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = master
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
提示:输出的本地目录必须是已经存在的目录
,如果该目录不存在,并不会创建新的目录。
5.执行配置文件
别开启对应配置文件:flume-flume-dir
,flume-flume-hdfs
,flume-file-flume
。
1.
flume-ng agent -c $FLUME_HOME/conf -n a3 -f $FLUME_HOME/job/group1/flume-flume-hdfs.conf
2.
flume-ng agent -c $FLUME_HOME/conf -n a2 -f $FLUME_HOME/job/group1/flume-flume-hdfs.conf
3.
flume-ng agent -c $FLUME_HOME/conf -n a1 -f $FLUME_HOME/job/group1/flume-file-flume.conf
6.启动hive
[root@master ~]# hive
hive> show databases;
OK
default
Time taken: 0.469 seconds, Fetched: 1 row(s)
7.检查HDFS上的数据
8.检查/opt/module/datas/flume3目录中数据
[root@master flume3]$ ll
总用量 8
-rw-rw-r--. 1 root root 5942 5月 22 00:09 1526918887550-3
3.4.4 大数据赛项数据采集
3.4.1 电商数据:
3.4.4.1.1 任务一:
在主节点使用Flume采集实时数据生成器10050端口的socket数据,将数据存入到Kafka的Topic中(Topic名称为order,分区数为4),使用Kafka自带的消费者消费order(Topic)中的数据,将前2条数据的结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下;
1.创建bigdata.conf
[root@master job]# vim bigdata.conf
添加以下配置:
# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.1.10
a1.sources.r1.port = 10050
# 这条可忽略
a1.sources.r1.max-line-length = 102400
# 描述和配置kafka sink组件:k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = bigdata
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.kafka.flumeBatchSize = 200
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# 描述和配置source channel sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
:star::star: 配置参考/opt/module/apache-flume-1.9.0/docs/FlumeUserGuide.html
:
定义agent sources channel参考:
定义kafka sinks:
启动zookeeper 和 kafka服务,并创建名为
bigdata
的topic
启动方式1(脚本启动):
1.启动zookeeper服务
[root@master ~]# zk.sh start
2.启动kafka服务
[root@master job]# kafka.sh start
启动方式2(命令行启动):
所有节点执行:
1.启动zookeeper服务
zkServer.sh start
2.启动kafka服务
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
创建topic:
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --partitions 3 --replication-factor 3 --topic bigdata
查看topic:
kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
执行配置文件:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/job/bigdata.conf -Dflume.root.logger=INFO,console
执行结果如下:
消费者消费消息:
kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic bigdata --from-beginning
运行结果:
运行模拟数据脚本Python脚本:
目录如下:
编辑Python脚本,修改ip
修改后运行Python脚本:
gxjzy@gxjzy:~/桌面/数据采集$ sudo python3 test2.py
结果如下:
观察kafka消费者:
完成
使用Kafka自带的消费者消费order(Topic)中的数据,将前2条数据的结果截图:
kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --from-beginning --topic order --max-messages 2
3.4.4.1.2 任务二:
采用多路复用模式,Flume接收数据注入kafka 的同时,将数据备份到HDFS目录/user/test/flumebackup下,将查看备份目录下的第一个文件的前2条数据的命令与结果截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下。
多路复用架构图:
1.创建
bigdata_Multiplexing.conf
多路复用
拷贝前面编写的配置文件,新加一个hdfs的sinks:
[root@master job]# cp bigdata.conf bigdata_Multiplexing.conf
修改后内容如下:
# 定义这个agent中各组件的名字
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 描述和配置source组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.1.10
a1.sources.r1.port = 10050
a1.sources.r1.max-line-length = 102400
# 描述和配置sink组件:k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = bigdata
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.kafka.flumeBatchSize = 200
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# 描述和配置sink组件:k2
## 指定了数据接收组件类型为 HDFS
a1.sinks.k2.type = hdfs
## 指定了写入 HDFS 的路径
a1.sinks.k2.hdfs.path = hdfs://master:9000/user/test/flumebackup
## 指定了每批次写入 HDFS 的记录数为 100
a1.sinks.k2.hdfs.batchSize = 100
##指定了在滚动写入 HDFS 之前等待的最大文件大小为 10000 字节
a1.sinks.k2.hdfs.rollSize = 10000
# 描述和配置channel组件,此处使用是内存缓存的方式 c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# 描述和配置channel组件,此处使用是内存缓存的方式 c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000
# 描述和配置source channel sink之间的连接关系
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
主要添加配置如下:
参考:
启动flume服务:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/job/bigdata_Multiplexing.conf -Dflume.root.logger=INFO,console
运行结果:
查看备份目录下的第一个文件的前2条数据的命令与结果截图:
hdfs dfs -cat /usr/test/flumebackup/FlumeData.1708341766181 | head -n 2
消费者消费消息:
kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092 --topic bigdata --from-beginning
运行结果:
数据采集完成,接下来使用fink进行实时数据处理
3.4.2 工业数据:
3.4.2.1 任务一:
1、在主节点使用Flume采集/data_log目录下实时日志文件中的数据,将数据存入到Kafka的Topic中(Topic名称分别为ChangeRecord、ProduceRecord和EnvironmentData,分区数为4),将Flume采集ChangeRecord主题的配置截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下;
创建topic:
partitions: 分区
replication-factor: 副本
ChangeRecord:
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --partitions 4 --replication-factor 1 --topic ChangeRecord
ProduceRecord:
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --partitions 4 --replication-factor 1 --topic ProduceRecord
EnvironmentData:
kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181 --partitions 4 --replication-factor 1 --topic EnvironmentData
创建后的topic如下:
[root@bigdata1 data_log]# kafka-topics.sh --zookeeper localhost:2181 --list
ChangeRecord
EnvironmentData
ProduceRecord
创建industry.conf:
[root@bigdata1 job]# vim industry.conf
添加以下内容:
# 定义这个agent中各组件的名字
a1.sources = r1 r2 r3
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3
# 定义sources r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile =/opt/module/flume-1.9.0/changerecord/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data_log/.*changerecord.csv
# r2
a1.sources.r2.type = TAILDIR
a1.sources.r2.positionFile = /opt/module/flume-1.9.0/producerecord/taildir_position.json
a1.sources.r2.filegroups = f1
a1.sources.r2.filegroups.f1 = /data_log/.*producerecord.csv
# r3
a1.sources.r3.type = TAILDIR
a1.sources.r3.positionFile =/opt/module/flume-1.9.0/environmentdata/taildir_position.json
a1.sources.r3.filegroups = f1
a1.sources.r3.filegroups.f1 = /data_log/.*environmentdata.csv
# 定义sinks k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = ChangeRecord
a1.sinks.k1.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# k2
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = ProduceRecord
a1.sinks.k2.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.kafka.producer.linger.ms = 1
# k3
a1.sinks.k3.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k3.kafka.topic = EnvironmentData
a1.sinks.k3.kafka.bootstrap.servers = bigdata1:9092,bigdata2:9092,bigdata3:9092
a1.sinks.k3.kafka.flumeBatchSize = 20
a1.sinks.k3.kafka.producer.acks = 1
a1.sinks.k3.kafka.producer.linger.ms = 1
# 定义channel c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
# c2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 10000
# c3
a1.channels.c3.type = memory
a1.channels.c3.capacity = 10000
a1.channels.c3.transactionCapacity = 10000
# 描述和配置source channel sink之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel = c2
a1.sources.r3.channels = c3
a1.sinks.k3.channel = c3
新增参数:
运行flume脚本:
flume-ng agent -n a1 -c $FLUME_HOME/conf -f $FLUME_HOME/job/industry.conf -Dflume.root.logger=INFO,console
运行结果如下:
3.4.2.2 任务二:
2、编写新的Flume配置文件,将数据备份到HDFS目录/user/test/flumebackup下,要求所有主题的数据使用同一个Flume配置文件完成,将Flume的配置截图粘贴至客户端桌面【Release\任务D提交结果.docx】中对应的任务序号下。
flume配置内容
# 定义这个agent中各组件的名字
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = TAILDIR
a2.sources.r1.positionFile = /opt/module/flume-1.9.0/tail_dir.json
a2.sources.r1.filegroups = f1 f2 f3
#f1
a2.sources.r1.filegroups.f1 = /data_log/.*producerecord.csv
a2.sources.r1.headers.f1.headerKey1 = producerecord
#f2
a2.sources.r1.filegroups.f2 = /data_log/.*changerecord.csv
a2.sources.r1.headers.f2.headerKey1 = changerecord
#f3
a2.sources.r1.filegroups.f3 = /data_log/.*environmentdata.csv
a2.sources.r1.headers.f3.headerKey1 = environmentdata
a2.sources.r1.fileHeader = true
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs:///user/test/flumebackup/%Y%m%d/%H/%{headerKey1}
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
运行
flume-ng agent -n a2 -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/hdfs.conf -Dflume.root.logger=INFO,console
结果如下:
发现数据已经在备份,到hdfs查看如下:
查看数据详情:
hdfs dfs -cat /user/test/flumebackup/20240723/15/changerecord/upload-.1721718299144 | head -n2
完成