001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertNotNull; 022 023import java.io.IOException; 024import java.util.Optional; 025import java.util.concurrent.atomic.AtomicReference; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseTestingUtil; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.HRegionLocation; 030import org.apache.hadoop.hbase.ServerName; 031import org.apache.hadoop.hbase.StartTestingClusterOption; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.RegionInfo; 038import org.apache.hadoop.hbase.client.RetriesExhaustedException; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.client.TableDescriptor; 041import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 042import org.apache.hadoop.hbase.coprocessor.ObserverContext; 043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 044import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 045import org.apache.hadoop.hbase.coprocessor.RegionObserver; 046import org.apache.hadoop.hbase.ipc.CallRunner; 047import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue; 048import org.apache.hadoop.hbase.ipc.PriorityFunction; 049import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl; 050import org.apache.hadoop.hbase.testclassification.MediumTests; 051import org.apache.hadoop.hbase.testclassification.RegionServerTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.JVMClusterUtil; 054import org.apache.hadoop.hbase.util.Threads; 055import org.junit.jupiter.api.AfterAll; 056import org.junit.jupiter.api.BeforeAll; 057import org.junit.jupiter.api.Tag; 058import org.junit.jupiter.api.Test; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062@Tag(RegionServerTests.TAG) 063@Tag(MediumTests.TAG) 064public class TestRegionServerRejectDuringAbort { 065 066 private static final Logger LOG = 067 LoggerFactory.getLogger(TestRegionServerRejectDuringAbort.class); 068 069 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 070 071 private static TableName TABLE_NAME = TableName.valueOf("RSRejectOnAbort"); 072 073 private static byte[] CF = Bytes.toBytes("cf"); 074 075 private static final int REGIONS_NUM = 5; 076 077 private static final AtomicReference<Exception> THROWN_EXCEPTION = new AtomicReference<>(null); 078 079 private static volatile boolean shouldThrowTooBig = false; 080 081 @BeforeAll 082 public static void setUp() throws Exception { 083 // Will schedule a abort timeout task after SLEEP_TIME_WHEN_CLOSE_REGION ms 084 UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", "pluggable"); 085 UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name", 086 CallQueueTooBigThrowingQueue.class, PluggableBlockingQueue.class); 087 StartTestingClusterOption option = 088 StartTestingClusterOption.builder().numRegionServers(2).build(); 089 UTIL.startMiniCluster(option); 090 TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME) 091 .setCoprocessor(SleepWhenCloseCoprocessor.class.getName()) 092 .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).build()).build(); 093 UTIL.getAdmin().createTable(td, Bytes.toBytes("0"), Bytes.toBytes("9"), REGIONS_NUM); 094 } 095 096 public static final class CallQueueTooBigThrowingQueue extends TestPluggableQueueImpl { 097 098 public CallQueueTooBigThrowingQueue(int maxQueueLength, PriorityFunction priority, 099 Configuration conf) { 100 super(maxQueueLength, priority, conf); 101 } 102 103 @Override 104 public boolean offer(CallRunner callRunner) { 105 if (shouldThrowTooBig && callRunner.getRpcCall().getRequestAttribute("test") != null) { 106 return false; 107 } 108 return super.offer(callRunner); 109 } 110 } 111 112 @AfterAll 113 public static void tearDown() throws Exception { 114 UTIL.shutdownMiniCluster(); 115 } 116 117 /** 118 * Tests that the logic in ServerRpcConnection works such that if the server is aborted, it short 119 * circuits any other logic. This means we no longer even attempt to enqueue the request onto the 120 * call queue. We verify this by using a special call queue which we can trigger to always return 121 * CallQueueTooBigException. If the logic works, despite forcing those exceptions, we should not 122 * see them. 123 */ 124 @Test 125 public void testRejectRequestsOnAbort() throws Exception { 126 // We don't want to disrupt the server carrying meta, because we plan to disrupt requests to 127 // the server. Disrupting meta requests messes with the test. 128 HRegionServer serverWithoutMeta = null; 129 for (JVMClusterUtil.RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster() 130 .getRegionServerThreads()) { 131 HRegionServer regionServer = regionServerThread.getRegionServer(); 132 if ( 133 regionServer.getRegions(TableName.META_TABLE_NAME).isEmpty() 134 && !regionServer.getRegions(TABLE_NAME).isEmpty() 135 ) { 136 serverWithoutMeta = regionServer; 137 break; 138 } 139 } 140 141 assertNotNull(serverWithoutMeta, 142 "couldn't find a server without meta, but with test table regions"); 143 144 Thread writer = new Thread(getWriterThreadRunnable(serverWithoutMeta.getServerName())); 145 writer.setDaemon(true); 146 writer.start(); 147 148 // Trigger the abort. Our WriterThread will detect the first RegionServerAbortedException 149 // and trigger our custom queue to reject any more requests. This would typically result in 150 // CallQueueTooBigException, unless our logic in ServerRpcConnection to preempt the processing 151 // of a request is working. 152 serverWithoutMeta.abort("Abort RS for test"); 153 154 UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null); 155 assertEquals(THROWN_EXCEPTION.get().getCause().getClass(), RegionServerAbortedException.class); 156 } 157 158 private Runnable getWriterThreadRunnable(ServerName loadServer) { 159 return () -> { 160 try { 161 Configuration conf = UTIL.getConfiguration(); 162 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); 163 try (Connection conn = ConnectionFactory.createConnection(conf); 164 Table table = conn.getTableBuilder(TABLE_NAME, null) 165 .setRequestAttribute("test", new byte[] { 0 }).build()) { 166 // find the first region to exist on our test server, then submit requests to it 167 for (HRegionLocation regionLocation : table.getRegionLocator().getAllRegionLocations()) { 168 if (regionLocation.getServerName().equals(loadServer)) { 169 submitRequestsToRegion(table, regionLocation.getRegion()); 170 return; 171 } 172 } 173 throw new RuntimeException("Failed to find any regions for loadServer " + loadServer); 174 } 175 } catch (Exception e) { 176 LOG.warn("Failed to load data", e); 177 synchronized (THROWN_EXCEPTION) { 178 THROWN_EXCEPTION.set(e); 179 THROWN_EXCEPTION.notifyAll(); 180 } 181 } 182 }; 183 } 184 185 private void submitRequestsToRegion(Table table, RegionInfo regionInfo) throws IOException { 186 // We will block closes of the regions with a CP, so no need to worry about the region getting 187 // reassigned. Just use the same rowkey always. 188 byte[] rowKey = getRowKeyWithin(regionInfo); 189 190 int i = 0; 191 while (true) { 192 try { 193 i++; 194 table.put(new Put(rowKey).addColumn(CF, Bytes.toBytes(i), Bytes.toBytes(i))); 195 } catch (IOException e) { 196 // only catch RegionServerAbortedException once. After that, the next exception thrown 197 // is our test case 198 if ( 199 !shouldThrowTooBig && e instanceof RetriesExhaustedException 200 && e.getCause() instanceof RegionServerAbortedException 201 ) { 202 shouldThrowTooBig = true; 203 } else { 204 throw e; 205 } 206 } 207 208 // small sleep to relieve pressure 209 Threads.sleep(10); 210 } 211 } 212 213 private byte[] getRowKeyWithin(RegionInfo regionInfo) { 214 byte[] rowKey; 215 // region is start of table, find one after start key 216 if (regionInfo.getStartKey().length == 0) { 217 if (regionInfo.getEndKey().length == 0) { 218 // doesn't matter, single region table 219 return Bytes.toBytes(1); 220 } else { 221 // find a row just before endkey 222 rowKey = Bytes.copy(regionInfo.getEndKey()); 223 rowKey[rowKey.length - 1]--; 224 return rowKey; 225 } 226 } else { 227 return regionInfo.getStartKey(); 228 } 229 } 230 231 public static class SleepWhenCloseCoprocessor implements RegionCoprocessor, RegionObserver { 232 233 public SleepWhenCloseCoprocessor() { 234 } 235 236 @Override 237 public Optional<RegionObserver> getRegionObserver() { 238 return Optional.of(this); 239 } 240 241 @Override 242 public void preClose(ObserverContext<? extends RegionCoprocessorEnvironment> c, 243 boolean abortRequested) throws IOException { 244 // Wait so that the region can't close until we get the information we need from our test 245 UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null); 246 } 247 } 248}