001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.NavigableSet;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicLong;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.DoNotRetryIOException;
036import org.apache.hadoop.hbase.ExtendedCell;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtil;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.exceptions.ScannerResetException;
042import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
043import org.apache.hadoop.hbase.regionserver.HRegion;
044import org.apache.hadoop.hbase.regionserver.HStore;
045import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
046import org.apache.hadoop.hbase.regionserver.RegionServerServices;
047import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner;
048import org.apache.hadoop.hbase.regionserver.ScanInfo;
049import org.apache.hadoop.hbase.regionserver.StoreScanner;
050import org.apache.hadoop.hbase.testclassification.ClientTests;
051import org.apache.hadoop.hbase.testclassification.MediumTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.wal.WAL;
054import org.junit.AfterClass;
055import org.junit.BeforeClass;
056import org.junit.ClassRule;
057import org.junit.Rule;
058import org.junit.Test;
059import org.junit.experimental.categories.Category;
060import org.junit.rules.TestName;
061
062@Category({ MediumTests.class, ClientTests.class })
063public class TestFromClientSideScanExcpetion {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067    HBaseClassTestRule.forClass(TestFromClientSideScanExcpetion.class);
068
069  protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
070
071  private static byte[] FAMILY = Bytes.toBytes("testFamily");
072
073  private static int SLAVES = 3;
074
075  @Rule
076  public TestName name = new TestName();
077
078  @BeforeClass
079  public static void setUpBeforeClass() throws Exception {
080    Configuration conf = TEST_UTIL.getConfiguration();
081    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
082    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000);
083    conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class);
084    conf.setBoolean("hbase.client.log.scanner.activity", true);
085    // We need more than one region server in this test
086    TEST_UTIL.startMiniCluster(SLAVES);
087  }
088
089  @AfterClass
090  public static void tearDownAfterClass() throws Exception {
091    TEST_UTIL.shutdownMiniCluster();
092  }
093
094  private static AtomicBoolean ON = new AtomicBoolean(false);
095  private static AtomicLong REQ_COUNT = new AtomicLong(0);
096  private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw
097                                                                           // DNRIOE
098  private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once
099
100  private static void reset() {
101    ON.set(false);
102    REQ_COUNT.set(0);
103    IS_DO_NOT_RETRY.set(false);
104    THROW_ONCE.set(true);
105  }
106
107  private static void inject() {
108    ON.set(true);
109  }
110
111  public static final class MyHRegion extends HRegion {
112
113    @SuppressWarnings("deprecation")
114    public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
115      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
116      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
117    }
118
119    @Override
120    protected HStore instantiateHStore(ColumnFamilyDescriptor family, boolean warmup)
121      throws IOException {
122      return new MyHStore(this, family, conf, warmup);
123    }
124  }
125
126  public static final class MyHStore extends HStore {
127
128    public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam,
129      boolean warmup) throws IOException {
130      super(region, family, confParam, warmup);
131    }
132
133    @Override
134    protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
135      NavigableSet<byte[]> targetCols, long readPt) throws IOException {
136      return scan.isReversed()
137        ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
138        : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt);
139    }
140  }
141
142  public static final class MyStoreScanner extends StoreScanner {
143    public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
144      long readPt) throws IOException {
145      super(store, scanInfo, scan, columns, readPt);
146    }
147
148    @Override
149    protected List<KeyValueScanner> selectScannersFrom(HStore store,
150      List<? extends KeyValueScanner> allScanners) {
151      List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners);
152      List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
153      for (KeyValueScanner scanner : scanners) {
154        newScanners.add(new DelegatingKeyValueScanner(scanner) {
155          @Override
156          public boolean reseek(ExtendedCell key) throws IOException {
157            if (ON.get()) {
158              REQ_COUNT.incrementAndGet();
159              if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) {
160                if (IS_DO_NOT_RETRY.get()) {
161                  throw new DoNotRetryIOException("Injected exception");
162                } else {
163                  throw new IOException("Injected exception");
164                }
165              }
166            }
167            return super.reseek(key);
168          }
169        });
170      }
171      return newScanners;
172    }
173  }
174
175  /**
176   * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving
177   * the server side RegionScanner to be in dirty state. The client has to ensure that the
178   * ClientScanner does not get an exception and also sees all the data.
179   */
180  @Test
181  public void testClientScannerIsResetWhenScanThrowsIOException()
182    throws IOException, InterruptedException {
183    reset();
184    THROW_ONCE.set(true); // throw exceptions only once
185    TableName tableName = TableName.valueOf(name.getMethodName());
186    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
187      int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
188      TEST_UTIL.getAdmin().flush(tableName);
189      inject();
190      int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
191      assertEquals(rowCount, actualRowCount);
192    }
193    assertTrue(REQ_COUNT.get() > 0);
194  }
195
196  /**
197   * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation
198   * is that the exception will bubble up to the client scanner instead of being retried.
199   */
200  @Test
201  public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE()
202    throws IOException, InterruptedException {
203    reset();
204    IS_DO_NOT_RETRY.set(true);
205    TableName tableName = TableName.valueOf(name.getMethodName());
206    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
207      TEST_UTIL.loadTable(t, FAMILY, false);
208      TEST_UTIL.getAdmin().flush(tableName);
209      inject();
210      TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
211      fail("Should have thrown an exception");
212    } catch (DoNotRetryIOException expected) {
213      // expected
214    }
215    assertTrue(REQ_COUNT.get() > 0);
216  }
217
218  /**
219   * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is
220   * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying
221   * indefinitely.
222   */
223  @Test
224  public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE()
225    throws IOException, InterruptedException {
226    TableName tableName = TableName.valueOf(name.getMethodName());
227    reset();
228    THROW_ONCE.set(false); // throw exceptions in every retry
229    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
230      TEST_UTIL.loadTable(t, FAMILY, false);
231      TEST_UTIL.getAdmin().flush(tableName);
232      inject();
233      TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
234      fail("Should have thrown an exception");
235    } catch (ScannerResetException expected) {
236      // expected
237    } catch (RetriesExhaustedException e) {
238      // expected
239      assertThat(e.getCause(), instanceOf(ScannerResetException.class));
240    }
241    assertTrue(REQ_COUNT.get() >= 3);
242  }
243}