001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.junit.Assert.assertEquals; 021 022import java.io.IOException; 023import java.util.HashMap; 024import java.util.List; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.HBaseTestingUtility; 028import org.apache.hadoop.hbase.MiniHBaseCluster; 029import org.apache.hadoop.hbase.ServerName; 030import org.apache.hadoop.hbase.StartMiniClusterOption; 031import org.apache.hadoop.hbase.replication.ReplicationLoadSource; 032import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 033import org.apache.hadoop.hbase.testclassification.MasterTests; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.util.Pair; 036import org.apache.zookeeper.KeeperException; 037import org.junit.AfterClass; 038import org.junit.BeforeClass; 039import org.junit.ClassRule; 040import org.junit.Test; 041import org.junit.experimental.categories.Category; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 048 049@Category({ MasterTests.class, MediumTests.class }) 050public class TestGetReplicationLoad { 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestGetReplicationLoad.class); 054 055 private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class); 056 057 private static MiniHBaseCluster cluster; 058 private static HMaster master; 059 private static HBaseTestingUtility TEST_UTIL; 060 061 public static class MyMaster extends HMaster { 062 public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException { 063 super(conf); 064 } 065 066 @Override 067 protected void tryRegionServerReport(long reportStartTime, long reportEndTime) { 068 // do nothing 069 } 070 } 071 072 @BeforeClass 073 public static void startCluster() throws Exception { 074 LOG.info("Starting cluster"); 075 TEST_UTIL = new HBaseTestingUtility(); 076 // Set master class and use default values for other options. 077 StartMiniClusterOption option = 078 StartMiniClusterOption.builder().masterClass(TestMasterMetrics.MyMaster.class).build(); 079 TEST_UTIL.startMiniCluster(option); 080 cluster = TEST_UTIL.getHBaseCluster(); 081 LOG.info("Waiting for active/ready master"); 082 cluster.waitForActiveAndReadyMaster(); 083 master = cluster.getMaster(); 084 } 085 086 @AfterClass 087 public static void after() throws Exception { 088 if (TEST_UTIL != null) { 089 TEST_UTIL.shutdownMiniCluster(); 090 } 091 } 092 093 @Test 094 public void testGetReplicationMetrics() throws Exception { 095 String peer1 = "test1", peer2 = "test2", queueId = "1"; 096 long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4, 097 timeStampOfNextToReplicate = 5, editsRead = 6, oPsShipped = 7; 098 int sizeOfLogQueue = 8; 099 boolean recovered = false, running = false, editsSinceRestart = false; 100 RegionServerStatusProtos.RegionServerReportRequest.Builder request = 101 RegionServerStatusProtos.RegionServerReportRequest.newBuilder(); 102 ServerName serverName = cluster.getMaster(0).getServerName(); 103 request.setServer(ProtobufUtil.toServerName(serverName)); 104 ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource 105 .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp) 106 .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp) 107 .setSizeOfLogQueue(sizeOfLogQueue).setTimeStampOfNextToReplicate(timeStampOfNextToReplicate) 108 .setQueueId(queueId).setEditsRead(editsRead).setOPsShipped(oPsShipped).setRunning(running) 109 .setRecovered(recovered).setEditsSinceRestart(editsSinceRestart).build(); 110 ClusterStatusProtos.ReplicationLoadSource rload2 = 111 ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2) 112 .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1) 113 .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1) 114 .setSizeOfLogQueue(sizeOfLogQueue + 1) 115 .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate + 1).setQueueId(queueId) 116 .setEditsRead(editsRead + 1).setOPsShipped(oPsShipped + 1).setRunning(running) 117 .setRecovered(recovered).setEditsSinceRestart(editsSinceRestart).build(); 118 119 ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder() 120 .addReplLoadSource(rload1).addReplLoadSource(rload2).build(); 121 request.setLoad(sl); 122 master.getReplicationPeerManager().addPeer(peer1, 123 ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true); 124 master.getReplicationPeerManager().addPeer(peer2, 125 ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true); 126 master.getMasterRpcServices().regionServerReport(null, request.build()); 127 HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad = 128 master.getReplicationLoad(new ServerName[] { serverName }); 129 assertEquals("peer size ", 2, replicationLoad.size()); 130 assertEquals("load size ", 1, replicationLoad.get(peer1).size()); 131 assertEquals("log queue size of peer1", sizeOfLogQueue, 132 replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue()); 133 assertEquals("replication lag of peer2", replicationLag + 1, 134 replicationLoad.get(peer2).get(0).getSecond().getReplicationLag()); 135 master.stopMaster(); 136 } 137}