欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

HBase Replication源码解析之HLog读取

发布时间:2024/4/13 编程问答 38 豆豆
生活随笔 收集整理的这篇文章主要介绍了 HBase Replication源码解析之HLog读取 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

2019独角兽企业重金招聘Python工程师标准>>>

在HRegionServer中两个量和replication相关,如下所示:

 

[java] view plain copy

  • //Replication services. if no replication, this handler will be null  
  • protected ReplicationSourceService replicationSourceHandler;  
  • protected ReplicationSinkService replicationSinkHandler;  
  • 在ReplicationSourceService中只有一个方法getWALActionsListener,该方法返回WALActionsListener。ReplicationSinkService同样也是一个接口类,它有一个方法replicateLogEntries。在HRegionServer的如下代码段中会启动replicationservice。

     

     

    [java] view plain copy

  • if(this.replicationSourceHandler == this.replicationSinkHandler && this.replicationSourceHandler != null) {  
  •    this.replicationSourceHandler.startReplicationService();  
  • }else {  
  •    if(this.replicationSourceHandler != null) {  
  •        this.replicationSourceHandler.startReplicationService();  
  •    }  
  •    if(this.replicationSinkHandler != null) {  
  •        this.replicationSinkHandler.startReplicationService();  
  •    }  
  • }  
  • startReplicationService中做了三件事,分别是调用ReplicationSourceManager的init方法,初始化replicationSink,初始化调度线程池scheduleThreadPool;

    startReplicationService方法中调用了ReplicationSourceManager的init方法,init中遍历replicationPeers中的peerid,并以该id为参数,调用addSource方法,在addSource中针对每一个peerid构造了一个对象ReplicationSource,ReplicationSource是个守护进程,这里初始化的时候并不是通过构造函数,而是通过getReplicationSource函数,在这个方法里先获得了一个ReplicationSource的接口,接着调用init初始化该接口,此外,getReplicationSource还有一个重要的作用是它实例化了replicationEndpoint(HBaseInterClusterReplicationEndpoint)。回到addSource这个方法,它返回前调用了ReplicationSource的startup方法,startup是个挺有意思的方法,代码如下:

     

    ReplicationSource是个守护线程,在startUp中启动了自己。。。。这么说也就是replicationPeers中的每个peerid都表示了一个slave集群,而每个slave集群都有一个自己的ReplicationSource线程。现在的重点就落在了ReplicationSource这个守护线程的处理逻辑,可以从它的run方法入手分析。

     

    run中有如下几个关键步骤,首先:

              1、启动replicationEndpoint :Service.State state = replicationEndpoint.start().get();

              2、构造walEntryFilter:this.walEntryFilter = new ChainWALEntryFilter(filters);

              3、进入一个循环,循环持续运行至守护线程ReplicationSource终止:

                             while(isActive) {

                                  获取log path;

                                  调用openReader打开当前path的log reader(后文详解);

                                  从reader中依次读取WAL.Entry并放入一个List<WAL.Entry>的数据结构中,方法调用如下:

                                       readAllEntriesToReplicateOrNextFile(currentWALisBingWrittenTo, entries)

                                  最后调用shipEdits将entries发送到远端集群;

                             }

     

    发送WALEntry到从集群的逻辑在方法shipEdits中完成,ship方法接收一个List<WAL.Entry>类型的参数entries,在shipEdits中entries参数被包装进replicateContext中并发送到从集群,这部分的主要代码如下所示:

    还记得前文中说到,replicationEndpoint在getReplicationSource中初始化为HBaseInterClusterReplicationEndpoint类型的变量。进入HBaseInterClusterReplicationEndpoint的replicate方法的实现,该方法首先从参数replicateContext中获得List<Entry> entries,关键的wal传递在下面这段代码中:

    其中最后一句将Entry对象序列化之后由文首RegionServer中初始化的ReplicationSinkService发送到远端集群;

    以上这些就是大概的replication时,wal跨集群传递的一些细节实现。接下来回过头详细解释上文留下的一个小辫子,就是围绕ReplicationSource的openReader方法的实现,分析这个调用的目的是理清wal的读逻辑是什么样的。

     

    ReplicationSource的openReader以currentPath为参数,调用ReplicationWALReaderManager的openReader

     

    ReplicationWALReaderManager的openReader通过WALFactory.createReader返回指定文件的reader;

     

    看看WALFactory.createReader中的关键代码吧:

     

     

    可见Reader是在这里构建的,我们以最常见的lrClass属于ProtobufLogReader.class为例来解释,首先初始化一个数据输入流FSDataInputStream,通过这个流打开文件fs(fs在输入参数中指定),根据isPbWal选择new不同的Reader实例,最后调用reader的init方法完成初始化工作。这里的Reader大多数是DefaultWALProvider.Reader类型的。

     

    Reader创建已经分析完毕,那读实现是什么样的?

     

    读的动作主要在readAllEntriesToReplicateOrNextFile中,该方法接收一个List<WAL.Entry>类型的参数entries,也就是说读到的各个log entry在entries中返回,下面一一分析readAllEntriesToReplicateOrNextFile中的主要逻辑。

              1、this.repLogReader.seek();

              2、WAL.Entry entry = this.repLogReader.readNextAndSetPosition();

              3、进入循环

                     while(entry != null) {

                           //过滤掉已经消费掉的log entry

                          if (replicationEndpoint.canReplicateToSameCluster()

                              || !entry.getKey().getClusterIds().contains(peerClusterId)) {

                              entry = walEntryFilter.filter(entry);  //过滤的逻辑在walEntryFilter中实现

                              entries.add(entry);

                          }

                          try {

                                  entry = this.repLogReader.readNextAndSetPosition();

                          }

                      }

     

     

              4、各种metrics处理;

     

    WALEntryFilter的作用是在把wal entries发送到slave集群前过滤掉某些并不需要的发送WAL Entries,它有很多个实现类,所有的类都实现了filter方法,这些不同的WALEntryFilter可以通过ChainWALEntryFilter构成一条责任链。HLog文件读出的wal entries流经责任链,筛选出需要replicate的walEntry,这是典型的责任链模式的应用。

    转载于:https://my.oschina.net/sniperLi/blog/910764

    总结

    以上是生活随笔为你收集整理的HBase Replication源码解析之HLog读取的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。