MapReduce源码解析--Reduce中values的Iterator的生成
众所周知Reduce中的values是一个Iterator,但是value的数据并不是一下全部加载到这个Iterator(也就是对value根据key进行分组,这个分组比较器可以重写,二次排序中可能会需要重写,默认是按照key进行分组),那么values的Iterator是怎么形成的,是怎么被加载的呢?
下面从源码的角度去探秘一下吧,因为源码才是最好的教科书。
写Reduce函数时都会继承Reduce类然后重写reduce函数,在Reduce.run中循环调用reduce,reduce函数传进去的参数values已经是一个Iterator,则此Iterator的形成只能在调用reduce之前,开始循环调用reduce之后。看下run的代码可能更清晰一些:
1234567891011121314public void run(Context context) throws IOException, InterruptedException { setup(context); try { // while 循环调用reduce while (conte ...
MapReduce源码解析--Reduce解析
MapReduce–Reduce上篇主要扒了下Map阶段的代码,并根据代码把Map阶段的流程串了下,本篇主要扒下Reduce阶段的代码,顺便串下Reduce阶段的流程。Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。Reduce Task和Map Task一样,同样也有四中任务类型,分别为job setup、job cleanup、reduce task、task cleanup。Reduce阶段的代码入口是ReduceTask.java类中的run方法,代码如下:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980public void run(JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, Interr ...
Java同步锁解析
synchronized是用来控制线程同步的,是一个同步锁,防止在多线程的环境下,某些资源被多线程同时操作。将不能同时操作的资源用synchronized锁住,当某个线程要执行被synchronized关键字锁住的代码片段的时候,将首先检查锁是否可用,然后获取锁,执行代码,最后释放锁。synchronized可用于一段代码上(同步语句块),也可用于方法上(同步方法)。
synchronized锁住的是对象而不是某段代码,所以要将不能被同时访问的资源封装到一个对象里,然后将所有要访问这个资源的方法标记为synchronized。如果某个线程处于一个对a对象中标记为synchronized的方法的调用中,那么在这个线程从该方法返回之前,其他所有要调用该对象中任何标记为synchronized方法的线程都会被阻塞。所有对象都自动含有单一的锁,当在对象上调用起任意synchronized方法的时候,此对象就被加锁,这时该对象上其他synchronized方法只有等到前一个带有synchronized的方法调用完毕并释放锁之后才能被调用。例如:
12345public class A ...
Java优先级队列解析
在看代码过程中,优先级队列(PriorityQueue)出现的次数很多,PriorityQueue是基于优先堆(完全二叉堆)的一个无界队列。
优先级队列是不同于先进先出队列的另一种队列。每次从队列中取出的是具有最高优先权的元素。每次都取最高优先权的原理是通过在内部维护一个堆来实现的。
下面先搞个小demo来感受下PriorityQueue,来个常见的面试题,求topN问题。
求topN一组数16,7,3,20,17,8,2,1,30,求top6。使用PriorityQueue代码如下:
123456789101112131415161718192021222324public class TestPriorityQueue { public static void main(String[] args){ int[] arr = {16,7,3,20,17,8,2,1,30}; Object[] result = topN(arr, 6); for (int i=result.length-1; ...
数据结构算法之动态规划
动态规划的基本思想与分治法类似,也是将待求解的问题分解为若干个子问题(阶段)。按顺序求解子阶段,前一子问题的解,为后一子问题的求解提供了有用的信息。在求解任一子问题时,列出各种可能的局部解,通过决策保留那些有可能达到最优的局部解,丢弃其他局部最优解。依次解决各子问题,最后一个子问题就是初始问题的解。
动态规划是通过拆分问题,定义问题状态和状态之间的关系,使得问题能够以递推的方式去解决。则动态规划的本质是状态的定义和状态转移方程
动态规划通过拆分问题,将原始问题拆分为子问题,而各子问题之间的关系是后一子问题的解往往由前一子问题的解求出,则**子问题之间是重叠的(则不是相互独立的)**,为减少重复计算,对每一个子问题只解一次,将其不同阶段的不同状态保存在一个二维数组中。
动态规划与分治法最大的差别是:适合于用动态规划法求解的问题,经分解后得到的子问题往往不是互相独立的(即下一个子阶段的求解是建立在上一个子阶段的解的基础上,进行进一步的求解)。
动态规划与贪心法的区别
动态规划其实是贪心法的一般情况,而贪心法是动态规划的特殊情况,为什么这么说呢,下面就来解释下,
用动态规划来求解 ...
Spark Streaming 消费kafka到HDFS
先说下程序功能,使用Spark Streaming 实时消费kafka,并将message写入HDFS上指定的topic目录中。消费kafka使用的是Spark提供的Direct Approach方法,然后利用HDFS API将不同topic下的message写到各自topic目录下。
Spark Streaming简介Spark Streaming是Spark的扩展,能够扩展的。高吞吐、容错的处理实时数据流。Spark Streaming的工作流程是将接受到实时的数据划分到不同的batch中,然后由Spark Engine处理并生成结果batch。如下图:
Spark Streaming提供了一个高级的抽象模型,叫做discretized stream或者叫做DStream,它代表了一个持续的数据流。
Saprk Streaming中的Context是StreamingContext,StreamingContext是所有功能的主入口,可以通过两种方法create,
123456// firstval conf = new SparkConf().setAppName(appNa ...
Flume简介及初次使用
最近要搭建一个日志分析平台,整体架构思路是通过flume采集日志到kafka中,从kafka之后分两条路走,一条是实时,一条是离线。实时会用spark或者storm(还没开始做),离线用hadoop+hive,将kafka中的数据消费到hdfs上,通过mr处理进hive,进行统计分析。
采用flume的理由:flume是java开发的,利于维护和二次开发flume可扩展,也有较好的容错性,性能也不错flume支持对现有应用无缝接入,对较老的应用程序侵入性较低
本篇主要简单介绍下flume,因为flume是初次使用所以简单记录下。
Flume版本演变Flume 是 cloudera 开发的实时日志收集系统,其可以实时的将分布在不同节点、机器上的日志收集到一个存储设备中。目前Flume存在两个大版本,其一为初始发行版本目前被统称为 Flume OG(original generation),属于 cloudera。其二为重构后的版本统称为Flume NG(next generation)。Flume NG对OG的核心组件、核心配置以及代码架构进行了重构。
Flume OG简介OG的 ...
再议HDFS写流程之pipeline
之前有一篇是介绍HDFS写流程的,只是大致的从代码的角度解析了下,有些细节并没有深挖,这篇文章我们主要来挖下写流程中的一些细节,但是由于我水平有限,只能尽可能的挖,本篇也会不定时的进行打补丁。
这里重点总结下开始往pipeline中写数据到一个block结束关闭pipeline的流程。想先了解下HDFS write流程的请看[这篇](http://bigdatadecode.top/HDFS write解析.html)
这里先列下想要总结的知识点:
lease检查发生在哪?createFile时创建lease,两个线程能否同时创建同一个文件?lease检查只在addBlock中发生?是否会判断lease的当前持有者是否继续持有
client或者dn何时向nn汇报Replica或者Block的状态
pipeline是怎么建立的,targets dn是怎么传递的
pipeline在setup阶段发生故障怎么办,client如何检测,如何修复
pipeline在stream阶段发生故障怎么办,client如何检测,如何修复
pipeline在close阶段发生故障怎么办,client如何检 ...
HDFS write解析
本篇主要记录下HDFS写文件的流程。其写入流程与普通文件写入流程类似,首先创建一个输出流OutputStream,然后通过这个输出流写入数据。在HDFS中数据传输的基本单元为Packet(默认64k),每个packet又由很多个chunk组成,chunk是文件校验的基本单位,一个chunk一个chunksum,chunk是校验单位也就是写入单位,将chunk写入packet,一个packet写满之后,将packet发送到pipeline中。
下面从代码层次去详细解读下write流程。其写入流程图如下:![write流程图](/blogimgs/HDFS write解析/hdfs-write-flow.png “write流程图”)
创建一个输出流HDFS写文件跟java写文件类似,都需要先打开一个文件流,HDFS是通过FileSystem对象打开文件流的,代码流程为通过FileSystem.get(conf)得到一个FileSystem对象,然后调用create(Path)或者append(Path)打开一个FSDataOutputStream流,看下create代码:
123pu ...
Append/Hflush/Read设计文档
本篇是一篇译文,主要是翻译下Append/Hflush/Read Design。hdfs的write一直看不太懂,想翻译下此设计文档,希望能有更深入的理解。
Replica/Block 状态Block在NameNode和DataNode中有不同的称呼,在NameNode中为block,在DataNode中为Replica。通常说的block的副本数是3,指的就是dn中Replica为3。
Replica状态Finalized: finalized Replica已经完成了写入操作。不会再有新的数据写入,除非此Replica被再次打开或者被追加。 finalized Replica的数据和meta数据是匹配的。 当前block id的所有Replica有相同的数据大小。 但是finalized Replica的GS(generation stamp)不是一直不变的,GS可能因为一次错误的recovery而导致其发生变化。RBW(Replica Being Written): 创建Replica或者追加Replica时,Replica的状态为RBW。 数据写入R ...