/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;

/**
 * Sends a hand-built region server report carrying two replication load sources to the master of
 * a mini cluster and checks what {@link HMaster#getReplicationLoad(ServerName[])} returns for it.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestGetReplicationLoad {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestGetReplicationLoad.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class);

  // Peer and queue identifiers placed in the synthetic report.
  private static final String PEER_1 = "test1";
  private static final String PEER_2 = "test2";
  private static final String QUEUE_ID = "1";

  // Baseline metric values reported for PEER_1; PEER_2 reports each numeric value plus one.
  private static final long AGE_OF_LAST_SHIPPED_OP = 2;
  private static final long REPLICATION_LAG = 3;
  private static final long TIMESTAMP_OF_LAST_SHIPPED_OP = 4;
  private static final long TIMESTAMP_OF_NEXT_TO_REPLICATE = 5;
  private static final long EDITS_READ = 6;
  private static final long OPS_SHIPPED = 7;
  private static final int SIZE_OF_LOG_QUEUE = 8;
  private static final boolean RECOVERED = false;
  private static final boolean RUNNING = false;
  private static final boolean EDITS_SINCE_RESTART = false;

  private static MiniHBaseCluster cluster;
  private static HMaster master;
  private static HBaseTestingUtility TEST_UTIL;

  /**
   * {@link HMaster} variant whose {@code tryRegionServerReport} is a no-op.
   * <p>
   * NOTE(review): {@link #startCluster()} registers {@code TestMasterMetrics.MyMaster}, not this
   * class, so nothing visible in this file uses it. Confirm whether that is intentional.
   */
  public static class MyMaster extends HMaster {
    public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
      super(conf);
    }

    @Override
    protected void tryRegionServerReport(long reportStartTime, long reportEndTime) {
      // do nothing
    }
  }

  /**
   * Starts a mini cluster with a custom master class and waits until a master is active and
   * ready.
   */
  @BeforeClass
  public static void startCluster() throws Exception {
    LOG.info("Starting cluster");
    TEST_UTIL = new HBaseTestingUtility();
    // Only the master class is overridden; every other option keeps its default.
    StartMiniClusterOption.Builder optionBuilder = StartMiniClusterOption.builder();
    optionBuilder.masterClass(TestMasterMetrics.MyMaster.class);
    TEST_UTIL.startMiniCluster(optionBuilder.build());
    cluster = TEST_UTIL.getHBaseCluster();
    LOG.info("Waiting for active/ready master");
    cluster.waitForActiveAndReadyMaster();
    master = cluster.getMaster();
  }

  /** Shuts the mini cluster down if it was created. */
  @AfterClass
  public static void after() throws Exception {
    if (TEST_UTIL == null) {
      return;
    }
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Builds a replication load source for {@code peerId} in which every numeric metric is the
   * baseline constant plus {@code delta}.
   */
  private static ClusterStatusProtos.ReplicationLoadSource newLoadSource(String peerId,
    int delta) {
    return ClusterStatusProtos.ReplicationLoadSource.newBuilder()
      .setPeerID(peerId)
      .setAgeOfLastShippedOp(AGE_OF_LAST_SHIPPED_OP + delta)
      .setReplicationLag(REPLICATION_LAG + delta)
      .setTimeStampOfLastShippedOp(TIMESTAMP_OF_LAST_SHIPPED_OP + delta)
      .setSizeOfLogQueue(SIZE_OF_LOG_QUEUE + delta)
      .setTimeStampOfNextToReplicate(TIMESTAMP_OF_NEXT_TO_REPLICATE + delta)
      .setQueueId(QUEUE_ID)
      .setEditsRead(EDITS_READ + delta)
      .setOPsShipped(OPS_SHIPPED + delta)
      .setRunning(RUNNING)
      .setRecovered(RECOVERED)
      .setEditsSinceRestart(EDITS_SINCE_RESTART)
      .build();
  }

  /**
   * Registers two peers, delivers a report with one load source per peer under the master's own
   * server name, and verifies the per-peer view returned by the master.
   */
  @Test
  public void testGetReplicationMetrics() throws Exception {
    ServerName reporter = cluster.getMaster(0).getServerName();

    // One load source per peer; the second peer's numeric metrics are shifted by one.
    ClusterStatusProtos.ServerLoad.Builder serverLoad = ClusterStatusProtos.ServerLoad.newBuilder();
    serverLoad.addReplLoadSource(newLoadSource(PEER_1, 0));
    serverLoad.addReplLoadSource(newLoadSource(PEER_2, 1));

    RegionServerStatusProtos.RegionServerReportRequest report =
      RegionServerStatusProtos.RegionServerReportRequest.newBuilder()
        .setServer(ProtobufUtil.toServerName(reporter))
        .setLoad(serverLoad.build())
        .build();

    // Both peers are added, in order, before the report is delivered.
    for (String peerId : new String[] { PEER_1, PEER_2 }) {
      master.getReplicationPeerManager().addPeer(peerId,
        ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true);
    }
    master.getMasterRpcServices().regionServerReport(null, report);

    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> loadsByPeer =
      master.getReplicationLoad(new ServerName[] { reporter });
    assertEquals("peer size ", 2, loadsByPeer.size());

    List<Pair<ServerName, ReplicationLoadSource>> peer1Loads = loadsByPeer.get(PEER_1);
    assertEquals("load size ", 1, peer1Loads.size());

    ReplicationLoadSource peer1Source = peer1Loads.get(0).getSecond();
    assertEquals("log queue size of peer1", SIZE_OF_LOG_QUEUE, peer1Source.getSizeOfLogQueue());

    ReplicationLoadSource peer2Source = loadsByPeer.get(PEER_2).get(0).getSecond();
    assertEquals("replication lag of peer2", REPLICATION_LAG + 1,
      peer2Source.getReplicationLag());

    master.stopMaster();
  }
}