/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.apache.hadoop.hbase.master.HMaster.HBASE_MASTER_RSPROC_DISPATCHER_CLASS;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.hbck.HbckChore;
import org.apache.hadoop.hbase.master.hbck.HbckReport;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Testing custom RSProcedureDispatcher to ensure retry limit can be imposed on certain errors.
 */
@Tag(MiscTests.TAG)
@Tag(LargeTests.TAG)
public class TestProcDispatcher {

  private static final Logger LOG = LoggerFactory.getLogger(TestProcDispatcher.class);

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  // ServerName of region server 0, captured once at cluster startup; used as the move target.
  private static ServerName rs0;

  /**
   * Starts a 3-node mini cluster with the custom {@link RSProcDispatcher} plugged in and a
   * fail-fast retry limit of 5, then disables the balancer so only explicit region moves occur.
   */
  @BeforeAll
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().set(HBASE_MASTER_RSPROC_DISPATCHER_CLASS,
      RSProcDispatcher.class.getName());
    TEST_UTIL.getConfiguration().setInt("hbase.master.rs.remote.proc.fail.fast.limit", 5);
    TEST_UTIL.startMiniCluster(3);
    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
    rs0 = cluster.getRegionServer(0).getServerName();
    // Keep the balancer off so region placement is only changed by this test's explicit moves.
    TEST_UTIL.getAdmin().balancerSwitch(false, true);
  }

  @AfterAll
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Creates a table named after the running test method, pre-split into 9 regions over the
   * int key range [0, 80000), with a single column family {@code fam1}.
   */
  @BeforeEach
  public void setUp(TestInfo testInfo) throws Exception {
    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("fam1")).build();
    int startKey = 0;
    int endKey = 80000;
    TEST_UTIL.getAdmin().createTable(tableDesc, Bytes.toBytes(startKey), Bytes.toBytes(endKey), 9);
  }

  /**
   * Loads data, moves every region off server1 (and one region off server0) and then verifies
   * that the cluster converges: the total online-region count is unchanged, all master
   * procedures (including any scheduled {@link ServerCrashProcedure}s) complete successfully,
   * and hbck reports no inconsistent or orphan regions.
   */
  @Test
  public void testRetryLimitOnConnClosedErrors(TestInfo testInfo) throws Exception {
    HbckChore hbckChore = new HbckChore(TEST_UTIL.getHBaseCluster().getMaster());
    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
    Admin admin = TEST_UTIL.getAdmin();
    // Load some data so the region moves below carry real state. Use try-with-resources:
    // the Table client is AutoCloseable and was previously leaked (never closed).
    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
      List<Put> puts = IntStream.range(10, 50000)
        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(Bytes.toBytes("fam1"),
          Bytes.toBytes("q1"), Bytes.toBytes("val_" + i)))
        .collect(Collectors.toList());
      table.put(puts);
    }
    admin.flush(tableName);
    admin.compact(tableName);
    // Compaction is asynchronous; give it a moment before sampling region counts.
    Thread.sleep(3000);
    HRegionServer hRegionServer0 = cluster.getRegionServer(0);
    HRegionServer hRegionServer1 = cluster.getRegionServer(1);
    HRegionServer hRegionServer2 = cluster.getRegionServer(2);
    int numRegions0 = hRegionServer0.getNumberOfOnlineRegions();
    int numRegions1 = hRegionServer1.getNumberOfOnlineRegions();
    int numRegions2 = hRegionServer2.getNumberOfOnlineRegions();

    // Baseline: the cluster must be clean before we start moving regions around.
    hbckChore.choreForTesting();
    HbckReport hbckReport = hbckChore.getLastReport();
    assertEquals(0, hbckReport.getInconsistentRegions().size());
    assertEquals(0, hbckReport.getOrphanRegionsOnFS().size());
    assertEquals(0, hbckReport.getOrphanRegionsOnRS().size());

    HRegion region0 = hRegionServer0.getRegions().get(0);
    // move all regions from server1 to server0
    for (HRegion region : hRegionServer1.getRegions()) {
      TEST_UTIL.getAdmin().move(region.getRegionInfo().getEncodedNameAsBytes(), rs0);
    }
    // Move one region off server0 with no explicit destination (master picks the target).
    TEST_UTIL.getAdmin().move(region0.getRegionInfo().getEncodedNameAsBytes());
    HMaster master = TEST_UTIL.getHBaseCluster().getMaster();

    // Ensure:
    // 1. num of regions before and after scheduling SCP remain same
    // 2. all procedures including SCPs are successfully completed
    // 3. two servers have SCPs scheduled
    TEST_UTIL.waitFor(5000, 1000, () -> {
      LOG.info("numRegions0: {} , numRegions1: {} , numRegions2: {}", numRegions0, numRegions1,
        numRegions2);
      LOG.info("Online regions - server0 : {} , server1: {} , server2: {}",
        cluster.getRegionServer(0).getNumberOfOnlineRegions(),
        cluster.getRegionServer(1).getNumberOfOnlineRegions(),
        cluster.getRegionServer(2).getNumberOfOnlineRegions());
      LOG.info("Num of successfully completed procedures: {} , num of all procedures: {}",
        master.getMasterProcedureExecutor().getProcedures().stream()
          .filter(masterProcedureEnvProcedure -> masterProcedureEnvProcedure.getState()
              == ProcedureProtos.ProcedureState.SUCCESS)
          .count(),
        master.getMasterProcedureExecutor().getProcedures().size());
      LOG.info("Num of SCPs: " + master.getMasterProcedureExecutor().getProcedures().stream()
        .filter(proc -> proc instanceof ServerCrashProcedure).count());
      return (numRegions0 + numRegions1 + numRegions2)
          == (cluster.getRegionServer(0).getNumberOfOnlineRegions()
            + cluster.getRegionServer(1).getNumberOfOnlineRegions()
            + cluster.getRegionServer(2).getNumberOfOnlineRegions())
        && master.getMasterProcedureExecutor().getProcedures().stream()
          .filter(masterProcedureEnvProcedure -> masterProcedureEnvProcedure.getState()
              == ProcedureProtos.ProcedureState.SUCCESS)
          .count() == master.getMasterProcedureExecutor().getProcedures().size()
        && master.getMasterProcedureExecutor().getProcedures().stream()
          .anyMatch(proc -> proc instanceof ServerCrashProcedure);
    });

    // Ensure we have no inconsistent regions
    TEST_UTIL.waitFor(5000, 1000, () -> {
      hbckChore.choreForTesting();
      HbckReport report = hbckChore.getLastReport();
      return report.getInconsistentRegions().isEmpty() && report.getOrphanRegionsOnFS().isEmpty()
        && report.getOrphanRegionsOnRS().isEmpty();
    });

  }

}