001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.EnumSet;
025import java.util.List;
026import org.apache.hadoop.hbase.ClusterMetrics;
027import org.apache.hadoop.hbase.ClusterMetrics.Option;
028import org.apache.hadoop.hbase.ServerMetrics;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.Waiter;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.regionserver.HRegionServer;
034import org.apache.hadoop.hbase.testclassification.MediumTests;
035import org.apache.hadoop.hbase.testclassification.ReplicationTests;
036import org.apache.hadoop.hbase.util.Bytes;
037import org.apache.hadoop.hbase.util.JVMClusterUtil;
038import org.apache.hadoop.hbase.util.Threads;
039import org.junit.jupiter.api.Tag;
040import org.junit.jupiter.api.Test;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044@Tag(ReplicationTests.TAG)
045@Tag(MediumTests.TAG)
046public class TestReplicationStatus extends TestReplicationBase {
047
048  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStatus.class);
049
050  static void insertRowsOnSource() throws IOException {
051    final byte[] qualName = Bytes.toBytes("q");
052    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
053      Put p = new Put(Bytes.toBytes("row" + i));
054      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
055      htable1.put(p);
056    }
057  }
058
059  /**
060   * Test for HBASE-9531.
061   * <p/>
062   * put a few rows into htable1, which should be replicated to htable2 <br/>
063   * create a ClusterStatus instance 'status' from HBaseAdmin <br/>
064   * test : status.getLoad(server).getReplicationLoadSourceList() <br/>
065   * test : status.getLoad(server).getReplicationLoadSink()
066   */
067  @Test
068  public void testReplicationStatus() throws Exception {
069    // This test wants two RS's up. We only run one generally so add one.
070    UTIL1.getMiniHBaseCluster().startRegionServer();
071    Waiter.waitFor(UTIL1.getConfiguration(), 30000, new Waiter.Predicate<Exception>() {
072      @Override
073      public boolean evaluate() throws Exception {
074        return UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() > 1;
075      }
076    });
077    Admin hbaseAdmin = UTIL1.getAdmin();
078    // disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
079    hbaseAdmin.disableReplicationPeer(PEER_ID2);
080    insertRowsOnSource();
081    LOG.info("AFTER PUTS");
082    // TODO: Change this wait to a barrier. I tried waiting on replication stats to
083    // change but sleeping in main thread seems to mess up background replication.
084    // HACK! To address flakeyness.
085    Threads.sleep(10000);
086    ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
087    for (JVMClusterUtil.RegionServerThread thread : UTIL1.getHBaseCluster()
088      .getRegionServerThreads()) {
089      ServerName server = thread.getRegionServer().getServerName();
090      assertTrue(metrics.getLiveServerMetrics().containsKey(server), "" + server);
091      ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
092      List<ReplicationLoadSource> rLoadSourceList = sm.getReplicationLoadSourceList();
093      ReplicationLoadSink rLoadSink = sm.getReplicationLoadSink();
094
095      // check SourceList only has one entry, because only has one peer
096      assertEquals(1, rLoadSourceList.size(),
097        "Failed to get ReplicationLoadSourceList " + rLoadSourceList + ", " + server);
098      assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
099
100      // check Sink exist only as it is difficult to verify the value on the fly
101      assertTrue(rLoadSink.getAgeOfLastAppliedOp() >= 0,
102        "failed to get ReplicationLoadSink.AgeOfLastShippedOp");
103      assertTrue(rLoadSink.getTimestampsOfLastAppliedOp() >= 0,
104        "failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp");
105    }
106
107    // Stop rs1, then the queue of rs1 will be transfered to rs0
108    HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(1);
109    hrs.stop("Stop RegionServer");
110    while (hrs.isAlive()) {
111      Threads.sleep(100);
112    }
113    // To be sure it dead and references cleaned up. TODO: Change this to a barrier.
114    // I tried waiting on replication stats to change but sleeping in main thread
115    // seems to mess up background replication.
116    Threads.sleep(10000);
117    ServerName server = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
118    List<ReplicationLoadSource> rLoadSourceList = waitOnMetricsReport(1, server);
119    // The remaining server should now have two queues -- the original and then the one that was
120    // added because of failover. The original should still be PEER_ID2 though.
121    assertEquals(2, rLoadSourceList.size(), "Failed ReplicationLoadSourceList " + rLoadSourceList);
122    assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
123  }
124
125  /**
126   * Wait until Master shows metrics counts for ReplicationLoadSourceList that are greater than
127   * <code>greaterThan</code> for <code>serverName</code> before returning. We want to avoid case
128   * where RS hasn't yet updated Master before allowing test proceed.
129   * @param greaterThan size of replicationLoadSourceList must be greater before we proceed
130   */
131  private List<ReplicationLoadSource> waitOnMetricsReport(int greaterThan, ServerName serverName)
132    throws Exception {
133    UTIL1.waitFor(30000, 1000, new Waiter.ExplainingPredicate<Exception>() {
134      @Override
135      public boolean evaluate() throws Exception {
136        List<ReplicationLoadSource> list =
137          hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
138            .get(serverName).getReplicationLoadSourceList();
139        return list.size() > greaterThan;
140      }
141
142      @Override
143      public String explainFailure() throws Exception {
144        return "The ReplicationLoadSourceList's size is lesser than or equal to " + greaterThan
145          + " for " + serverName;
146      }
147    });
148
149    return hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
150      .get(serverName).getReplicationLoadSourceList();
151  }
152}