Flume NG高可用集群搭建:
架构总图:
架构分配:
角色 | Host | 端口 |
agent1 | hadoop3 | 52020 |
collector1 | hadoop1 | 52020 |
collector2 | hadoop2 | 52020 |
agent1配置(flume-client.conf):
#agent1 nameagent1.channels = c1agent1.sources = r1agent1.sinks = k1 k2#set gruopagent1.sinkgroups = g1#set channelagent1.channels.c1.type = memoryagent1.channels.c1.capacity = 1000agent1.channels.c1.transactionCapacity = 100agent1.sources.r1.channels = c1agent1.sources.r1.type = execagent1.sources.r1.command = tail -F /home/sky/flume/log_exec_tailagent1.sources.r1.interceptors = i1 i2agent1.sources.r1.interceptors.i1.type = staticagent1.sources.r1.interceptors.i1.key = Typeagent1.sources.r1.interceptors.i1.value = LOGINagent1.sources.r1.interceptors.i2.type = timestamp# set sink1agent1.sinks.k1.channel = c1agent1.sinks.k1.type = avroagent1.sinks.k1.hostname = hadoop1agent1.sinks.k1.port = 52020# set sink2agent1.sinks.k2.channel = c1agent1.sinks.k2.type = avroagent1.sinks.k2.hostname = hadoop2agent1.sinks.k2.port = 52020#set sink groupagent1.sinkgroups.g1.sinks = k1 k2#set failoveragent1.sinkgroups.g1.processor.type = failoveragent1.sinkgroups.g1.processor.priority.k1 = 10agent1.sinkgroups.g1.processor.priority.k2 = 1agent1.sinkgroups.g1.processor.maxpenalty = 10000
collector1配置(flume-server.conf):
#set Agent namea1.sources = r1a1.channels = c1a1.sinks = k1#set channela1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# other node,nna to nnsa1.sources.r1.type = avroa1.sources.r1.bind = hadoop1a1.sources.r1.port = 52020a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = statica1.sources.r1.interceptors.i1.key = Collectora1.sources.r1.interceptors.i1.value = hadoop1a1.sources.r1.channels = c1#set sink to hdfsa1.sinks.k1.type=loggera1.sinks.k1.channel=c1
collector2配置(flume-server.conf):
#set Agent namea1.sources = r1a1.channels = c1a1.sinks = k1#set channela1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# other node,nna to nnsa1.sources.r1.type = avroa1.sources.r1.bind = hadoop2a1.sources.r1.port = 52020a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = statica1.sources.r1.interceptors.i1.key = Collectora1.sources.r1.interceptors.i1.value = hadoop2a1.sources.r1.channels = c1#set sink to hdfsa1.sinks.k1.type=loggera1.sinks.k1.channel=c1
先启动server,在启动client:
flume-ng agent --conf conf --conf-file /usr/local/flume/conf/flume-server.conf --name a1 -Dflume.root.logger=INFO,console
flume-ng agent --conf conf --conf-file /usr/local/flume/conf/flume-client.conf --name agent1 -Dflume.root.logger=INFO,console
测试验证:
hadoop1收到了hadoop3的消息了,哈哈。hadoop2没有收到消息,那是因为hadoop1的priority高。
hadoop3:
hadoop1:
hadoop2:
再次测试Failover:
停掉hadoop1的Flume,再次在hadoop3发送数据。可见hadoop3的Flume报错重连了,并且hadoop2收到了数据。如果再次启动hadoop1的Flume,一切又会恢复到hadoop1接收。
hadoop3:
hadoop2:
这样测试就完毕了。Flume高可用集群就搭建好了!