001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.Optional;
025import java.util.concurrent.atomic.AtomicBoolean;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Cell;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseTestingUtility;
032import org.apache.hadoop.hbase.HColumnDescriptor;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.HTableDescriptor;
035import org.apache.hadoop.hbase.NotServingRegionException;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.TableNameTestRule;
038import org.apache.hadoop.hbase.Waiter;
039import org.apache.hadoop.hbase.client.Admin;
040import org.apache.hadoop.hbase.client.Append;
041import org.apache.hadoop.hbase.client.BufferedMutator;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Durability;
044import org.apache.hadoop.hbase.client.Increment;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.ResultScanner;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.Table;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.coprocessor.ObserverContext;
053import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
054import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
055import org.apache.hadoop.hbase.coprocessor.RegionObserver;
056import org.apache.hadoop.hbase.exceptions.DeserializationException;
057import org.apache.hadoop.hbase.filter.FilterBase;
058import org.apache.hadoop.hbase.testclassification.LargeTests;
059import org.apache.hadoop.hbase.testclassification.RegionServerTests;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.wal.WAL;
062import org.apache.hadoop.hbase.wal.WALEdit;
063import org.junit.After;
064import org.junit.Before;
065import org.junit.BeforeClass;
066import org.junit.ClassRule;
067import org.junit.Rule;
068import org.junit.Test;
069import org.junit.experimental.categories.Category;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073@Category({ RegionServerTests.class, LargeTests.class })
074public class TestRegionInterrupt {
075
076  @ClassRule
077  public static final HBaseClassTestRule CLASS_RULE =
078    HBaseClassTestRule.forClass(TestRegionInterrupt.class);
079
080  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
081  private static final Logger LOG = LoggerFactory.getLogger(TestRegionInterrupt.class);
082
083  static final byte[] FAMILY = Bytes.toBytes("info");
084
085  static long sleepTime;
086
087  @Rule
088  public TableNameTestRule name = new TableNameTestRule();
089
090  @BeforeClass
091  public static void setUpBeforeClass() throws Exception {
092    Configuration conf = TEST_UTIL.getConfiguration();
093    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
094    conf.setClass(HConstants.REGION_IMPL, InterruptInterceptingHRegion.class, Region.class);
095    conf.setBoolean(HRegion.CLOSE_WAIT_ABORT, true);
096    // Ensure the sleep interval is long enough for interrupts to occur.
097    long waitInterval =
098      conf.getLong(HRegion.CLOSE_WAIT_INTERVAL, HRegion.DEFAULT_CLOSE_WAIT_INTERVAL);
099    sleepTime = waitInterval * 2;
100    // Try to bound the running time of this unit if expected actions do not take place.
101    conf.setLong(HRegion.CLOSE_WAIT_TIME, sleepTime * 2);
102  }
103
104  @Before
105  public void setUp() throws Exception {
106    TEST_UTIL.startMiniCluster();
107  }
108
109  @After
110  public void tearDown() throws Exception {
111    TEST_UTIL.shutdownMiniCluster();
112  }
113
114  @Test
115  public void testCloseInterruptScanning() throws Exception {
116    final TableName tableName = name.getTableName();
117    LOG.info("Creating table " + tableName);
118    try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
119      // load some data
120      TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
121      TEST_UTIL.loadTable(table, FAMILY);
122      final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
123      // scan the table in the background
124      Thread scanner = new Thread(new Runnable() {
125        @Override
126        public void run() {
127          Scan scan = new Scan();
128          scan.addFamily(FAMILY);
129          scan.setFilter(new DelayingFilter());
130          try {
131            LOG.info("Starting scan");
132            try (ResultScanner rs = table.getScanner(scan)) {
133              Result r;
134              do {
135                r = rs.next();
136                if (r != null) {
137                  LOG.info("Scanned row " + Bytes.toStringBinary(r.getRow()));
138                }
139              } while (r != null);
140            }
141          } catch (IOException e) {
142            LOG.info("Scanner caught exception", e);
143            expectedExceptionCaught.set(true);
144          } finally {
145            LOG.info("Finished scan");
146          }
147        }
148      });
149      scanner.start();
150
151      // Wait for the filter to begin sleeping
152      LOG.info("Waiting for scanner to start");
153      Waiter.waitFor(TEST_UTIL.getConfiguration(), 10 * 1000, new Waiter.Predicate<Exception>() {
154        @Override
155        public boolean evaluate() throws Exception {
156          return DelayingFilter.isSleeping();
157        }
158      });
159
160      // Offline the table, this will trigger closing
161      LOG.info("Offlining table " + tableName);
162      TEST_UTIL.getHBaseAdmin().disableTable(tableName);
163
164      // Wait for scanner termination
165      scanner.join();
166
167      // When we get here the region has closed and the table is offline
168      assertTrue("Region operations were not interrupted",
169        InterruptInterceptingHRegion.wasInterrupted());
170      assertTrue("Scanner did not catch expected exception", expectedExceptionCaught.get());
171    }
172  }
173
174  @Test
175  public void testCloseInterruptMutation() throws Exception {
176    final TableName tableName = name.getTableName();
177    final Admin admin = TEST_UTIL.getAdmin();
178    // Create the test table
179    HTableDescriptor htd = new HTableDescriptor(tableName);
180    htd.addFamily(new HColumnDescriptor(FAMILY));
181    htd.addCoprocessor(MutationDelayingCoprocessor.class.getName());
182    LOG.info("Creating table " + tableName);
183    admin.createTable(htd);
184    TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
185
186    // Insert some data in the background
187    LOG.info("Starting writes to table " + tableName);
188    final int NUM_ROWS = 100;
189    final AtomicBoolean expectedExceptionCaught = new AtomicBoolean(false);
190    Thread inserter = new Thread(new Runnable() {
191      @Override
192      public void run() {
193        try (BufferedMutator t = admin.getConnection().getBufferedMutator(tableName)) {
194          for (int i = 0; i < NUM_ROWS; i++) {
195            LOG.info("Writing row " + i + " to " + tableName);
196            byte[] value = new byte[10], row = Bytes.toBytes(Integer.toString(i));
197            Bytes.random(value);
198            t.mutate(new Put(row).addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, value));
199            t.flush();
200          }
201        } catch (IOException e) {
202          LOG.info("Inserter caught exception", e);
203          expectedExceptionCaught.set(true);
204        }
205      }
206    });
207    inserter.start();
208
209    // Wait for delayed insertion to begin
210    LOG.info("Waiting for mutations to start");
211    Waiter.waitFor(TEST_UTIL.getConfiguration(), 10 * 1000, new Waiter.Predicate<Exception>() {
212      @Override
213      public boolean evaluate() throws Exception {
214        return MutationDelayingCoprocessor.isSleeping();
215      }
216    });
217
218    // Offline the table, this will trigger closing
219    LOG.info("Offlining table " + tableName);
220    admin.disableTable(tableName);
221
222    // Wait for the inserter to finish
223    inserter.join();
224
225    // When we get here the region has closed and the table is offline
226    assertTrue("Region operations were not interrupted",
227      InterruptInterceptingHRegion.wasInterrupted());
228    assertTrue("Inserter did not catch expected exception", expectedExceptionCaught.get());
229
230  }
231
232  public static class InterruptInterceptingHRegion extends HRegion {
233
234    private static boolean interrupted = false;
235
236    public static boolean wasInterrupted() {
237      return interrupted;
238    }
239
240    public InterruptInterceptingHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration conf,
241      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
242      super(tableDir, wal, fs, conf, regionInfo, htd, rsServices);
243    }
244
245    public InterruptInterceptingHRegion(HRegionFileSystem fs, WAL wal, Configuration conf,
246      TableDescriptor htd, RegionServerServices rsServices) {
247      super(fs, wal, conf, htd, rsServices);
248    }
249
250    @Override
251    void checkInterrupt() throws NotServingRegionException, InterruptedIOException {
252      try {
253        super.checkInterrupt();
254      } catch (NotServingRegionException | InterruptedIOException e) {
255        interrupted = true;
256        throw e;
257      }
258    }
259
260    @Override
261    IOException throwOnInterrupt(Throwable t) {
262      interrupted = true;
263      return super.throwOnInterrupt(t);
264    }
265
266  }
267
268  public static class DelayingFilter extends FilterBase {
269
270    static volatile boolean sleeping = false;
271
272    public static boolean isSleeping() {
273      return sleeping;
274    }
275
276    @Override
277    public ReturnCode filterCell(Cell v) throws IOException {
278      LOG.info("Starting sleep on " + v);
279      sleeping = true;
280      try {
281        Thread.sleep(sleepTime);
282      } catch (InterruptedException e) {
283        // restore interrupt status so region scanner can handle it as expected
284        Thread.currentThread().interrupt();
285        LOG.info("Interrupted during sleep on " + v);
286      } finally {
287        LOG.info("Done sleep on " + v);
288        sleeping = false;
289      }
290      return ReturnCode.INCLUDE;
291    }
292
293    public static DelayingFilter parseFrom(final byte[] pbBytes) throws DeserializationException {
294      // Just return a new instance.
295      return new DelayingFilter();
296    }
297
298  }
299
300  public static class MutationDelayingCoprocessor implements RegionCoprocessor, RegionObserver {
301
302    static volatile boolean sleeping = false;
303
304    public static boolean isSleeping() {
305      return sleeping;
306    }
307
308    private void doSleep(Region.Operation op) {
309      LOG.info("Starting sleep for " + op);
310      sleeping = true;
311      try {
312        Thread.sleep(sleepTime);
313      } catch (InterruptedException e) {
314        // restore interrupt status so doMiniBatchMutation etc. can handle it as expected
315        Thread.currentThread().interrupt();
316        LOG.info("Interrupted during " + op);
317      } finally {
318        LOG.info("Done");
319        sleeping = false;
320      }
321    }
322
323    @Override
324    public Optional<RegionObserver> getRegionObserver() {
325      return Optional.of(this);
326    }
327
328    @Override
329    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
330      Durability durability) throws IOException {
331      doSleep(Region.Operation.PUT);
332      RegionObserver.super.prePut(c, put, edit, durability);
333    }
334
335    @Override
336    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
337      WALEdit edit, Durability durability) throws IOException {
338      doSleep(Region.Operation.DELETE);
339      RegionObserver.super.preDelete(c, delete, edit, durability);
340    }
341
342    @Override
343    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
344      throws IOException {
345      doSleep(Region.Operation.APPEND);
346      return RegionObserver.super.preAppend(c, append);
347    }
348
349    @Override
350    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
351      throws IOException {
352      doSleep(Region.Operation.INCREMENT);
353      return RegionObserver.super.preIncrement(c, increment);
354    }
355
356  }
357
358}