fluentd-创新互联

介绍

fluentd 是一个实时的数据收集系统,不仅可以收集日志,还可以收集定期执行的命令输出和HTTP 请求内容。

创新互联:自2013年起为各行业开拓出企业自己的“网站建设”服务,为超过千家公司企业提供了专业的成都网站设计、网站制作、外贸营销网站建设、网页设计和网站推广服务, 定制开发由设计师亲自精心设计,设计的效果完全按照客户的要求,并适当的提出合理的建议,拥有的视觉效果,策划师分析客户的同行竞争对手,根据客户的实际情况给出合理的网站构架,制作客户同行业具有领先地位的。

数据被收集后按照用户配置的解析规则,形成一个个 event,event 格式如下:

tag = xxx
time = xxx
record = {
    "key1": "value1",
    "key2": "value2"
}

其中:

  • tag:为数据流的标记。  当 fluentd 中有数不清的 event 时,tag 可以用于分组处理。
    • 比如说 tag 为 water 的,需要其中的 record 添加一个 hostname 的 kv 值。
    • 又或者 tag 为 fire 的,不做任何处理,直接输出到文件中的。
  • time: event 产生的时间,该字段通常由日志内的时间字段解析出来。
  • record: 日志的内容,为 JSON 格式。
source

source 定义数据源,是 fluentd 的输入端,流入 fluentd 的配置都是在 source 中的,一个 fluentd 中可以有多个数据源,因此,一个 fluentd 中可以有多个 source 。

一个 source 由一个输入插件和插件的配置组成,也就意味着,一个 source 中只能有一种类型的输入。

输入插件

输入插件有很多,具体的可以去官网查看,很详细。链接如下:

Fluentd

在这里,只是总结一下,我自己使用的插件:

@type 'kafka'

该插件是以“单消费者”模式订阅 kafka 消息。

单消费者模式:每个 kafka 输入插件独立地订阅 kafka 消息。

很简单,但有缺陷,因此目前大多以 “消费组模式”订阅。

单消费者模式缺陷如下(网上抄的,实际情况如何不清楚):

  • 如果存在多个单消费者进程同时订阅相同的 topic,进程之间无法协调和分配不同的分区。
  • 如果多个消费者进程中的某个进程挂掉,其他进程无法从该进程原先订阅位置进行恢复。

配置如下:

# 插件类型 kafka
  @type kafka
  
  # 逗号分隔的 broker 列表,每个 broker 需要指定 ip 和端口
  brokers:,:,..
  # 逗号分隔的 topic 列表
  topics# 输入消息的格式,有 text、json、ltsv、msgpack 等几种,默认 json
  format:default =>json
  # tag 增加前缀
  add_prefix# tag 增加后缀
  add_suffix

其中:

  • topic: 填写的是想要消费 kafka 中 topic 的名字,可以同时消费多个 topic 。
  • tag: tag 的名字默认是 topic 的名字,如果想要修改 tag ,可以使用 add_prefix 和 add_suffix 在 tag 的前后添加字符串。 
    • 例如:当目标 topic 名称为 app_event 时,tag 为 app_event 。使用add_prefix kafka,tag 就是kafka.app_event。

以上是最简单的配置,同时也是我用的配置。想要查看更复杂的配置,请移步:

GitHub - fluent/fluent-plugin-kafka: Kafka input and output plugin for Fluentd

GitHub - zendesk/ruby-kafka: A Ruby client library for Apache Kafka

@type 'kafka_group'

插件以“消费者组”模式订阅 kafka 消息。消费者组模式解决了单消费者模式存在的几个缺点,可以同时启动多个 Fluentd 进程协同工作。

配置如下:

# 插件类型 kafka_group
  @type kafka_group
  
  # 逗号分隔的 broker 列表,每个 broker 需要指定 ip 和端口
  brokers:,:,..
  # 设定消费者组名称,必须设置
  consumer_group# 逗号分隔的 topic 列表
  topics# 输入消息的格式,有 text、json、ltsv、msgpack 等几种,默认 json
  format:default =>json
  # 如果为 true,添加 kafka 的消息头到记录中
  add_headers# tag 增加前缀
  add_prefix# tag 增加后缀
  add_suffixusername USERNAME
  password PASSWORD
  sasl_over_ssl false
  ssl_ca_certs_from_system false
  get_kafka_client_log false

match

match 定义数据的输出目标,match 指令通过匹配 tag 字段来将事件输出到其他的系统。

同样 match 指令也必须指定 @type 参数,该参数用来指定使用哪个输出插件。

@type kafka2
# 插件类型 kafka2
  @type kafka2

  # 逗号分隔的 broker 列表,每个 broker 需要指定 ip 和端口 
  brokers    :,:,.. # Set brokers directly
  # 默认 topic,若未设置 topic_key,则 topic 取此处的值
  default_topic         (string) :default =>nil

  # 设置输出消息格式,支持 json、ltsv、msgpack或其他输出插件,默认是 json@type (json|ltsv|msgpack|attr:|) :default =>jsonflush_interval 10susername USERNAME
  password PASSWORD
  sasl_over_ssl false
  ssl_ca_certs_from_system false
  get_kafka_client_log false

default_topic:将要输出到 kafka 中的 topic 名字

buffer: 缓存配置,一般只配 flush_interval ,代表间隔一定时间去输出一次。

@type webhdfs

该插件是用于将日志输出到 hdfs 中的。

HDFS (Hadoop)是存储和处理大量数据的。

既然要输出到 hdfs 中,所以首先要安装一些软件或插件:

  • Fluentd
  • webhdfs 输出插件 (out_webhdfs)
  • Apache HDFS

输出目的地将是WebHDFS。输出配置应该如下所示:

 @type webhdfs
  host namenode.your.cluster.local
  port 50070
  path "/log/%Y%m%d_%H/access.log.#{Socket.gethostname}"
  flush_interval 10s
 
  • 部分指定了用于查找匹配标签的正则表达式。如果日志中的标签匹配,则使用相应的匹配配置(即相应地路由日志)。
  • flush_interval参数指定数据写入HDFS的频率。追加操作用于将传入数据追加到path参数指定的文件中。 时间和主机名的占位符可以与path参数一起使用。这可以防止多个Fluentd实例将数据追加到同一个文件中,而追加操作必须避免这种情况。
  • 其他选项指定HDFS的NameNode 的主机和端口。

除了配置 fluentd 的配置外,还需要对 hdfs 的配置做一些修改,将以下配置添加到 hdfs-site.xml 文件中,然后重新启动整个群集:

dfs.webhdfs.enabledtruedfs.support.appendtruedfs.support.broken.appendtrue

同时要确认 hdfs 用户对指定为网络文件系统输出的路径具有写权限。

@type stdout

这个类型的插件,适用于 debug 时使用的。

在使用其他输出插件时,如果在目的接收端收不到日志,或收到的日志数据不准确,可以先将日志输出到 stdout 进行查看。

它的配置很简单:

@type stdout

其中 pattern 要替换成需要匹配 tag 的正则表达式,如果想要匹配全部 tag,pattern 替换成 *.*

当然,如果debug,还有一种方法,在任意一个 插件类型的下方,添加一个 @log_level debug ,控制台就会输出 debug 级别的日志,否则,默认只输出 info 级别的日志。

@type copy

match 匹配到第一个 match,就直接输出了,不会再继续匹配下一个 match,如果需要将日志同时输出到两个地方,就需要用输出插件中的 copy 搞搞。

具体配置如下:

@type copy@type file
    path /var/log/fluent/myapp1
    .........

fluentd 生命周期

在 fluentd 中有以下几个类型: source、 parser、filter、output 四种。

parse 和 filter 用于解析和过滤,在我的这次项目中没有用到。

在 fluentd 中,数据的流向如下:

source ->parser ->filter ->output

从  source 数据源进来,流过所有的 parser 和 filter ,最后优先匹配到一个 output 输出出去。

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


当前名称:fluentd-创新互联
文章位置:http://scyanting.com/article/ijsje.html