001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.jupiter.api.Assertions.assertEquals;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.junit.jupiter.api.Assertions.fail;
025
026import java.io.IOException;
027import java.util.ArrayList;
028import java.util.List;
029import java.util.NavigableSet;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicLong;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.DoNotRetryIOException;
036import org.apache.hadoop.hbase.ExtendedCell;
037import org.apache.hadoop.hbase.HBaseTestingUtil;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.TableNameTestExtension;
041import org.apache.hadoop.hbase.exceptions.ScannerResetException;
042import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
043import org.apache.hadoop.hbase.regionserver.HRegion;
044import org.apache.hadoop.hbase.regionserver.HStore;
045import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
046import org.apache.hadoop.hbase.regionserver.RegionServerServices;
047import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner;
048import org.apache.hadoop.hbase.regionserver.ScanInfo;
049import org.apache.hadoop.hbase.regionserver.StoreScanner;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.wal.WAL;
052import org.junit.jupiter.api.AfterAll;
053import org.junit.jupiter.api.Test;
054import org.junit.jupiter.api.extension.RegisterExtension;
055
056public class FromClientSideScanExcpetionTestBase {
057
058  protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
059
060  protected static byte[] FAMILY = Bytes.toBytes("testFamily");
061
062  protected static int SLAVES = 3;
063
064  @RegisterExtension
065  private TableNameTestExtension name = new TableNameTestExtension();
066
067  @AfterAll
068  public static void tearDownAfterClass() throws Exception {
069    TEST_UTIL.shutdownMiniCluster();
070  }
071
072  private static AtomicBoolean ON = new AtomicBoolean(false);
073  private static AtomicLong REQ_COUNT = new AtomicLong(0);
074  private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw
075                                                                           // DNRIOE
076  private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once
077
078  private static void reset() {
079    ON.set(false);
080    REQ_COUNT.set(0);
081    IS_DO_NOT_RETRY.set(false);
082    THROW_ONCE.set(true);
083  }
084
085  private static void inject() {
086    ON.set(true);
087  }
088
089  protected static void startCluster() throws Exception {
090    Configuration conf = TEST_UTIL.getConfiguration();
091    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
092    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000);
093    conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class);
094    conf.setBoolean("hbase.client.log.scanner.activity", true);
095    // We need more than one region server in this test
096    TEST_UTIL.startMiniCluster(SLAVES);
097  }
098
099  public static final class MyHRegion extends HRegion {
100
101    @SuppressWarnings("deprecation")
102    public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
103      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
104      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
105    }
106
107    @Override
108    protected HStore instantiateHStore(ColumnFamilyDescriptor family, boolean warmup)
109      throws IOException {
110      return new MyHStore(this, family, conf, warmup);
111    }
112  }
113
114  public static final class MyHStore extends HStore {
115
116    public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam,
117      boolean warmup) throws IOException {
118      super(region, family, confParam, warmup);
119    }
120
121    @Override
122    protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
123      NavigableSet<byte[]> targetCols, long readPt) throws IOException {
124      return scan.isReversed()
125        ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
126        : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt);
127    }
128  }
129
130  public static final class MyStoreScanner extends StoreScanner {
131    public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
132      long readPt) throws IOException {
133      super(store, scanInfo, scan, columns, readPt);
134    }
135
136    @Override
137    protected List<KeyValueScanner> selectScannersFrom(HStore store,
138      List<? extends KeyValueScanner> allScanners) {
139      List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners);
140      List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
141      for (KeyValueScanner scanner : scanners) {
142        newScanners.add(new DelegatingKeyValueScanner(scanner) {
143          @Override
144          public boolean reseek(ExtendedCell key) throws IOException {
145            if (ON.get()) {
146              REQ_COUNT.incrementAndGet();
147              if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) {
148                if (IS_DO_NOT_RETRY.get()) {
149                  throw new DoNotRetryIOException("Injected exception");
150                } else {
151                  throw new IOException("Injected exception");
152                }
153              }
154            }
155            return super.reseek(key);
156          }
157        });
158      }
159      return newScanners;
160    }
161  }
162
163  /**
164   * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving
165   * the server side RegionScanner to be in dirty state. The client has to ensure that the
166   * ClientScanner does not get an exception and also sees all the data.
167   */
168  @Test
169  public void testClientScannerIsResetWhenScanThrowsIOException()
170    throws IOException, InterruptedException {
171    reset();
172    THROW_ONCE.set(true); // throw exceptions only once
173    TableName tableName = name.getTableName();
174    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
175      int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
176      TEST_UTIL.getAdmin().flush(tableName);
177      inject();
178      int actualRowCount = HBaseTestingUtil.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
179      assertEquals(rowCount, actualRowCount);
180    }
181    assertTrue(REQ_COUNT.get() > 0);
182  }
183
184  /**
185   * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation
186   * is that the exception will bubble up to the client scanner instead of being retried.
187   */
188  @Test
189  public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE()
190    throws IOException, InterruptedException {
191    reset();
192    IS_DO_NOT_RETRY.set(true);
193    TableName tableName = name.getTableName();
194    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
195      TEST_UTIL.loadTable(t, FAMILY, false);
196      TEST_UTIL.getAdmin().flush(tableName);
197      inject();
198      HBaseTestingUtil.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
199      fail("Should have thrown an exception");
200    } catch (DoNotRetryIOException expected) {
201      // expected
202    }
203    assertTrue(REQ_COUNT.get() > 0);
204  }
205
206  /**
207   * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is
208   * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying
209   * indefinitely.
210   */
211  @Test
212  public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE()
213    throws IOException, InterruptedException {
214    TableName tableName = name.getTableName();
215    reset();
216    THROW_ONCE.set(false); // throw exceptions in every retry
217    try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
218      TEST_UTIL.loadTable(t, FAMILY, false);
219      TEST_UTIL.getAdmin().flush(tableName);
220      inject();
221      HBaseTestingUtil.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
222      fail("Should have thrown an exception");
223    } catch (ScannerResetException expected) {
224      // expected
225    } catch (RetriesExhaustedException e) {
226      // expected
227      assertThat(e.getCause(), instanceOf(ScannerResetException.class));
228    }
229    assertTrue(REQ_COUNT.get() >= 3);
230  }
231}