001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.regionserver; 020 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.util.Optional; 026import java.util.concurrent.atomic.AtomicBoolean; 027 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.fs.FileSystem; 030import org.apache.hadoop.fs.Path; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HColumnDescriptor; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.HTableDescriptor; 037import org.apache.hadoop.hbase.NotServingRegionException; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.TableNameTestRule; 040import org.apache.hadoop.hbase.Waiter; 041import org.apache.hadoop.hbase.client.Admin; 042import org.apache.hadoop.hbase.client.Append; 043import org.apache.hadoop.hbase.client.BufferedMutator; 044import org.apache.hadoop.hbase.client.Delete; 045import org.apache.hadoop.hbase.client.Durability; 046import org.apache.hadoop.hbase.client.Increment; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.client.Result; 050import org.apache.hadoop.hbase.client.ResultScanner; 051import org.apache.hadoop.hbase.client.Scan; 052import org.apache.hadoop.hbase.client.Table; 053import org.apache.hadoop.hbase.client.TableDescriptor; 054import org.apache.hadoop.hbase.coprocessor.ObserverContext; 055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 056import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 057import org.apache.hadoop.hbase.coprocessor.RegionObserver; 058import org.apache.hadoop.hbase.exceptions.DeserializationException; 059import org.apache.hadoop.hbase.filter.FilterBase; 060import org.apache.hadoop.hbase.testclassification.LargeTests; 061import org.apache.hadoop.hbase.testclassification.RegionServerTests; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.wal.WAL; 064import org.apache.hadoop.hbase.wal.WALEdit; 065import org.junit.After; 066import org.junit.Before; 067import org.junit.BeforeClass; 068import org.junit.ClassRule; 069import org.junit.Rule; 070import org.junit.Test; 071import org.junit.experimental.categories.Category; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075@Category({RegionServerTests.class, LargeTests.class}) 076public class TestRegionInterrupt { 077 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestRegionInterrupt.class); 081 082 private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 083 private static final Logger LOG = LoggerFactory.getLogger(TestRegionInterrupt.class); 084 085 static final byte[] FAMILY = Bytes.toBytes("info"); 086 087 static long sleepTime; 088 089 @Rule 090 public TableNameTestRule name = new TableNameTestRule(); 091 092 @BeforeClass 093 public static void setUpBeforeClass() throws Exception { 094 Configuration conf = TEST_UTIL.getConfiguration(); 095 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); 096 conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class, Region.class); 097 conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true); 098 // Ensure the sleep interval is long enough for interrupts to occur. 099 long waitInterval = conf.getLong(HRegion.CLOSE_WAIT_INTERVAL, 100 HRegion.DEFAULT_CLOSE_WAIT_INTERVAL); 101 sleepTime = waitInterval * 2; 102 // Try to bound the running time of this unit if expected actions do not take place. 103 conf.setLong(HRegion.CLOSE_WAIT_TIME, sleepTime * 2); 104 } 105 106 @Before 107 public void setUp() throws Exception { 108 TEST_UTIL.startMiniCluster(); 109 } 110 111 @After 112 public void tearDown() throws Exception { 113 TEST_UTIL.shutdownMiniCluster(); 114 } 115 116 @Test 117 public void testCloseInterruptScanning() throws Exception { 118 final TableName tableName = name.getTableName(); 119 LOG.info("Creating table " + tableName); 120 try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { 121 // load some data 122 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 123 TEST_UTIL.loadTable(table, FAMILY); 124 final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false); 125 // scan the table in the background 126 Thread scanner = new Thread(new Runnable() { 127 @Override 128 public void run() { 129 Scan scan = new Scan(); 130 scan.addFamily(FAMILY); 131 scan.setFilter(new DelayingFilter()); 132 try { 133 LOG.info("Starting scan"); 134 try (ResultScanner rs = table.getScanner(scan)) { 135 Result r; 136 do { 137 r = rs.next(); 138 if (r != null) { 139 LOG.info("Scanned row " + Bytes.toStringBinary(r.getRow())); 140 } 141 } while (r != null); 142 } 143 } catch (IOException e) { 144 LOG.info("Scanner caught exception", e); 145 expectedExceptionCaught.set(true); 146 } finally { 147 LOG.info("Finished scan"); 148 } 149 } 150 }); 151 scanner.start(); 152 153 // Wait for the filter to begin sleeping 154 LOG.info("Waiting for scanner to start"); 155 Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate<Exception>() { 156 @Override 157 public boolean evaluate() throws Exception { 158 return DelayingFilter.isSleeping(); 159 } 160 }); 161 162 // Offline the table, this will trigger closing 163 LOG.info("Offlining table " + tableName); 164 TEST_UTIL.getHBaseAdmin().disableTable(tableName); 165 166 // Wait for scanner termination 167 scanner.join(); 168 169 // When we get here the region has closed and the table is offline 170 assertTrue("Region operations were not interrupted", 171 InterruptInterceptingHRegion.wasInterrupted()); 172 assertTrue("Scanner did not catch expected exception", expectedExceptionCaught.get()); 173 } 174 } 175 176 @Test 177 public void testCloseInterruptMutation() throws Exception { 178 final TableName tableName = name.getTableName(); 179 final Admin admin = TEST_UTIL.getAdmin(); 180 // Create the test table 181 HTableDescriptor htd = new HTableDescriptor(tableName); 182 htd.addFamily(new HColumnDescriptor(FAMILY)); 183 htd.addCoprocessor(MutationDelayingCoprocessor.class.getName()); 184 LOG.info("Creating table " + tableName); 185 admin.createTable(htd); 186 TEST_UTIL.waitUntilAllRegionsAssigned(tableName); 187 188 // Insert some data in the background 189 LOG.info("Starting writes to table " + tableName); 190 final int NUM_ROWS = 100; 191 final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false); 192 Thread inserter = new Thread(new Runnable() { 193 @Override 194 public void run() { 195 try (BufferedMutator t = admin.getConnection().getBufferedMutator(tableName)) { 196 for (int i = 0; i < NUM_ROWS; i++) { 197 LOG.info("Writing row " + i + " to " + tableName); 198 byte[] value = new byte[10], row = Bytes.toBytes(Integer.toString(i)); 199 Bytes.random(value); 200 t.mutate(new Put(row).addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, value)); 201 t.flush(); 202 } 203 } catch (IOException e) { 204 LOG.info("Inserter caught exception", e); 205 expectedExceptionCaught.set(true); 206 } 207 } 208 }); 209 inserter.start(); 210 211 // Wait for delayed insertion to begin 212 LOG.info("Waiting for mutations to start"); 213 Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate<Exception>() { 214 @Override 215 public boolean evaluate() throws Exception { 216 return MutationDelayingCoprocessor.isSleeping(); 217 } 218 }); 219 220 // Offline the table, this will trigger closing 221 LOG.info("Offlining table " + tableName); 222 admin.disableTable(tableName); 223 224 // Wait for the inserter to finish 225 inserter.join(); 226 227 // When we get here the region has closed and the table is offline 228 assertTrue("Region operations were not interrupted", 229 InterruptInterceptingHRegion.wasInterrupted()); 230 assertTrue("Inserter did not catch expected exception", expectedExceptionCaught.get()); 231 232 } 233 234 public static class InterruptInterceptingHRegion extends HRegion { 235 236 private static boolean interrupted = false; 237 238 public static boolean wasInterrupted() { 239 return interrupted; 240 } 241 242 public InterruptInterceptingHRegion(Path tableDir, WAL wal, FileSystem fs, 243 Configuration conf, RegionInfo regionInfo, TableDescriptor htd, 244 RegionServerServices rsServices) { 245 super(tableDir, wal, fs, conf, regionInfo, htd, rsServices); 246 } 247 248 public InterruptInterceptingHRegion(HRegionFileSystem fs, WAL wal, Configuration conf, 249 TableDescriptor htd, RegionServerServices rsServices) { 250 super(fs, wal, conf, htd, rsServices); 251 } 252 253 @Override 254 void checkInterrupt() throws NotServingRegionException, InterruptedIOException { 255 try { 256 super.checkInterrupt(); 257 } catch (NotServingRegionException | InterruptedIOException e) { 258 interrupted = true; 259 throw e; 260 } 261 } 262 263 @Override 264 IOException throwOnInterrupt(Throwable t) { 265 interrupted = true; 266 return super.throwOnInterrupt(t); 267 } 268 269 } 270 271 public static class DelayingFilter extends FilterBase { 272 273 static volatile boolean sleeping = false; 274 275 public static boolean isSleeping() { 276 return sleeping; 277 } 278 279 @Override 280 public ReturnCode filterCell(Cell v) throws IOException { 281 LOG.info("Starting sleep on " + v); 282 sleeping = true; 283 try { 284 Thread.sleep(sleepTime); 285 } catch (InterruptedException e) { 286 // restore interrupt status so region scanner can handle it as expected 287 Thread.currentThread().interrupt(); 288 LOG.info("Interrupted during sleep on " + v); 289 } finally { 290 LOG.info("Done sleep on " + v); 291 sleeping = false; 292 } 293 return ReturnCode.INCLUDE; 294 } 295 296 public static DelayingFilter parseFrom(final byte [] pbBytes) 297 throws DeserializationException { 298 // Just return a new instance. 299 return new DelayingFilter(); 300 } 301 302 } 303 304 public static class MutationDelayingCoprocessor implements RegionCoprocessor, RegionObserver { 305 306 static volatile boolean sleeping = false; 307 308 public static boolean isSleeping() { 309 return sleeping; 310 } 311 312 private void doSleep(Region.Operation op) { 313 LOG.info("Starting sleep for " + op); 314 sleeping = true; 315 try { 316 Thread.sleep(sleepTime); 317 } catch (InterruptedException e) { 318 // restore interrupt status so doMiniBatchMutation etc. can handle it as expected 319 Thread.currentThread().interrupt(); 320 LOG.info("Interrupted during " + op); 321 } finally { 322 LOG.info("Done"); 323 sleeping = false; 324 } 325 } 326 327 @Override 328 public Optional<RegionObserver> getRegionObserver() { 329 return Optional.of(this); 330 } 331 332 @Override 333 public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, 334 Durability durability) throws IOException { 335 doSleep(Region.Operation.PUT); 336 RegionObserver.super.prePut(c, put, edit, durability); 337 } 338 339 @Override 340 public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, 341 WALEdit edit, Durability durability) throws IOException { 342 doSleep(Region.Operation.DELETE); 343 RegionObserver.super.preDelete(c, delete, edit, durability); 344 } 345 346 @Override 347 public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append) 348 throws IOException { 349 doSleep(Region.Operation.APPEND); 350 return RegionObserver.super.preAppend(c, append); 351 } 352 353 @Override 354 public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment) 355 throws IOException { 356 doSleep(Region.Operation.INCREMENT); 357 return RegionObserver.super.preIncrement(c, increment); 358 } 359 360 } 361 362}