001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.EnumSet;
025import java.util.List;
026import org.apache.hadoop.hbase.ClusterMetrics;
027import org.apache.hadoop.hbase.ClusterMetrics.Option;
028import org.apache.hadoop.hbase.HBaseClassTestRule;
029import org.apache.hadoop.hbase.ServerMetrics;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.Waiter;
032import org.apache.hadoop.hbase.client.Admin;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.regionserver.HRegionServer;
035import org.apache.hadoop.hbase.testclassification.MediumTests;
036import org.apache.hadoop.hbase.testclassification.ReplicationTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.util.JVMClusterUtil;
039import org.apache.hadoop.hbase.util.Threads;
040import org.junit.ClassRule;
041import org.junit.Test;
042import org.junit.experimental.categories.Category;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046@Category({ ReplicationTests.class, MediumTests.class })
047public class TestReplicationStatus extends TestReplicationBase {
048  private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStatus.class);
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052    HBaseClassTestRule.forClass(TestReplicationStatus.class);
053
054  static void insertRowsOnSource() throws IOException {
055    final byte[] qualName = Bytes.toBytes("q");
056    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
057      Put p = new Put(Bytes.toBytes("row" + i));
058      p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
059      htable1.put(p);
060    }
061  }
062
063  /**
064   * Test for HBASE-9531.
065   * <p/>
066   * put a few rows into htable1, which should be replicated to htable2 <br/>
067   * create a ClusterStatus instance 'status' from HBaseAdmin <br/>
068   * test : status.getLoad(server).getReplicationLoadSourceList() <br/>
069   * test : status.getLoad(server).getReplicationLoadSink()
070   */
071  @Test
072  public void testReplicationStatus() throws Exception {
073    // This test wants two RS's up. We only run one generally so add one.
074    UTIL1.getMiniHBaseCluster().startRegionServer();
075    Waiter.waitFor(UTIL1.getConfiguration(), 30000, new Waiter.Predicate<Exception>() {
076      @Override
077      public boolean evaluate() throws Exception {
078        return UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() > 1;
079      }
080    });
081    Admin hbaseAdmin = UTIL1.getAdmin();
082    // disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
083    hbaseAdmin.disableReplicationPeer(PEER_ID2);
084    insertRowsOnSource();
085    LOG.info("AFTER PUTS");
086    // TODO: Change this wait to a barrier. I tried waiting on replication stats to
087    // change but sleeping in main thread seems to mess up background replication.
088    // HACK! To address flakeyness.
089    Threads.sleep(10000);
090    ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
091    for (JVMClusterUtil.RegionServerThread thread : UTIL1.getHBaseCluster()
092      .getRegionServerThreads()) {
093      ServerName server = thread.getRegionServer().getServerName();
094      assertTrue("" + server, metrics.getLiveServerMetrics().containsKey(server));
095      ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
096      List<ReplicationLoadSource> rLoadSourceList = sm.getReplicationLoadSourceList();
097      ReplicationLoadSink rLoadSink = sm.getReplicationLoadSink();
098
099      // check SourceList only has one entry, because only has one peer
100      assertEquals("Failed to get ReplicationLoadSourceList " + rLoadSourceList + ", " + server, 1,
101        rLoadSourceList.size());
102      assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
103
104      // check Sink exist only as it is difficult to verify the value on the fly
105      assertTrue("failed to get ReplicationLoadSink.AgeOfLastShippedOp ",
106        (rLoadSink.getAgeOfLastAppliedOp() >= 0));
107      assertTrue("failed to get ReplicationLoadSink.TimeStampsOfLastAppliedOp ",
108        (rLoadSink.getTimestampsOfLastAppliedOp() >= 0));
109    }
110
111    // Stop rs1, then the queue of rs1 will be transfered to rs0
112    HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(1);
113    hrs.stop("Stop RegionServer");
114    while (!hrs.isShutDown()) {
115      Threads.sleep(100);
116    }
117    // To be sure it dead and references cleaned up. TODO: Change this to a barrier.
118    // I tried waiting on replication stats to change but sleeping in main thread
119    // seems to mess up background replication.
120    Threads.sleep(10000);
121    ServerName server = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
122    List<ReplicationLoadSource> rLoadSourceList = waitOnMetricsReport(1, server);
123    // The remaining server should now have two queues -- the original and then the one that was
124    // added because of failover. The original should still be PEER_ID2 though.
125    assertEquals("Failed ReplicationLoadSourceList " + rLoadSourceList, 2, rLoadSourceList.size());
126    assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
127  }
128
129  /**
130   * Wait until Master shows metrics counts for ReplicationLoadSourceList that are greater than
131   * <code>greaterThan</code> for <code>serverName</code> before returning. We want to avoid case
132   * where RS hasn't yet updated Master before allowing test proceed.
133   * @param greaterThan size of replicationLoadSourceList must be greater before we proceed
134   */
135  private List<ReplicationLoadSource> waitOnMetricsReport(int greaterThan, ServerName serverName)
136    throws Exception {
137    UTIL1.waitFor(30000, 1000, new Waiter.ExplainingPredicate<Exception>() {
138      @Override
139      public boolean evaluate() throws Exception {
140        List<ReplicationLoadSource> list =
141          hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
142            .get(serverName).getReplicationLoadSourceList();
143        return list.size() > greaterThan;
144      }
145
146      @Override
147      public String explainFailure() throws Exception {
148        return "The ReplicationLoadSourceList's size is lesser than or equal to " + greaterThan
149          + " for " + serverName;
150      }
151    });
152
153    return hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics()
154      .get(serverName).getReplicationLoadSourceList();
155  }
156}