Druid调研
Druid初期是由metamarkets的技术人员开发,用于向买家、卖家、广告主进行广告展示的底层实时分析平台,为OLAP的事件查询而设计。目前已发展成一个开源的实时数据库。
时序数据库(TSDB)是一种特定类型的数据库,主要用来存储时序数据。
架构
Druid的架构比较灵活,是一个集群模式,由多个不同类型的节点组成,并且各个节点也能组成自己的小集群以增加可用性和扩展性。
Druid中每类节点都只完成一小部分事情,节点分类如下:
- Historical节点
Historical节点是Druid集群的骨干,用于负责存储和查询历史数据。它从Deep Storage下载segments到本地,响应从broker节点上分发到查询请求,并将结果数据返回给broker节点。Historical节点采用shared nothing的架构,所以每个节点能够独自加载segments、删除segments、以及为segments提供查询服务。
Historical节点与Zookeeper进行通信(Historical节点之间并不通信),将该节点上已加载的segments和正在提供服务的segments记录在zk上,zk会通知Historical节点进行segments的加载和删除。
Broker节点
Broker节点是客户端和应用程序从Druid查询数据的入口。Broker节点负责分发查询,将查询请求分发给实时节点和历史节点,以及收集和合并来自实时节点和历史节点的结果。Broker节点通过Zookeeper确定segments是在实时节点还是历史节点存活。Coordinator节点
Coordinator节点通过Metadata Storage和zk管理集群中historical节点上的segments*。Coordinator从Metadata Storage中获取segments的元数据信息,来决定哪些segments需要加载到历史节点集群中,利用zk判断历史节点集群中某些节点是否存在。当Historical需要加载或者删除一些segments时会在zk上创建一些entries。
Coordinator不与Historical通信,直接与zk进行通信
Realtime节点
实时节点负责加载实时的数据到系统中。Indexing Service节点
索引服务节点由多个worker组成的集群,负责为加载批量的和实时的数据创建索引,并且允许对已经存在的数据进行修改。
Real-time处理目前可以通过独立的realtime节点,或者通过indexing service节点实现,这两种方式都很常见。 Real-time处理包括加载数据、创建数据索引(创建segments)、以及交接segments给historical节点。实时处理逻辑加载之后数据立马可查。数据交接的过程也是安全的,数据在整个流程中都保持可查。
Druid集群还会依赖一些外部组件:
Zookeeper
Zookeeper主要作用是帮助群集服务发现和维护当前数据的拓扑结构,Druid依赖Zookeeper来保证集群内的信息一致。Metadata Storage
Druid依赖metadata storage存储segments的元数据和配置。创建segments的服务在元数据中记录信息,coordinator监听着元数据以便了解什么时候需要下载新数据或者删除旧数据。 元数据的存储不涉及查询的路径。MySQL和PostgreSQL非常有利于生产环境下元数据的存储。Deep Storage
Deep storage是segments的永久备份。创建segments的服务(实时或者离线)上传segments到Deep storage,然后historical节点下载。Deep storage不涉及查询路径。 S3和HDFS是比较推荐的deep storages。
Druid集群整体架构图如下:
上面Druid架构图中展示了Druid集群中内部组件和外部依赖组件的整体架构,下面再来一张官方的架构图,和上面的架构图大体一样,我感觉两个图结合起来看就比较完美了。嘿嘿。。
特点
高可用性
由于Druid使用的是shared noting架构,所以其内部并无单点故障,不同类型的节点失败也不会影响到其他类型节点的正常服务。
可扩展性
Druid中的每类节点都可部署为集群模式,使Druid处理能力能够水平扩展。
亚秒级响应
官网说10亿量级下做到亚秒响应,能够做到实时导入,导入即可查询。
Roll-up
在数据接入时对原始数据选定一系列维度之后进行第一级聚合,减少数据量,提高查询速度和减少存储空间。
数据源
Druid数据源支持多种格式,比如json、csv或者是用特殊字符分割的数据。
数据来源主要分为批量数据和实时数据。接入方式主要有3种:
- 从文件中批量加载,数据文件可以存放在本地、HDFS和S3等文件存储系统。
- 实时推送数据,是使用Tranquility通过HTTP推送数据到Druid。Tranquility是一个单独的工具包,并没有和Druid做集成,需要单独下载,Tranquility启动一个server进程,通过HTTP将数据发送给Druid,而不需要启动一个JVM之类的进程。
Tranquility是用scala编写的,主要是通过Druid indexing service来实时抽取数据,弥补Druid indexing service API的不足
- 实时拉取数据,是使用Firehose与要读取的数据进行连接,然后Realtime节点实时从中抽取数据。
Firehose是一个可插拔的组件,是Druid中的消费实时数据模型,可以有不同的实现,Druid自带了一个基于Kafka High Level API实现的对于Kafka的数据消费(druid-kafka-eight Firehose)
数据接入时需要注意一下几点:
- 数据集应该被什么调用?由”dataSchema”的”dataSource”字段设置
- 数据集位于什么位置?文件路径在”inputSpec”的”paths”。如果加载多个文件,将字符串以逗号分隔。
- 什么字段作为时间戳?由”timestampSpec”的”column”字段设置。
- 什么字段作为维度?由”dimensionsSpec”的”dimensions”字段设置。
- 什么字段作为度量?由”metricsSpec”控制。
- 什么时间范围(时间间隔)被加载?由”granularitySpec”的”intervals”字段设置。
数据格式
Druid的数据集有三部分组成。分别为
Timestamp列: 将timestamp区别开是因为Druid是一个时间序列的olap工具,其中的查询都以时间为中心。
Dimension列: Dimensions对应事件的维度,通常用于筛选过滤数据。
Metric列: Metrics是用于聚合和计算的列。在OLAP的术语中也被叫做measures。
数据源配置
接入数据不需要代码的开发,只需要进行一些配置。
下面看个从HDFS接入数据的demo:
1 | { |
简单解释下这些配置项:
json中最外层有3个key,分别为type、spec和hadoopDependencyCoordinates,前两个是必须的。
如果从HDFS中加载数据,type
始终是index_hadoop
,spec
的value是数据的一些信息,也是一个json。
spec value就是DataSchema,由dataSchema、ioConfig和tuningConfig,如果tuningConfig不设置将提供默认值。
dataSchema描述了数据源的具体信息,包括数据的格式、Timestamp、维度和度量。
ioConfig描述了数据从何而来,和最终去向哪里。
查询
Druid的查询语言是json,通过HTTP REST提交给可查询的节点(Broker, Historical, or Realtime)。官方目前暂不支持sql查询,有第三方的插件可以支持sql,但支持度不是太完美。
Druid支持的查询包括聚合查询、搜索查询(类似模糊匹配)和选择查询。
下面简单看几个json查询语句。
- 聚合查询json语句
聚合查询比较复杂,又分为3个小类,分别为timeseries、topN和groupBy。
这里展示一个timeseries的json语句:
1 | { |
timeseries是按照某段时间区间作为数据查询区间的,这里使用的是2012-01-01到2012-01-03的数据,分别对sample_fieldName1和sample_fieldName2进行聚合,最后两个聚合指标相除。结果如下:
1 | [ |
- 搜索查询json语句
1 | { |
searchDimensions定义了要在哪些维度中进行搜索,query定义了搜索方式(本语句中的意思是包含Ke的维度值),结果为:
1 | [ |
- 选择查询json语句
1 | { |
select查询支持分页显示,pagingSpec定义了分页的信息,这里只显示前5个,结果显示如下:
1 | [{ |
数据生命周期
如果数据来源于Kafka,则Realtime节点从kafka消费数据,实时数据在Realtime节点中创建索引,存入节点内存,内存数据量超过阈值时(或者定期)时,将内存数据写到外存形成外存索引。 内存数据加上多个外存索引, Realtime节点以这种方式支持亚秒级数据可见性。
但是Realtime节点的容量和查询能力是有限的, 所以它会定期合并多个外存索引生成segment,(segment对应一段时间范围内的进入druid的数据)。segment生成之后马上被上传到deep storage,很快就会有Historical节点下载该segment,并替代Realtime节点提供查询服务。
Historcial节点从DeepStorage下载segment之后,由Coordinator节点通过Zookeeper来通知Historcial节点加载或者删除segment。