随着容器和编排技术持续发展,各个公司都已陆续在容器云领域有了相关实践,转转也不例外,也在进行一些积极探索与实践。
本文以日志采集为切入点,介绍下转转容器云平台中的业务日志是如何自动化采集的。

背景

在微服务和docker大火的当下,各个公司都在积极主推容器化,最大化的利用资源并减少运维成本,希望各个业务方能尽早接入云平台。

而做为数据团队的我们却遇到了一个问题,我们的数据源绝大多数都来自于业务程序的埋点日志,如果业务方迁移到云平台,埋点日志将无法采集,所以我们急需一套云平台日志采集方案。

先看下容器日志采集与传统日志采集的区别

容器日志采集与传统日志采集区别

容器云平台的最大的特点就是能够最大化的利用资源,并且利用编排工具可以做到弹性伸缩,所以容器云与传统日志采集的最大区别有如下两点:

  1. 采集日志的实例较多,且数据量较大。
  2. 采集日志分布不固定,无法像传统日志采集方案那样配置固定的采集目录。

调研

  1. 最简单粗暴的方法就是让业务方将埋点日志直接打入kafka或者redis等中间件,这种方式对业务侵入性较大,无法做到对业务无感知,要保证日志传输的可靠性也比较困难
  2. 使用容器推荐的方法将日志写到标准输出中,然后使用Docker Engine采集。这种方式也是对业务方有一定的侵入性。
  3. 在每个容器内部署一个日志采集服务,如flume、filebeat,通过采集工具将日志传输到下一环节。这种方案对业务方无感知,但是依然对业务程序有一定的侵入性,因为存在采集程序异常导致容器崩溃的可能性,而且容器的启动和停止是一个常态,如果采集有延迟的话,容器停止之后将会丢失一部分数据,所以这种方案也不合理。
  4. 容器在启动时,指定日志的挂载目录,这样日志就保存到了宿主机里,而且日志也可以做到不随容器的消亡和消失。但是这样的话需要保证挂载目录唯一,而且最致命的是需要根据容器的状态更新宿主机上采集服务的配置文件。

架构方案

调研之后我们倾向使用第4种方法,在此基础上制定的容器云采集方案希望能解决如下问题:

  1. 业务无感知
  2. 能够采集容器内业务日志,并且支持同一容器内多份日志采集
  3. 断点续传
  4. 从日志中区分来源
  5. 动态发现
  6. 支持扩展
  7. 日志灵活归档

整个背景调研结束之后发现一个关键点,那就是如何动态感知容器的启停,从而决定采集哪些日志。

阿里云开源了一款日志采集工具log-pilot,此工具最大的特色是能够动态地发现和采集容器内部的日志文件。

log-pilot架构与原理

log-pilot是用golang开发的,本身架构比较简单,代码量也不大,主要分为三部分,其中一部分就是容器事件管理,它能够动态地监听容器的事件变化,然后依据容器的标签来进行解析,生成日志采集配置文件,然后交由采集插件来进行日志采集。

log-pilot采集流图
简单介绍下log-pilot日志采集的流程,log-pilot容器事件管理模块监听到容器启动之后,从容器的env中解析特定标签,然后根据log-pilot配置的采集插件通过配置文件管理生成对应的配置文件,启动采集工具对日志进行采集。

log-pilot对采集后端支持的也比较丰富,具体如下图,不过我们为了与现有传统日志采集流程进行兼容,我们使用了file模式,即将日志从容器内采集到宿主机,然后再进行日志收集。
log-pilot采集后端

log-pilot也没有使用特别复杂的架构,但是功能还是比较全的,我们使用主要功能是服务发现和动态更新配置的功能,其它功能这里不展开,有兴趣的可以参考github主页

采集实践

通过一些测试发现此工具能够最大化的满足目前的需求,只需要与目前已存在的采集流程和日志采集工单系统进行下兼容就可以快速上线。

log-pilot部署

起初我们使用fluentd采集日志,针对fluentd的配置模版并没有太大的改动,只是调整了一些参数,相关改动如下:

1
2
3
4
5
6
7
8
9
10
11
12
<buffer tag,time,docker_app,docker_service,docker_container>
@type ${FILE_BUFFER_TYPE:=file}
path $FILE_PATH/.buffer
chunk_limit_size 8MB
chunk_limit_records 1000
flush_thread_count 20
flush_at_shutdown true
timekey ${FILE_BUFFER_TIME_KEY:=1d}
timekey_wait ${FILE_BUFFER_TIME_KEY_WAIT:=2m}
timekey_use_utc ${FILE_BUFFER_TIME_KEY_USE_UTC:=false}
$(bufferd_output)
</buffer>

这个配置主要目的是尽可能的减少数据延迟,并实现按照时间归档日志。但是由于fluentd本身的一些机制和与我们现有采集流程的融合性不高,在线上运行一段时间之后最终放弃了该插件,使用flume进行采集(具体内容在下一节会介绍)。

更多fluentd相关的内容可以参考官网

log-pilot的部署我们采用的是每台宿主机上启动一个log-pilot容器,用来监听整个宿主机上所有的容器。

关键功能代码解析

上文中提到架构比较简单,一起看下关键功能的实现。

  1. 容器事件管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 时刻监听
msgs, errs := p.client().Events(ctx, options)
for {
select {
case msg := <-msgs:
if err := p.processEvent(msg); err != nil {
...
}
case err := <-errs:
...
}
}
// 事件处理function
func (p *Pilot) processEvent(msg events.Message) error {
containerId := msg.Actor.ID
ctx := context.Background()
switch msg.Action {
case "start", "restart":
...
containerJSON, err := p.client().ContainerInspect(ctx, containerId)
...
return p.newContainer(&containerJSON)
case "destroy":
...
err := p.delContainer(containerId)
...
}
return nil
}

通过调用docker的api拿到事件信息,然后教由processEvent进行处理,这里主要监听startrestartdestroy事件。
监听到startrestart事件之后,通过inspect拿到容器的相关信息,由newContainer去check是否由日志采集标识,然后生成具体到配置文件,进行采集。

  1. 生成配置文件
    各个采集工具的配置文件相对固定,具有一定的规律性,可以提出一个统一的模版,所以这里使用了template功能来生成配置文件,具体代码不做展开,只展示下flume的配置模版,这个模版解决了k8s环境下容器重启之后日志重复采集的问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{{range .configList}}

a1.sources.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_source.type = TAILDIR
a1.sources.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_source.channels = {{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_channel
a1.sources.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_source.positionFile = /flume/log_meta/source/{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}/{{ .Name }}/taildir_position.json
a1.sources.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_source.filegroups = f1
a1.sources.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_source.filegroups.f1 = {{ .HostDir }}/{{ .File }}

a1.channels.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_channel.type = file
a1.channels.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_channel.checkpointDir = /flume/log_meta/channel/{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}/{{ .Name }}/checkpoint
a1.channels.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_channel.dataDirs = /flume/log_meta/channel/{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}/{{ .Name }}/buffer

a1.sinks.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_sink.type = file_roll
a1.sinks.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_sink.channel = {{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_channel
a1.sinks.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_sink.sink.directory = {{ $.output }}/{{ index $.container "docker_container_subname" }}
a1.sinks.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_sink.sink.rollInterval = 3600000
a1.sinks.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_sink.sink.pathManager.prefix = {{ .Name }}
a1.sinks.{{if index $.container "k8s_pod"}}{{ index $.container "k8s_pod" }}{{else}}{{ $.containerId }}{{end}}_{{ .Name }}_sink.sink.pathManager.extension = log

{{end}}

整个项目用了很多golang的特性,比如channel、协程等等,而且也比较简单,对一些golang小白也比较友好,可以作为glang语言的入门项目好好钻研下。

排雷

  1. 时区问题
    timekey_use_utc参数不生效,总是慢8小时,发现是fluentd的bug,更新fluentd版本到1.2.6解决。
  2. 数据延迟
    随着业务量的增长,采集延迟较大,发现大量的日志在buffer中无法进行归档,增加线程池flush_thread_count为20(按照一份日志一个线程),并增加chunk_limit_sizechunk_limit_records,延迟得到缓解,但是零点依然会有延迟。
  3. 数据重复采集
    docker中需要采集日志的offset记录在以containerId命名的目录中,但在k8s环境下,容器重启之后,由于容器所在的pod没有消失,所以容器日志的挂载目录没有发生变化(也就是说之前产生的日志还在),但是容器的containerId发生了变化,当log-pilot去采集重启之后的容器时,发现并没有记录offset于是从新采集,于是就造成了日志重复采集。

二次开发与优化

在调研期间,发现log-pilot并不能完全满足我们的场景,于是对log-pilot进行了定制化的开发和优化。

  1. 扩展动态配置
    首先为了与现有传统日志采集流程进行兼容,我们使用的采集后端模式是file,则宿主机上的原有采集服务也需要根据容器的启停动态的更新配置,所以我们针对这个需求我们对log-pilot进行定制开发,使原有对采集服务能够动态的采集日志。

  2. 日志漏采
    其次在线上运行期间发现fluentd有延迟,而且如果采用时间归档日志的话,fluentd是按照日志进入buffer的时间进行归档的,这就造成buffer中数据需要立即消费到sink中进行规档,以便后续流程进行采集,如果有延迟将会造成数据漏采。为了尽量减少漏采事件的发生,我们按照天来归档日志。但是随后发现有的业务日志特别庞大,单个能达到80G+,进行日常grep较困难,而且在零点的时候依然会有漏采的可能性。于是决定要对log-pilot的插件进行扩展,采用了比较大众的flume,而且团队对flume也有一定的技术积累。

  3. 重复采集
    决定增加flume之后,我们对flume也进行了一些调整,比如支持严格意义的按照时间进行归档(此功能没有贡献给社区,因为这个功能相对个性化)。使用的flume组件为Taildir Source + File Channel + File Roll Sink,针对数据重复采集的问题,我们将相关的offset放在了以podName命名的目录中,这样可以保证容器重启offset不会丢失。

总结

虽然现在看下整个方案并没有什么难点,但是从调研到上线再到后来的改进,整个过程还是比较曲折的。通过对log-pilot定制化开发我们将云平台日志采集流程集成到了传统日志采集流程中,对业务方做到最大的透明化,整个采集流程依然是业务方发起工单,我们进行审批然后就是自动化的采集流程,不需要由于容器的启停而要人工介入修改配置,最大化的释放了人力成本。

最后还是要感谢阿里云开源了这么棒的工具,让我们得以站在巨人的肩膀上看世界。转转本着拥抱开源,回馈开源的理念,将我们遇到的问题和扩展也反馈到了社区,相关PR见附件。

附件

FLUME-3306
log-pilot#142
log-pilot#155
log-pilot#167

参考

容器日志采集利器Log-Pilot