HBase Replication源码解析之HLog读取
2019独角兽企业重金招聘Python工程师标准>>>
在HRegionServer中两个量和replication相关,如下所示:
[java] view plain copy
在ReplicationSourceService中只有一个方法getWALActionsListener,该方法返回WALActionsListener。ReplicationSinkService同样也是一个接口类,它有一个方法replicateLogEntries。在HRegionServer的如下代码段中会启动replicationservice。
[java] view plain copy
startReplicationService中做了三件事,分别是调用ReplicationSourceManager的init方法,初始化replicationSink,初始化调度线程池scheduleThreadPool;
startReplicationService方法中调用了ReplicationSourceManager的init方法,init中遍历replicationPeers中的peerid,并以该id为参数,调用addSource方法,在addSource中针对每一个peerid构造了一个对象ReplicationSource,ReplicationSource是个守护进程,这里初始化的时候并不是通过构造函数,而是通过getReplicationSource函数,在这个方法里先获得了一个ReplicationSource的接口,接着调用init初始化该接口,此外,getReplicationSource还有一个重要的作用是它实例化了replicationEndpoint(HBaseInterClusterReplicationEndpoint)。回到addSource这个方法,它返回前调用了ReplicationSource的startup方法,startup是个挺有意思的方法,代码如下:
ReplicationSource是个守护线程,在startUp中启动了自己。。。。这么说也就是replicationPeers中的每个peerid都表示了一个slave集群,而每个slave集群都有一个自己的ReplicationSource线程。现在的重点就落在了ReplicationSource这个守护线程的处理逻辑,可以从它的run方法入手分析。
run中有如下几个关键步骤,首先:
1、启动replicationEndpoint :Service.State state = replicationEndpoint.start().get();
2、构造walEntryFilter:this.walEntryFilter = new ChainWALEntryFilter(filters);
3、进入一个循环,循环持续运行至守护线程ReplicationSource终止:
while(isActive) {
获取log path;
调用openReader打开当前path的log reader(后文详解);
从reader中依次读取WAL.Entry并放入一个List<WAL.Entry>的数据结构中,方法调用如下:
readAllEntriesToReplicateOrNextFile(currentWALisBingWrittenTo, entries)
最后调用shipEdits将entries发送到远端集群;
}
发送WALEntry到从集群的逻辑在方法shipEdits中完成,ship方法接收一个List<WAL.Entry>类型的参数entries,在shipEdits中entries参数被包装进replicateContext中并发送到从集群,这部分的主要代码如下所示:
还记得前文中说到,replicationEndpoint在getReplicationSource中初始化为HBaseInterClusterReplicationEndpoint类型的变量。进入HBaseInterClusterReplicationEndpoint的replicate方法的实现,该方法首先从参数replicateContext中获得List<Entry> entries,关键的wal传递在下面这段代码中:
其中最后一句将Entry对象序列化之后由文首RegionServer中初始化的ReplicationSinkService发送到远端集群;
以上这些就是大概的replication时,wal跨集群传递的一些细节实现。接下来回过头详细解释上文留下的一个小辫子,就是围绕ReplicationSource的openReader方法的实现,分析这个调用的目的是理清wal的读逻辑是什么样的。
ReplicationSource的openReader以currentPath为参数,调用ReplicationWALReaderManager的openReader
ReplicationWALReaderManager的openReader通过WALFactory.createReader返回指定文件的reader;
看看WALFactory.createReader中的关键代码吧:
可见Reader是在这里构建的,我们以最常见的lrClass属于ProtobufLogReader.class为例来解释,首先初始化一个数据输入流FSDataInputStream,通过这个流打开文件fs(fs在输入参数中指定),根据isPbWal选择new不同的Reader实例,最后调用reader的init方法完成初始化工作。这里的Reader大多数是DefaultWALProvider.Reader类型的。
Reader创建已经分析完毕,那读实现是什么样的?
读的动作主要在readAllEntriesToReplicateOrNextFile中,该方法接收一个List<WAL.Entry>类型的参数entries,也就是说读到的各个log entry在entries中返回,下面一一分析readAllEntriesToReplicateOrNextFile中的主要逻辑。
1、this.repLogReader.seek();
2、WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
3、进入循环
while(entry != null) {
//过滤掉已经消费掉的log entry
if (replicationEndpoint.canReplicateToSameCluster()
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
entry = walEntryFilter.filter(entry); //过滤的逻辑在walEntryFilter中实现
entries.add(entry);
}
try {
entry = this.repLogReader.readNextAndSetPosition();
}
}
4、各种metrics处理;
WALEntryFilter的作用是在把wal entries发送到slave集群前过滤掉某些并不需要的发送WAL Entries,它有很多个实现类,所有的类都实现了filter方法,这些不同的WALEntryFilter可以通过ChainWALEntryFilter构成一条责任链。HLog文件读出的wal entries流经责任链,筛选出需要replicate的walEntry,这是典型的责任链模式的应用。
转载于:https://my.oschina.net/sniperLi/blog/910764
总结
以上是生活随笔为你收集整理的HBase Replication源码解析之HLog读取的全部内容,希望文章能够帮你解决所遇到的问题。
- 上一篇: [BZOJ 1076][SCOI2008
- 下一篇: 常用的 16 个 Sublime Tex