001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.util.HashMap; 024import java.util.List; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HBaseTestingUtil; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 029import org.apache.hadoop.hbase.StartTestingClusterOption; 030import org.apache.hadoop.hbase.replication.ReplicationLoadSource; 031import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 032import org.apache.hadoop.hbase.testclassification.MasterTests; 033import org.apache.hadoop.hbase.testclassification.MediumTests; 034import org.apache.hadoop.hbase.util.Pair; 035import org.apache.zookeeper.KeeperException; 036import org.junit.jupiter.api.AfterAll; 037import org.junit.jupiter.api.BeforeAll; 038import org.junit.jupiter.api.Tag; 039import org.junit.jupiter.api.Test; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; 046 047@Tag(MasterTests.TAG) 048@Tag(MediumTests.TAG) 049public class TestGetReplicationLoad { 050 051 private static final Logger LOG = LoggerFactory.getLogger(TestGetReplicationLoad.class); 052 053 private static SingleProcessHBaseCluster cluster; 054 private static HMaster master; 055 private static HBaseTestingUtil TEST_UTIL; 056 057 public static class MyMaster extends HMaster { 058 public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException { 059 super(conf); 060 } 061 } 062 063 @BeforeAll 064 public static void startCluster() throws Exception { 065 LOG.info("Starting cluster"); 066 TEST_UTIL = new HBaseTestingUtil(); 067 // Set master class and use default values for other options. 068 StartTestingClusterOption option = 069 StartTestingClusterOption.builder().masterClass(TestMasterMetrics.MyMaster.class).build(); 070 TEST_UTIL.startMiniCluster(option); 071 cluster = TEST_UTIL.getHBaseCluster(); 072 LOG.info("Waiting for active/ready master"); 073 cluster.waitForActiveAndReadyMaster(); 074 master = cluster.getMaster(); 075 } 076 077 @AfterAll 078 public static void after() throws Exception { 079 if (TEST_UTIL != null) { 080 TEST_UTIL.shutdownMiniCluster(); 081 } 082 } 083 084 @Test 085 public void testGetReplicationMetrics() throws Exception { 086 String peer1 = "test1", peer2 = "test2", queueId = "1"; 087 long ageOfLastShippedOp = 2, replicationLag = 3, timeStampOfLastShippedOp = 4, 088 timeStampOfNextToReplicate = 5, editsRead = 6, oPsShipped = 7; 089 int sizeOfLogQueue = 8; 090 boolean recovered = false, running = false, editsSinceRestart = false; 091 RegionServerStatusProtos.RegionServerReportRequest.Builder request = 092 RegionServerStatusProtos.RegionServerReportRequest.newBuilder(); 093 ServerName serverName = cluster.getMaster(0).getServerName(); 094 request.setServer(ProtobufUtil.toServerName(serverName)); 095 ClusterStatusProtos.ReplicationLoadSource rload1 = ClusterStatusProtos.ReplicationLoadSource 096 .newBuilder().setPeerID(peer1).setAgeOfLastShippedOp(ageOfLastShippedOp) 097 .setReplicationLag(replicationLag).setTimeStampOfLastShippedOp(timeStampOfLastShippedOp) 098 .setSizeOfLogQueue(sizeOfLogQueue).setTimeStampOfNextToReplicate(timeStampOfNextToReplicate) 099 .setQueueId(queueId).setEditsRead(editsRead).setOPsShipped(oPsShipped).setRunning(running) 100 .setRecovered(recovered).setEditsSinceRestart(editsSinceRestart).build(); 101 ClusterStatusProtos.ReplicationLoadSource rload2 = 102 ClusterStatusProtos.ReplicationLoadSource.newBuilder().setPeerID(peer2) 103 .setAgeOfLastShippedOp(ageOfLastShippedOp + 1).setReplicationLag(replicationLag + 1) 104 .setTimeStampOfLastShippedOp(timeStampOfLastShippedOp + 1) 105 .setSizeOfLogQueue(sizeOfLogQueue + 1) 106 .setTimeStampOfNextToReplicate(timeStampOfNextToReplicate + 1).setQueueId(queueId) 107 .setEditsRead(editsRead + 1).setOPsShipped(oPsShipped + 1).setRunning(running) 108 .setRecovered(recovered).setEditsSinceRestart(editsSinceRestart).build(); 109 110 ClusterStatusProtos.ServerLoad sl = ClusterStatusProtos.ServerLoad.newBuilder() 111 .addReplLoadSource(rload1).addReplLoadSource(rload2).build(); 112 request.setLoad(sl); 113 master.getReplicationPeerManager().addPeer(peer1, 114 ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true); 115 master.getReplicationPeerManager().addPeer(peer2, 116 ReplicationPeerConfig.newBuilder().setClusterKey("test").build(), true); 117 master.getMasterRpcServices().regionServerReport(null, request.build()); 118 HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoad = 119 master.getReplicationLoad(new ServerName[] { serverName }); 120 assertEquals(2, replicationLoad.size(), "peer size "); 121 assertEquals(1, replicationLoad.get(peer1).size(), "load size "); 122 assertEquals(sizeOfLogQueue, replicationLoad.get(peer1).get(0).getSecond().getSizeOfLogQueue(), 123 "log queue size of peer1"); 124 assertEquals(replicationLag + 1, 125 replicationLoad.get(peer2).get(0).getSecond().getReplicationLag(), 126 "replication lag of peer2"); 127 master.stopMaster(); 128 } 129}