FlumeNG学习笔记(六)Selector(复用与复制)测试-创新互联

版权声明:本文为博主原创文章,未经博主允许不得转载。

为扎囊等地区用户提供了全套网页设计制作服务,及扎囊网站建设行业解决方案。主营业务为成都网站制作、成都网站设计、外贸营销网站建设、扎囊网站设计,以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。我们深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!

目录(?)[+]

学习心得(三)流配置中介绍多路复用流的时候,有说到Flume支持从一个源发送事件到多个通道中,这被称为事件流的复用。这里需要在配置中定义事件流的复制/复用,选择1个或者多个通道进行数据流向。

而关于selector配置前面也讲过:

.sources..selector.type= replicating

这个源的选择类型为复制。这个参数不指定一个选择的时候,默认情况下它复制

复用则是麻烦一下,流的事情是被筛选的发生到不同的渠道,需要指定源和扇出通道的规则,感觉与case when 类似。

复用的参数为:

.sources..selector.type= multiplexing

一、下面给出复制的测试例子:

这里需要配置1个代理作为源发送与2个代理作为接受复制事件,共3个flume配置

首先是作为源发送的代理配置

[html] view plain

  1. #配置文件:replicate_source_case11.conf

  2. # Name the components on this agent

  3. a1.sources = r1

  4. a1.sinks = k1 k2

  5. a1.channels = c1 c2

  6. # Describe/configure the source

  7. a1.sources.r1.type = syslogtcp

  8. a1.sources.r1.port = 50000

  9. a1.sources.r1.host = 192.168.233.128

  10. a1.sources.r1.selector.type = replicating

  11. a1.sources.r1.channels = c1 c2

  12. # Describe the sink

  13. a1.sinks.k1.type = avro

  14. a1.sinks.k1.channel = c1

  15. a1.sinks.k1.hostname = 192.168.233.129

  16. a1.sinks.k1.port = 50000

  17. a1.sinks.k2.type = avro

  18. a1.sinks.k2.channel = c2

  19. a1.sinks.k2.hostname = 192.168.233.130

  20. a1.sinks.k2.port = 50000

  21. # Use a channel which buffers events inmemory

  22. a1.channels.c1.type = memory

  23. a1.channels.c1.capacity = 1000

  24. a1.channels.c1.transactionCapacity = 100

  25. a1.channels.c2.type = memory

  26. a1.channels.c2.capacity = 1000

  27. a1.channels.c2.transactionCapacity = 100

这里设置了2个channels与2个sinks,那么我们也要设置2个sinks对应的代理配置:

下面是第一个接受复制事件代理配置

[html] view plain

  1. #配置文件:replicate_sink1_case11.conf

  2. # Name the components on this agent

  3. a2.sources = r1

  4. a2.sinks = k1

  5. a2.channels = c1

  6. # Describe/configure the source

  7. a2.sources.r1.type = avro

  8. a2.sources.r1.channels = c1

  9. a2.sources.r1.bind = 192.168.233.129

  10. a2.sources.r1.port = 50000

  11. # Describe the sink

  12. a2.sinks.k1.type = logger

  13. a2.sinks.k1.channel = c1

  14. # Use a channel which buffers events inmemory

  15. a2.channels.c1.type = memory

  16. a2.channels.c1.capacity = 1000

  17. a2.channels.c1.transactionCapacity = 100

下面是第二个接受复制事件代理配置:

[html] view plain

  1. #配置文件:replicate_sink2_case11.conf

  2. # Name the components on this agent

  3. a3.sources = r1

  4. a3.sinks = k1

  5. a3.channels = c1

  6. # Describe/configure the source

  7. a3.sources.r1.type = avro

  8. a3.sources.r1.channels = c1

  9. a3.sources.r1.bind = 192.168.233.130

  10. a3.sources.r1.port = 50000

  11. # Describe the sink

  12. a3.sinks.k1.type = logger

  13. a3.sinks.k1.channel = c1

  14. # Use a channel which buffers events inmemory

  15. a3.channels.c1.type = memory

  16. a3.channels.c1.capacity = 1000

  17. a3.channels.c1.transactionCapacity = 100

#敲命令

首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。

flume-ng agent -cconf -f conf/replicate_sink1_case11.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -cconf -f conf/replicate_sink2_case11.conf -n a1 -Dflume.root.logger=INFO,console

在启动源发送的代理

flume-ng agent -cconf -f conf/replicate_source_case11.conf -n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

echo "hello looklook5"| nc 192.168.233.128 50000

#在启动源发送的代理终端查看console输出

Flume NG 学习笔记(六)Selector(复用与复制)测试

可以看到他的正常启动以及发送数据成功

#在启动源第一个接事件的代理终端查看console输出

Flume NG 学习笔记(六)Selector(复用与复制)测试

可以看到他的正常启动,以及接受到源代理发送的数据

#在启动源第二个接事件的代理终端查看console输出

Flume NG 学习笔记(六)Selector(复用与复制)测试

同样可以可以看到他的正常启动,以及接受到源代理发送的数据

Ok,成功

二、下面给出复用的测试例子:

因为复用的流的事件要声明一个头部,然后我们检查头部对应的值,因为我们这边源类用http source

下面是源代理的配置

[html] view plain

  1. #配置文件:multi_source_case12.conf

  2. a1.sources= r1

  3. a1.sinks= k1 k2

  4. a1.channels= c1 c2

  5. #Describe/configure the source

  6. a1.sources.r1.type= org.apache.flume.source.http.HTTPSource

  7. a1.sources.r1.port= 50000

  8. a1.sources.r1.host= 192.168.233.128

  9. a1.sources.r1.selector.type= multiplexing

  10. a1.sources.r1.channels= c1 c2

  11. a1.sources.r1.selector.header= state

  12. a1.sources.r1.selector.mapping.CZ= c1

  13. a1.sources.r1.selector.mapping.US= c2

  14. a1.sources.r1.selector.default= c1

  15. #Describe the sink

  16. a1.sinks.k1.type= avro

  17. a1.sinks.k1.channel= c1

  18. a1.sinks.k1.hostname= 192.168.233.129

  19. a1.sinks.k1.port= 50000

  20. a1.sinks.k2.type= avro

  21. a1.sinks.k2.channel= c2

  22. a1.sinks.k2.hostname= 192.168.233.130

  23. a1.sinks.k2.port= 50000

  24. # Usea channel which buffers events in memory

  25. a1.channels.c1.type= memory

  26. a1.channels.c1.capacity= 1000

  27. a1.channels.c1.transactionCapacity= 100

  28. a1.channels.c2.type= memory

  29. a1.channels.c2.capacity= 1000

  30. a1.channels.c2.transactionCapacity= 100

这里设置了2个channels与2个sinks 同时判断头部属性,当CZ的时,事件发送到sinks1,US时发送到sink2,其他的都发送到sink2,因此我们还有配置2个sinks对于的代理。这里的2个接受代理我们沿用之前复制的接受代理。

#敲命令

与之前复制的情况一样,首先先启动2个接受复制事件代理,如果先启动源发送的代理,会报他找不到sinks的绑定,因为2个接事件的代理还未起来。

flume-ng agent -cconf -f conf/multi_sink1_case12.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -cconf -f conf/multi_sink2_case12.conf -n a1 -Dflume.root.logger=INFO,console

在启动源发送的代理

flume-ng agent -cconf -f conf/multi_source_case12.conf -n a1 -Dflume.root.logger=INFO,console

启动成功后

打开另一个终端输入,往侦听端口送数据

curl -X POST -d '[{"headers" :{"state" : "CZ"},"body" :"TEST1"}]' http://192.168.233.128:50000

curl -X POST -d '[{"headers" :{"state" : "US"},"body" :"TEST2"}]' http://192.168.233.128:50000

curl -X POST -d '[{"headers" :{"state" : "SH"},"body" :"TEST3"}]' http://192.168.233.128:50000

#在启动源发送的代理终端查看console输出

Flume NG 学习笔记(六)Selector(复用与复制)测试

可以看到他的正常启动以及发送数据成功

#在启动源第一个接事件的代理终端查看console输出

Flume NG 学习笔记(六)Selector(复用与复制)测试

这里可以清楚的看到,这个接事件代理只收到了2个事件,因为第二个事件因为我们设置复用,将头部信息对于的事件分流的关系,发送到另一个接事件代理去了。

#在启动源第二个接事件的代理终端查看console输出

Flume NG 学习笔记(六)Selector(复用与复制)测试

Ok,第二个接事件代理因为复用分流,果然只获得了第二个事件信息。

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


网站标题:FlumeNG学习笔记(六)Selector(复用与复制)测试-创新互联
本文链接:http://scyanting.com/article/piejj.html