/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;

/**
 * Checks that replication load sources submitted to the master through a region server report
 * are handed back by {@link HMaster#getReplicationLoad(ServerName[])}, keyed by peer id.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestGetReplicationLoad {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestGetReplicationLoad.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class);

  private static MiniHBaseCluster cluster;
  private static HMaster master;
  private static HBaseTestingUtility TEST_UTIL;

  /**
   * Master whose {@code tryRegionServerReport} is a no-op.
   * <p>
   * NOTE(review): presumably this keeps the master from submitting its own report under the
   * server name the test reports for, which could replace the hand-built load - confirm
   * against {@link HMaster}.
   */
  public static class MyMaster extends HMaster {
    public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
      super(conf);
    }

    @Override
    protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
      // do nothing
    }
  }

  /**
   * Starts the mini cluster with {@link MyMaster} as the master implementation and waits until a
   * master is active and ready.
   */
  @BeforeClass
  public static void startCluster() throws Exception {
    LOG.info("Starting cluster");
    TEST_UTIL = new HBaseTestingUtility();
    // Use the master subclass declared in this file so the test does not depend on a nested
    // class of another test.
    TEST_UTIL.startMiniCluster(1, 1, 1, null, MyMaster.class, null);
    cluster = TEST_UTIL.getHBaseCluster();
    LOG.info("Waiting for active/ready master");
    cluster.waitForActiveAndReadyMaster();
    master = cluster.getMaster();
  }

  /** Shuts the mini cluster down, if it was ever created. */
  @AfterClass
  public static void after() throws Exception {
    if (TEST_UTIL != null) {
      TEST_UTIL.shutdownMiniCluster();
    }
  }

  /**
   * Builds a region server report carrying one replication load source for each of two peers,
   * submits it to the master's RPC services, and asserts that the master returns both peers with
   * the reported queue size and replication lag.
   */
  @Test
  public void testGetReplicationMetrics() throws Exception {
    String peer1 = "test1";
    String peer2 = "test2";
    long ageOfLastShippedOp = 2;
    long replicationLag = 3;
    long timeStampOfLastShippedOp = 4;
    int sizeOfLogQueue = 5;

    // The report is filed under the server name of the master at index 0.
    ServerName serverName = cluster.getMaster(0).getServerName();
    RegionServerStatusProtos.RegionServerReportRequest.Builder request =
      RegionServerStatusProtos.RegionServerReportRequest.newBuilder();
    request.setServer(ProtobufUtil.toServerName(serverName));

    // The second peer gets every metric shifted by one, so the two loads are distinguishable.
    ClusterStatusProtos.ReplicationLoadSource rload1 =
      ClusterStatusProtos.ReplicationLoadSource.newBuilder()
        .setPeerID(peer1)
        .setAgeOfLastShippedOp(ageOfLastShippedOp)
        .setReplicationLag(replicationLag)
        .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp)
        .setSizeOfLogQueue(sizeOfLogQueue)
        .build();
    ClusterStatusProtos.ReplicationLoadSource rload2 =
      ClusterStatusProtos.ReplicationLoadSource.newBuilder()
        .setPeerID(peer2)
        .setAgeOfLastShippedOp(ageOfLastShippedOp + 1)
        .setReplicationLag(replicationLag + 1)
        .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1)
        .setSizeOfLogQueue(sizeOfLogQueue + 1)
        .build();
    ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder()
      .addReplLoadSource(rload1)
      .addReplLoadSource(rload2)
      .build();
    request.setLoad(sl);

    // Register both peers with the master before the report is submitted.
    master.getReplicationPeerManager().addPeer(peer1,
      ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true);
    master.getReplicationPeerManager().addPeer(peer2,
      ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true);

    master.getMasterRpcServices().regionServerReport(null, request.build());

    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad =
      master.getReplicationLoad(new ServerName[] { serverName });
    assertEquals("peer size ", 2, replicationLoad.size());
    assertEquals("load size ", 1, replicationLoad.get(peer1).size());
    assertEquals("log queue size of peer1", sizeOfLogQueue,
      replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue());
    assertEquals("replication lag of peer2", replicationLag + 1,
      replicationLoad.get(peer2).get(0).getSecond().getReplicationLag());
    master.stopMaster();
  }
}