001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertTrue; 021 022import java.io.IOException; 023import java.io.InterruptedIOException; 024import java.util.Optional; 025import java.util.concurrent.atomic.AtomicBoolean; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Cell; 030import org.apache.hadoop.hbase.HBaseTestingUtil; 031import org.apache.hadoop.hbase.HConstants; 032import org.apache.hadoop.hbase.NotServingRegionException; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.Waiter; 035import org.apache.hadoop.hbase.client.Admin; 036import org.apache.hadoop.hbase.client.Append; 037import org.apache.hadoop.hbase.client.BufferedMutator; 038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 039import org.apache.hadoop.hbase.client.Delete; 040import org.apache.hadoop.hbase.client.Durability; 041import org.apache.hadoop.hbase.client.Increment; 042import org.apache.hadoop.hbase.client.Put; 043import org.apache.hadoop.hbase.client.RegionInfo; 044import org.apache.hadoop.hbase.client.Result; 045import org.apache.hadoop.hbase.client.ResultScanner; 046import org.apache.hadoop.hbase.client.Scan; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.coprocessor.ObserverContext; 051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 053import org.apache.hadoop.hbase.coprocessor.RegionObserver; 054import org.apache.hadoop.hbase.exceptions.DeserializationException; 055import org.apache.hadoop.hbase.filter.FilterBase; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.testclassification.RegionServerTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.wal.WAL; 060import org.apache.hadoop.hbase.wal.WALEdit; 061import org.junit.jupiter.api.AfterEach; 062import org.junit.jupiter.api.BeforeAll; 063import org.junit.jupiter.api.BeforeEach; 064import org.junit.jupiter.api.Tag; 065import org.junit.jupiter.api.Test; 066import org.junit.jupiter.api.TestInfo; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070@Tag(RegionServerTests.TAG) 071@Tag(LargeTests.TAG) 072public class TestRegionInterrupt { 073 074 private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 075 private static final Logger LOG = LoggerFactory.getLogger(TestRegionInterrupt.class); 076 077 static final byte[] FAMILY = Bytes.toBytes("info"); 078 079 static long sleepTime; 080 081 @BeforeAll 082 public static void setUpBeforeClass() throws Exception { 083 Configuration conf = TEST_UTIL.getConfiguration(); 084 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 085 conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class, Region.class); 086 conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true); 087 // Ensure the sleep interval is long enough for interrupts to occur. 088 long waitInterval = 089 conf.getLong(HRegion.CLOSE_WAIT_INTERVAL, HRegion.DEFAULT_CLOSE_WAIT_INTERVAL); 090 sleepTime = waitInterval * 2; 091 // Try to bound the running time of this unit if expected actions do not take place. 092 conf.setLong(HRegion.CLOSE_WAIT_TIME, sleepTime * 2); 093 } 094 095 @BeforeEach 096 public void setUp() throws Exception { 097 TEST_UTIL.startMiniCluster(); 098 } 099 100 @AfterEach 101 public void tearDown() throws Exception { 102 TEST_UTIL.shutdownMiniCluster(); 103 } 104 105 @Test 106 public void testCloseInterruptScanning(TestInfo testInfo) throws Exception { 107 final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName()); 108 LOG.info("Creating table " + tableName); 109 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 110 // load some data 111 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 112 TEST_UTIL.loadTable(table, FAMILY); 113 final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false); 114 // scan the table in the background 115 Thread scanner = new Thread(new Runnable() { 116 @Override 117 public void run() { 118 Scan scan = new Scan(); 119 scan.addFamily(FAMILY); 120 scan.setFilter(new DelayingFilter()); 121 try { 122 LOG.info("Starting scan"); 123 try (ResultScanner rs = table.getScanner(scan)) { 124 Result r; 125 do { 126 r = rs.next(); 127 if (r != null) { 128 LOG.info("Scanned row " + Bytes.toStringBinary(r.getRow())); 129 } 130 } while (r != null); 131 } 132 } catch (IOException e) { 133 LOG.info("Scanner caught exception", e); 134 expectedExceptionCaught.set(true); 135 } finally { 136 LOG.info("Finished scan"); 137 } 138 } 139 }); 140 scanner.start(); 141 142 // Wait for the filter to begin sleeping 143 LOG.info("Waiting for scanner to start"); 144 Waiter.waitFor(TEST_UTIL.getConfiguration(), 10 * 1000, new Waiter.Predicate<Exception>() { 145 @Override 146 public boolean evaluate() throws Exception { 147 return DelayingFilter.isSleeping(); 148 } 149 }); 150 151 // Offline the table, this will trigger closing 152 LOG.info("Offlining table " + tableName); 153 TEST_UTIL.getAdmin().disableTable(tableName); 154 155 // Wait for scanner termination 156 scanner.join(); 157 158 // When we get here the region has closed and the table is offline 159 assertTrue(InterruptInterceptingHRegion.wasInterrupted(), 160 "Region operations were not interrupted"); 161 assertTrue(expectedExceptionCaught.get(), "Scanner did not catch expected exception"); 162 } 163 } 164 165 @Test 166 public void testCloseInterruptMutation(TestInfo testInfo) throws Exception { 167 final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName()); 168 final Admin admin = TEST_UTIL.getAdmin(); 169 // Create the test table 170 TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName) 171 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 172 .setCoprocessor(MutationDelayingCoprocessor.class.getName()).build(); 173 LOG.info("Creating table " + tableName); 174 admin.createTable(htd); 175 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 176 177 // Insert some data in the background 178 LOG.info("Starting writes to table " + tableName); 179 final int NUM_ROWS = 100; 180 final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false); 181 Thread inserter = new Thread(new Runnable() { 182 @Override 183 public void run() { 184 try (BufferedMutator t = admin.getConnection().getBufferedMutator(tableName)) { 185 for (int i = 0; i < NUM_ROWS; i++) { 186 LOG.info("Writing row " + i + " to " + tableName); 187 byte[] value = new byte[10], row = Bytes.toBytes(Integer.toString(i)); 188 Bytes.random(value); 189 t.mutate(new Put(row).addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, value)); 190 t.flush(); 191 } 192 } catch (IOException e) { 193 LOG.info("Inserter caught exception", e); 194 expectedExceptionCaught.set(true); 195 } 196 } 197 }); 198 inserter.start(); 199 200 // Wait for delayed insertion to begin 201 LOG.info("Waiting for mutations to start"); 202 Waiter.waitFor(TEST_UTIL.getConfiguration(), 10 * 1000, new Waiter.Predicate<Exception>() { 203 @Override 204 public boolean evaluate() throws Exception { 205 return MutationDelayingCoprocessor.isSleeping(); 206 } 207 }); 208 209 // Offline the table, this will trigger closing 210 LOG.info("Offlining table " + tableName); 211 admin.disableTable(tableName); 212 213 // Wait for the inserter to finish 214 inserter.join(); 215 216 // When we get here the region has closed and the table is offline 217 assertTrue(InterruptInterceptingHRegion.wasInterrupted(), 218 "Region operations were not interrupted"); 219 assertTrue(expectedExceptionCaught.get(), "Inserter did not catch expected exception"); 220 221 } 222 223 public static class InterruptInterceptingHRegion extends HRegion { 224 225 private static boolean interrupted = false; 226 227 public static boolean wasInterrupted() { 228 return interrupted; 229 } 230 231 public InterruptInterceptingHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration conf, 232 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 233 super(tableDir, wal, fs, conf, regionInfo, htd, rsServices); 234 } 235 236 public InterruptInterceptingHRegion(HRegionFileSystem fs, WAL wal, Configuration conf, 237 TableDescriptor htd, RegionServerServices rsServices) { 238 super(fs, wal, conf, htd, rsServices); 239 } 240 241 @Override 242 void checkInterrupt() throws NotServingRegionException, InterruptedIOException { 243 try { 244 super.checkInterrupt(); 245 } catch (NotServingRegionException | InterruptedIOException e) { 246 interrupted = true; 247 throw e; 248 } 249 } 250 251 @Override 252 IOException throwOnInterrupt(Throwable t) { 253 interrupted = true; 254 return super.throwOnInterrupt(t); 255 } 256 257 } 258 259 public static class DelayingFilter extends FilterBase { 260 261 static volatile boolean sleeping = false; 262 263 public static boolean isSleeping() { 264 return sleeping; 265 } 266 267 @Override 268 public ReturnCode filterCell(Cell v) throws IOException { 269 LOG.info("Starting sleep on " + v); 270 sleeping = true; 271 try { 272 Thread.sleep(sleepTime); 273 } catch (InterruptedException e) { 274 // restore interrupt status so region scanner can handle it as expected 275 Thread.currentThread().interrupt(); 276 LOG.info("Interrupted during sleep on " + v); 277 } finally { 278 LOG.info("Done sleep on " + v); 279 sleeping = false; 280 } 281 return ReturnCode.INCLUDE; 282 } 283 284 public static DelayingFilter parseFrom(final byte[] pbBytes) throws DeserializationException { 285 // Just return a new instance. 286 return new DelayingFilter(); 287 } 288 289 } 290 291 public static class MutationDelayingCoprocessor implements RegionCoprocessor, RegionObserver { 292 293 static volatile boolean sleeping = false; 294 295 public static boolean isSleeping() { 296 return sleeping; 297 } 298 299 private void doSleep(Region.Operation op) { 300 LOG.info("Starting sleep for " + op); 301 sleeping = true; 302 try { 303 Thread.sleep(sleepTime); 304 } catch (InterruptedException e) { 305 // restore interrupt status so doMiniBatchMutation etc. can handle it as expected 306 Thread.currentThread().interrupt(); 307 LOG.info("Interrupted during " + op); 308 } finally { 309 LOG.info("Done"); 310 sleeping = false; 311 } 312 } 313 314 @Override 315 public Optional<RegionObserver> getRegionObserver() { 316 return Optional.of(this); 317 } 318 319 @Override 320 public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put, 321 WALEdit edit, Durability durability) throws IOException { 322 doSleep(Region.Operation.PUT); 323 RegionObserver.super.prePut(c, put, edit, durability); 324 } 325 326 @Override 327 public void preDelete(ObserverContext<? extends RegionCoprocessorEnvironment> c, Delete delete, 328 WALEdit edit, Durability durability) throws IOException { 329 doSleep(Region.Operation.DELETE); 330 RegionObserver.super.preDelete(c, delete, edit, durability); 331 } 332 333 @Override 334 public Result preAppend(ObserverContext<? extends RegionCoprocessorEnvironment> c, 335 Append append) throws IOException { 336 doSleep(Region.Operation.APPEND); 337 return RegionObserver.super.preAppend(c, append); 338 } 339 340 @Override 341 public Result preIncrement(ObserverContext<? extends RegionCoprocessorEnvironment> c, 342 Increment increment) throws IOException { 343 doSleep(Region.Operation.INCREMENT); 344 return RegionObserver.super.preIncrement(c, increment); 345 } 346 347 } 348 349}