001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
003 * agreements. See the NOTICE file distributed with this work for additional information regarding
004 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
005 * "License"); you may not use this file except in compliance with the License. You may obtain a
006 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
007 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
008 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
009 * for the specific language governing permissions and limitations under the License.
010 */
011package org.apache.hadoop.hbase.master;
012
013import static org.junit.Assert.assertEquals;
014
015import java.io.IOException;
016import java.util.HashMap;
017import java.util.List;
018
019import org.apache.hadoop.conf.Configuration;
020import org.apache.hadoop.hbase.HBaseClassTestRule;
021import org.apache.hadoop.hbase.HBaseTestingUtility;
022import org.apache.hadoop.hbase.MiniHBaseCluster;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
025import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
026import org.apache.hadoop.hbase.testclassification.MasterTests;
027import org.apache.hadoop.hbase.testclassification.MediumTests;
028import org.apache.hadoop.hbase.util.Pair;
029import org.apache.zookeeper.KeeperException;
030import org.junit.AfterClass;
031import org.junit.BeforeClass;
032import org.junit.ClassRule;
033import org.junit.Test;
034import org.junit.experimental.categories.Category;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
041
042@Category({ MasterTests.class, MediumTests.class })
043public class TestGetReplicationLoad {
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046      HBaseClassTestRule.forClass(TestGetReplicationLoad.class);
047
048  private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class);
049
050  private static MiniHBaseCluster cluster;
051  private static HMaster master;
052  private static HBaseTestingUtility TEST_UTIL;
053
054  public static class MyMaster extends HMaster {
055    public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
056      super(conf);
057    }
058
059    @Override
060    protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
061      // do nothing
062    }
063  }
064
065  @BeforeClass
066  public static void startCluster() throws Exception {
067    LOG.info("Starting cluster");
068    TEST_UTIL = new HBaseTestingUtility();
069    TEST_UTIL.startMiniCluster(1, 1, 1, null, TestMasterMetrics.MyMaster.class, null);
070    cluster = TEST_UTIL.getHBaseCluster();
071    LOG.info("Waiting for active/ready master");
072    cluster.waitForActiveAndReadyMaster();
073    master = cluster.getMaster();
074  }
075
076  @AfterClass
077  public static void after() throws Exception {
078    if (TEST_UTIL != null) {
079      TEST_UTIL.shutdownMiniCluster();
080    }
081  }
082
083  @Test
084  public void testGetReplicationMetrics() throws Exception {
085    String peer1 = "test1", peer2 = "test2";
086    long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4;
087    int sizeOfLogQueue = 5;
088    RegionServerStatusProtos.RegionServerReportRequest.Builder request =
089        RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
090    ServerName serverName = cluster.getMaster(0).getServerName();
091    request.setServer(ProtobufUtil.toServerName(serverName));
092    ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource
093        .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp)
094        .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
095        .setSizeOfLogQueue(sizeOfLogQueue).build();
096    ClusterStatusProtos.ReplicationLoadSource rload2 =
097        ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2)
098            .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1)
099            .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
100            .setSizeOfLogQueue(sizeOfLogQueue + 1).build();
101    ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
102        .addReplLoadSource(rload1).addReplLoadSource(rload2).build();
103    request.setLoad(sl);
104    master.getReplicationPeerManager().addPeer(peer1,
105      ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true);
106    master.getReplicationPeerManager().addPeer(peer2,
107      ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true);
108    master.getMasterRpcServices().regionServerReport(null, request.build());
109    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad =
110        master.getReplicationLoad(new ServerName[] { serverName });
111    assertEquals("peer size ", 2, replicationLoad.size());
112    assertEquals("load size ", 1, replicationLoad.get(peer1).size());
113    assertEquals("log queue size of peer1", sizeOfLogQueue,
114      replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue());
115    assertEquals("replication lag of peer2", replicationLag + 1,
116      replicationLoad.get(peer2).get(0).getSecond().getReplicationLag());
117    master.stopMaster();
118  }
119}