001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.regionserver;
020
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.io.InterruptedIOException;
025import java.util.Optional;
026import java.util.concurrent.atomic.AtomicBoolean;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HColumnDescriptor;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.HTableDescriptor;
037import org.apache.hadoop.hbase.NotServingRegionException;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.TableNameTestRule;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.client.Admin;
042import org.apache.hadoop.hbase.client.Append;
043import org.apache.hadoop.hbase.client.BufferedMutator;
044import org.apache.hadoop.hbase.client.Delete;
045import org.apache.hadoop.hbase.client.Durability;
046import org.apache.hadoop.hbase.client.Increment;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.Result;
050import org.apache.hadoop.hbase.client.ResultScanner;
051import org.apache.hadoop.hbase.client.Scan;
052import org.apache.hadoop.hbase.client.Table;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.coprocessor.ObserverContext;
055import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
056import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
057import org.apache.hadoop.hbase.coprocessor.RegionObserver;
058import org.apache.hadoop.hbase.exceptions.DeserializationException;
059import org.apache.hadoop.hbase.filter.FilterBase;
060import org.apache.hadoop.hbase.testclassification.LargeTests;
061import org.apache.hadoop.hbase.testclassification.RegionServerTests;
062import org.apache.hadoop.hbase.util.Bytes;
063import org.apache.hadoop.hbase.wal.WAL;
064import org.apache.hadoop.hbase.wal.WALEdit;
065import org.junit.After;
066import org.junit.Before;
067import org.junit.BeforeClass;
068import org.junit.ClassRule;
069import org.junit.Rule;
070import org.junit.Test;
071import org.junit.experimental.categories.Category;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075@Category({RegionServerTests.class, LargeTests.class})
076public class TestRegionInterrupt {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080    HBaseClassTestRule.forClass(TestRegionInterrupt.class);
081
082  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
083  private static final Logger LOG = LoggerFactory.getLogger(TestRegionInterrupt.class);
084
085  static final byte[] FAMILY = Bytes.toBytes("info");
086
087  static long sleepTime;
088
089  @Rule
090  public TableNameTestRule name = new TableNameTestRule();
091
092  @BeforeClass
093  public static void setUpBeforeClass() throws Exception {
094    Configuration conf = TEST_UTIL.getConfiguration();
095    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
096    conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class, Region.class);
097    conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
098    // Ensure the sleep interval is long enough for interrupts to occur.
099    long waitInterval = conf.getLong(HRegion.CLOSE_WAIT_INTERVAL,
100      HRegion.DEFAULT_CLOSE_WAIT_INTERVAL);
101    sleepTime = waitInterval * 2;
102    // Try to bound the running time of this unit if expected actions do not take place.
103    conf.setLong(HRegion.CLOSE_WAIT_TIME, sleepTime * 2);
104  }
105
106  @Before
107  public void setUp() throws Exception {
108    TEST_UTIL.startMiniCluster();
109  }
110
111  @After
112  public void tearDown() throws Exception {
113    TEST_UTIL.shutdownMiniCluster();
114  }
115
116  @Test
117  public void testCloseInterruptScanning() throws Exception {
118    final TableName tableName = name.getTableName();
119    LOG.info("Creating table " + tableName);
120    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
121      // load some data
122      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
123      TEST_UTIL.loadTable(table, FAMILY);
124      final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
125      // scan the table in the background
126      Thread scanner = new Thread(new Runnable() {
127        @Override
128        public void run() {
129          Scan scan = new Scan();
130          scan.addFamily(FAMILY);
131          scan.setFilter(new DelayingFilter());
132          try {
133            LOG.info("Starting scan");
134            try (ResultScanner rs = table.getScanner(scan)) {
135              Result r;
136              do {
137                r = rs.next();
138                if (r != null) {
139                  LOG.info("Scanned row " + Bytes.toStringBinary(r.getRow()));
140                }
141              } while (r != null);
142            }
143          } catch (IOException e) {
144            LOG.info("Scanner caught exception", e);
145            expectedExceptionCaught.set(true);
146          } finally {
147            LOG.info("Finished scan");
148          }
149        }
150      });
151      scanner.start();
152
153      // Wait for the filter to begin sleeping
154      LOG.info("Waiting for scanner to start");
155      Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate<Exception>() {
156        @Override
157        public boolean evaluate() throws Exception {
158          return DelayingFilter.isSleeping();
159        }
160      });
161
162      // Offline the table, this will trigger closing
163      LOG.info("Offlining table " + tableName);
164      TEST_UTIL.getHBaseAdmin().disableTable(tableName);
165
166      // Wait for scanner termination
167      scanner.join();
168
169      // When we get here the region has closed and the table is offline
170      assertTrue("Region operations were not interrupted",
171        InterruptInterceptingHRegion.wasInterrupted());
172      assertTrue("Scanner did not catch expected exception", expectedExceptionCaught.get());
173    }
174  }
175
176  @Test
177  public void testCloseInterruptMutation() throws Exception {
178    final TableName tableName = name.getTableName();
179    final Admin admin = TEST_UTIL.getAdmin();
180    // Create the test table
181    HTableDescriptor htd = new HTableDescriptor(tableName);
182    htd.addFamily(new HColumnDescriptor(FAMILY));
183    htd.addCoprocessor(MutationDelayingCoprocessor.class.getName());
184    LOG.info("Creating table " + tableName);
185    admin.createTable(htd);
186    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
187
188    // Insert some data in the background
189    LOG.info("Starting writes to table " + tableName);
190    final int NUM_ROWS = 100;
191    final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
192    Thread inserter = new Thread(new Runnable() {
193      @Override
194      public void run() {
195        try (BufferedMutator t = admin.getConnection().getBufferedMutator(tableName)) {
196          for (int i = 0; i < NUM_ROWS; i++) {
197            LOG.info("Writing row " + i + " to " + tableName);
198            byte[] value = new byte[10], row = Bytes.toBytes(Integer.toString(i));
199            Bytes.random(value);
200            t.mutate(new Put(row).addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, value));
201            t.flush();
202          }
203        } catch (IOException e) {
204          LOG.info("Inserter caught exception", e);
205          expectedExceptionCaught.set(true);
206        }
207      }
208    });
209    inserter.start();
210
211    // Wait for delayed insertion to begin
212    LOG.info("Waiting for mutations to start");
213    Waiter.waitFor(TEST_UTIL.getConfiguration(), 10*1000, new Waiter.Predicate<Exception>() {
214      @Override
215      public boolean evaluate() throws Exception {
216        return MutationDelayingCoprocessor.isSleeping();
217      }
218    });
219
220    // Offline the table, this will trigger closing
221    LOG.info("Offlining table " + tableName);
222    admin.disableTable(tableName);
223
224    // Wait for the inserter to finish
225    inserter.join();
226
227    // When we get here the region has closed and the table is offline
228    assertTrue("Region operations were not interrupted",
229      InterruptInterceptingHRegion.wasInterrupted());
230    assertTrue("Inserter did not catch expected exception", expectedExceptionCaught.get());
231
232  }
233
234  public static class InterruptInterceptingHRegion extends HRegion {
235
236    private static boolean interrupted = false;
237
238    public static boolean wasInterrupted() {
239      return interrupted;
240    }
241
242    public InterruptInterceptingHRegion(Path tableDir, WAL wal, FileSystem fs,
243        Configuration conf, RegionInfo regionInfo, TableDescriptor htd,
244        RegionServerServices rsServices) {
245      super(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
246    }
247
248    public InterruptInterceptingHRegion(HRegionFileSystem fs, WAL wal, Configuration conf,
249        TableDescriptor htd, RegionServerServices rsServices) {
250      super(fs, wal, conf, htd, rsServices);
251    }
252
253    @Override
254    void checkInterrupt() throws NotServingRegionException, InterruptedIOException {
255      try {
256        super.checkInterrupt();
257      } catch (NotServingRegionException | InterruptedIOException e) {
258        interrupted = true;
259        throw e;
260      }
261    }
262
263    @Override
264    IOException throwOnInterrupt(Throwable t) {
265      interrupted = true;
266      return super.throwOnInterrupt(t);
267    }
268
269  }
270
271  public static class DelayingFilter extends FilterBase {
272
273    static volatile boolean sleeping = false;
274
275    public static boolean isSleeping() {
276      return sleeping;
277    }
278
279    @Override
280    public ReturnCode filterCell(Cell v) throws IOException {
281      LOG.info("Starting sleep on " + v);
282      sleeping = true;
283      try {
284        Thread.sleep(sleepTime);
285      } catch (InterruptedException e) {
286        // restore interrupt status so region scanner can handle it as expected
287        Thread.currentThread().interrupt();
288        LOG.info("Interrupted during sleep on " + v);
289      } finally {
290        LOG.info("Done sleep on " + v);
291        sleeping = false;
292      }
293      return ReturnCode.INCLUDE;
294    }
295
296    public static DelayingFilter parseFrom(final byte [] pbBytes)
297        throws DeserializationException {
298      // Just return a new instance.
299      return new DelayingFilter();
300    }
301
302  }
303
304  public static class MutationDelayingCoprocessor implements RegionCoprocessor, RegionObserver {
305
306    static volatile boolean sleeping = false;
307
308    public static boolean isSleeping() {
309      return sleeping;
310    }
311
312    private void doSleep(Region.Operation op) {
313      LOG.info("Starting sleep for " + op);
314      sleeping = true;
315      try {
316        Thread.sleep(sleepTime);
317      } catch (InterruptedException e) {
318        // restore interrupt status so doMiniBatchMutation etc. can handle it as expected
319        Thread.currentThread().interrupt();
320        LOG.info("Interrupted during " + op);
321      } finally {
322        LOG.info("Done");
323        sleeping = false;
324      }
325    }
326
327    @Override
328    public Optional<RegionObserver> getRegionObserver() {
329      return Optional.of(this);
330    }
331
332    @Override
333    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
334        Durability durability) throws IOException {
335      doSleep(Region.Operation.PUT);
336      RegionObserver.super.prePut(c, put, edit, durability);
337    }
338
339    @Override
340    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
341        WALEdit edit, Durability durability) throws IOException {
342      doSleep(Region.Operation.DELETE);
343      RegionObserver.super.preDelete(c, delete, edit, durability);
344    }
345
346    @Override
347    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
348        throws IOException {
349      doSleep(Region.Operation.APPEND);
350      return RegionObserver.super.preAppend(c, append);
351    }
352
353    @Override
354    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
355        throws IOException {
356      doSleep(Region.Operation.INCREMENT);
357      return RegionObserver.super.preIncrement(c, increment);
358    }
359
360  }
361
362}