001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.Optional; 024import java.util.concurrent.Future; 025import java.util.concurrent.Semaphore; 026import java.util.concurrent.TimeUnit; 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.HBaseTestingUtil; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; 031import org.apache.hadoop.hbase.client.AsyncAdmin; 032import org.apache.hadoop.hbase.client.BalanceRequest; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Durability; 035import org.apache.hadoop.hbase.client.Get; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 039import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 040import org.apache.hadoop.hbase.coprocessor.ObserverContext; 041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 043import org.apache.hadoop.hbase.coprocessor.RegionObserver; 044import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; 045import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 046import org.apache.hadoop.hbase.regionserver.HRegionServer; 047import org.apache.hadoop.hbase.testclassification.LargeTests; 048import org.apache.hadoop.hbase.testclassification.MasterTests; 049import org.apache.hadoop.hbase.util.Bytes; 050import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; 051import org.apache.hadoop.hbase.wal.WALEdit; 052import org.junit.jupiter.api.AfterAll; 053import org.junit.jupiter.api.BeforeAll; 054import org.junit.jupiter.api.Tag; 055import org.junit.jupiter.api.Test; 056 057import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 058 059/** 060 * Test to ensure that the priority for procedures and stuck checker can partially solve the problem 061 * describe in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain 062 * period of time. 063 * <p> 064 * As of HBASE-28199, we no longer block a worker when updating meta now, so this test can not test 065 * adding procedure worker now, but it could still be used to make sure that we could make progress 066 * when meta is gone and we have a lot of pending TRSPs. 067 */ 068@Tag(MasterTests.TAG) 069@Tag(LargeTests.TAG) 070public class TestProcedurePriority { 071 072 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 073 074 private static String TABLE_NAME_PREFIX = "TestProcedurePriority-"; 075 076 private static byte[] CF = Bytes.toBytes("cf"); 077 078 private static byte[] CQ = Bytes.toBytes("cq"); 079 080 private static int CORE_POOL_SIZE; 081 082 private static int TABLE_COUNT; 083 084 private static volatile boolean FAIL = false; 085 086 public static final class MyCP implements RegionObserver, RegionCoprocessor { 087 088 @Override 089 public Optional<RegionObserver> getRegionObserver() { 090 return Optional.of(this); 091 } 092 093 @Override 094 public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c, Get get, 095 List<Cell> result) throws IOException { 096 if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) { 097 throw new IOException("Inject error"); 098 } 099 } 100 101 @Override 102 public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put, 103 WALEdit edit, Durability durability) throws IOException { 104 if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) { 105 throw new IOException("Inject error"); 106 } 107 } 108 } 109 110 @BeforeAll 111 public static void setUp() throws Exception { 112 UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000); 113 UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4); 114 UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName()); 115 UTIL.startMiniCluster(3); 116 CORE_POOL_SIZE = 117 UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getCorePoolSize(); 118 TABLE_COUNT = 50 * CORE_POOL_SIZE; 119 List<Future<?>> futures = new ArrayList<>(); 120 AsyncAdmin admin = UTIL.getAsyncConnection().getAdmin(); 121 Semaphore concurrency = new Semaphore(10); 122 for (int i = 0; i < TABLE_COUNT; i++) { 123 concurrency.acquire(); 124 futures.add(admin 125 .createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME_PREFIX + i)) 126 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build()) 127 .whenComplete((r, e) -> concurrency.release())); 128 } 129 for (Future<?> future : futures) { 130 future.get(3, TimeUnit.MINUTES); 131 } 132 UTIL.getAdmin().balance(BalanceRequest.newBuilder().setIgnoreRegionsInTransition(true).build()); 133 UTIL.waitUntilNoRegionsInTransition(); 134 UTIL.getAdmin().balancerSwitch(false, true); 135 } 136 137 @AfterAll 138 public static void tearDown() throws Exception { 139 UTIL.shutdownMiniCluster(); 140 } 141 142 @Test 143 public void test() throws Exception { 144 RegionServerThread rsWithMetaThread = UTIL.getMiniHBaseCluster().getRegionServerThreads() 145 .stream().filter(t -> !t.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty()) 146 .findAny().get(); 147 HRegionServer rsNoMeta = UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer()); 148 FAIL = true; 149 UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName()); 150 ProcedureExecutor<?> executor = 151 UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); 152 // wait until we have way more TRSPs than the core pool size, and then make sure we can recover 153 // normally 154 UTIL.waitFor(60000, new ExplainingPredicate<Exception>() { 155 156 @Override 157 public boolean evaluate() throws Exception { 158 return executor.getProcedures().stream().filter(p -> !p.isFinished()) 159 .filter(p -> p.getState() != ProcedureState.INITIALIZING) 160 .filter(p -> p instanceof TransitRegionStateProcedure).count() > 5 * CORE_POOL_SIZE; 161 } 162 163 @Override 164 public String explainFailure() throws Exception { 165 return "Not enough TRSPs scheduled"; 166 } 167 }); 168 // sleep more time to make sure the TRSPs have been executed 169 Thread.sleep(10000); 170 UTIL.getMiniHBaseCluster().killRegionServer(rsWithMetaThread.getRegionServer().getServerName()); 171 rsWithMetaThread.join(); 172 FAIL = false; 173 // verify that the cluster is back 174 UTIL.waitUntilNoRegionsInTransition(480000); 175 for (int i = 0; i < TABLE_COUNT; i++) { 176 try (Table table = UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME_PREFIX + i))) { 177 table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); 178 } 179 } 180 UTIL.waitFor(60000, new ExplainingPredicate<Exception>() { 181 182 @Override 183 public boolean evaluate() throws Exception { 184 return executor.getWorkerThreadCount() == CORE_POOL_SIZE; 185 } 186 187 @Override 188 public String explainFailure() throws Exception { 189 return "The new workers do not timeout"; 190 } 191 }); 192 } 193}