001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertTrue;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicBoolean;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.NotServingRegionException;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.Waiter;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.Append;
037import org.apache.hadoop.hbase.client.BufferedMutator;
038import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
039import org.apache.hadoop.hbase.client.Delete;
040import org.apache.hadoop.hbase.client.Durability;
041import org.apache.hadoop.hbase.client.Increment;
042import org.apache.hadoop.hbase.client.Put;
043import org.apache.hadoop.hbase.client.RegionInfo;
044import org.apache.hadoop.hbase.client.Result;
045import org.apache.hadoop.hbase.client.ResultScanner;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.coprocessor.ObserverContext;
051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
053import org.apache.hadoop.hbase.coprocessor.RegionObserver;
054import org.apache.hadoop.hbase.exceptions.DeserializationException;
055import org.apache.hadoop.hbase.filter.FilterBase;
056import org.apache.hadoop.hbase.testclassification.LargeTests;
057import org.apache.hadoop.hbase.testclassification.RegionServerTests;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.wal.WAL;
060import org.apache.hadoop.hbase.wal.WALEdit;
061import org.junit.jupiter.api.AfterEach;
062import org.junit.jupiter.api.BeforeAll;
063import org.junit.jupiter.api.BeforeEach;
064import org.junit.jupiter.api.Tag;
065import org.junit.jupiter.api.Test;
066import org.junit.jupiter.api.TestInfo;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070@Tag(RegionServerTests.TAG)
071@Tag(LargeTests.TAG)
072public class TestRegionInterrupt {
073
074  private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
075  private static final Logger LOG = LoggerFactory.getLogger(TestRegionInterrupt.class);
076
077  static final byte[] FAMILY = Bytes.toBytes("info");
078
079  static long sleepTime;
080
081  @BeforeAll
082  public static void setUpBeforeClass() throws Exception {
083    Configuration conf = TEST_UTIL.getConfiguration();
084    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
085    conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class, Region.class);
086    conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
087    // Ensure the sleep interval is long enough for interrupts to occur.
088    long waitInterval =
089      conf.getLong(HRegion.CLOSE_WAIT_INTERVAL, HRegion.DEFAULT_CLOSE_WAIT_INTERVAL);
090    sleepTime = waitInterval * 2;
091    // Try to bound the running time of this unit if expected actions do not take place.
092    conf.setLong(HRegion.CLOSE_WAIT_TIME, sleepTime * 2);
093  }
094
095  @BeforeEach
096  public void setUp() throws Exception {
097    TEST_UTIL.startMiniCluster();
098  }
099
100  @AfterEach
101  public void tearDown() throws Exception {
102    TEST_UTIL.shutdownMiniCluster();
103  }
104
105  @Test
106  public void testCloseInterruptScanning(TestInfo testInfo) throws Exception {
107    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
108    LOG.info("Creating table " + tableName);
109    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
110      // load some data
111      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
112      TEST_UTIL.loadTable(table, FAMILY);
113      final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
114      // scan the table in the background
115      Thread scanner = new Thread(new Runnable() {
116        @Override
117        public void run() {
118          Scan scan = new Scan();
119          scan.addFamily(FAMILY);
120          scan.setFilter(new DelayingFilter());
121          try {
122            LOG.info("Starting scan");
123            try (ResultScanner rs = table.getScanner(scan)) {
124              Result r;
125              do {
126                r = rs.next();
127                if (r != null) {
128                  LOG.info("Scanned row " + Bytes.toStringBinary(r.getRow()));
129                }
130              } while (r != null);
131            }
132          } catch (IOException e) {
133            LOG.info("Scanner caught exception", e);
134            expectedExceptionCaught.set(true);
135          } finally {
136            LOG.info("Finished scan");
137          }
138        }
139      });
140      scanner.start();
141
142      // Wait for the filter to begin sleeping
143      LOG.info("Waiting for scanner to start");
144      Waiter.waitFor(TEST_UTIL.getConfiguration(), 10 * 1000, new Waiter.Predicate<Exception>() {
145        @Override
146        public boolean evaluate() throws Exception {
147          return DelayingFilter.isSleeping();
148        }
149      });
150
151      // Offline the table, this will trigger closing
152      LOG.info("Offlining table " + tableName);
153      TEST_UTIL.getAdmin().disableTable(tableName);
154
155      // Wait for scanner termination
156      scanner.join();
157
158      // When we get here the region has closed and the table is offline
159      assertTrue(InterruptInterceptingHRegion.wasInterrupted(),
160        "Region operations were not interrupted");
161      assertTrue(expectedExceptionCaught.get(), "Scanner did not catch expected exception");
162    }
163  }
164
165  @Test
166  public void testCloseInterruptMutation(TestInfo testInfo) throws Exception {
167    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
168    final Admin admin = TEST_UTIL.getAdmin();
169    // Create the test table
170    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
171      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
172      .setCoprocessor(MutationDelayingCoprocessor.class.getName()).build();
173    LOG.info("Creating table " + tableName);
174    admin.createTable(htd);
175    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
176
177    // Insert some data in the background
178    LOG.info("Starting writes to table " + tableName);
179    final int NUM_ROWS = 100;
180    final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
181    Thread inserter = new Thread(new Runnable() {
182      @Override
183      public void run() {
184        try (BufferedMutator t = admin.getConnection().getBufferedMutator(tableName)) {
185          for (int i = 0; i < NUM_ROWS; i++) {
186            LOG.info("Writing row " + i + " to " + tableName);
187            byte[] value = new byte[10], row = Bytes.toBytes(Integer.toString(i));
188            Bytes.random(value);
189            t.mutate(new Put(row).addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, value));
190            t.flush();
191          }
192        } catch (IOException e) {
193          LOG.info("Inserter caught exception", e);
194          expectedExceptionCaught.set(true);
195        }
196      }
197    });
198    inserter.start();
199
200    // Wait for delayed insertion to begin
201    LOG.info("Waiting for mutations to start");
202    Waiter.waitFor(TEST_UTIL.getConfiguration(), 10 * 1000, new Waiter.Predicate<Exception>() {
203      @Override
204      public boolean evaluate() throws Exception {
205        return MutationDelayingCoprocessor.isSleeping();
206      }
207    });
208
209    // Offline the table, this will trigger closing
210    LOG.info("Offlining table " + tableName);
211    admin.disableTable(tableName);
212
213    // Wait for the inserter to finish
214    inserter.join();
215
216    // When we get here the region has closed and the table is offline
217    assertTrue(InterruptInterceptingHRegion.wasInterrupted(),
218      "Region operations were not interrupted");
219    assertTrue(expectedExceptionCaught.get(), "Inserter did not catch expected exception");
220
221  }
222
223  public static class InterruptInterceptingHRegion extends HRegion {
224
225    private static boolean interrupted = false;
226
227    public static boolean wasInterrupted() {
228      return interrupted;
229    }
230
231    public InterruptInterceptingHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration conf,
232      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
233      super(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
234    }
235
236    public InterruptInterceptingHRegion(HRegionFileSystem fs, WAL wal, Configuration conf,
237      TableDescriptor htd, RegionServerServices rsServices) {
238      super(fs, wal, conf, htd, rsServices);
239    }
240
241    @Override
242    void checkInterrupt() throws NotServingRegionException, InterruptedIOException {
243      try {
244        super.checkInterrupt();
245      } catch (NotServingRegionException | InterruptedIOException e) {
246        interrupted = true;
247        throw e;
248      }
249    }
250
251    @Override
252    IOException throwOnInterrupt(Throwable t) {
253      interrupted = true;
254      return super.throwOnInterrupt(t);
255    }
256
257  }
258
259  public static class DelayingFilter extends FilterBase {
260
261    static volatile boolean sleeping = false;
262
263    public static boolean isSleeping() {
264      return sleeping;
265    }
266
267    @Override
268    public ReturnCode filterCell(Cell v) throws IOException {
269      LOG.info("Starting sleep on " + v);
270      sleeping = true;
271      try {
272        Thread.sleep(sleepTime);
273      } catch (InterruptedException e) {
274        // restore interrupt status so region scanner can handle it as expected
275        Thread.currentThread().interrupt();
276        LOG.info("Interrupted during sleep on " + v);
277      } finally {
278        LOG.info("Done sleep on " + v);
279        sleeping = false;
280      }
281      return ReturnCode.INCLUDE;
282    }
283
284    public static DelayingFilter parseFrom(final byte[] pbBytes) throws DeserializationException {
285      // Just return a new instance.
286      return new DelayingFilter();
287    }
288
289  }
290
291  public static class MutationDelayingCoprocessor implements RegionCoprocessor, RegionObserver {
292
293    static volatile boolean sleeping = false;
294
295    public static boolean isSleeping() {
296      return sleeping;
297    }
298
299    private void doSleep(Region.Operation op) {
300      LOG.info("Starting sleep for " + op);
301      sleeping = true;
302      try {
303        Thread.sleep(sleepTime);
304      } catch (InterruptedException e) {
305        // restore interrupt status so doMiniBatchMutation etc. can handle it as expected
306        Thread.currentThread().interrupt();
307        LOG.info("Interrupted during " + op);
308      } finally {
309        LOG.info("Done");
310        sleeping = false;
311      }
312    }
313
314    @Override
315    public Optional<RegionObserver> getRegionObserver() {
316      return Optional.of(this);
317    }
318
319    @Override
320    public void prePut(ObserverContext<? extends RegionCoprocessorEnvironment> c, Put put,
321      WALEdit edit, Durability durability) throws IOException {
322      doSleep(Region.Operation.PUT);
323      RegionObserver.super.prePut(c, put, edit, durability);
324    }
325
326    @Override
327    public void preDelete(ObserverContext<? extends RegionCoprocessorEnvironment> c, Delete delete,
328      WALEdit edit, Durability durability) throws IOException {
329      doSleep(Region.Operation.DELETE);
330      RegionObserver.super.preDelete(c, delete, edit, durability);
331    }
332
333    @Override
334    public Result preAppend(ObserverContext<? extends RegionCoprocessorEnvironment> c,
335      Append append) throws IOException {
336      doSleep(Region.Operation.APPEND);
337      return RegionObserver.super.preAppend(c, append);
338    }
339
340    @Override
341    public Result preIncrement(ObserverContext<? extends RegionCoprocessorEnvironment> c,
342      Increment increment) throws IOException {
343      doSleep(Region.Operation.INCREMENT);
344      return RegionObserver.super.preIncrement(c, increment);
345    }
346
347  }
348
349}