001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.hamcrest.MatcherAssert.assertThat;
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.coprocessor.ObserverContext;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
037import org.apache.hadoop.hbase.coprocessor.RegionObserver;
038import org.apache.hadoop.hbase.exceptions.ScannerResetException;
039import org.apache.hadoop.hbase.regionserver.InternalScanner;
040import org.apache.hadoop.hbase.testclassification.ClientTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.junit.AfterClass;
044import org.junit.Before;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
051
052@Category({ MediumTests.class, ClientTests.class })
053public class TestAsyncTableScanException {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestAsyncTableScanException.class);
058
059  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
060
061  private static TableName TABLE_NAME = TableName.valueOf("scan");
062
063  private static byte[] FAMILY = Bytes.toBytes("family");
064
065  private static byte[] QUAL = Bytes.toBytes("qual");
066
067  private static AsyncConnection CONN;
068
069  private static AtomicInteger REQ_COUNT = new AtomicInteger();
070
071  private static volatile int ERROR_AT;
072
073  private static volatile boolean ERROR;
074
075  private static volatile boolean DO_NOT_RETRY;
076
077  private static final int ROW_COUNT = 100;
078
079  public static final class ErrorCP implements RegionObserver, RegionCoprocessor {
080
081    @Override
082    public Optional<RegionObserver> getRegionObserver() {
083      return Optional.of(this);
084    }
085
086    @Override
087    public boolean postScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c,
088      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
089      REQ_COUNT.incrementAndGet();
090      if ((ERROR_AT == REQ_COUNT.get()) || ERROR) {
091        if (DO_NOT_RETRY) {
092          throw new DoNotRetryIOException("Injected exception");
093        } else {
094          throw new IOException("Injected exception");
095        }
096      }
097      return RegionObserver.super.postScannerNext(c, s, result, limit, hasNext);
098    }
099
100  }
101
102  @BeforeClass
103  public static void setUp() throws Exception {
104    UTIL.startMiniCluster(1);
105    UTIL.getAdmin()
106      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
107        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
108        .setCoprocessor(ErrorCP.class.getName()).build());
109    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
110      for (int i = 0; i < ROW_COUNT; i++) {
111        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i)));
112      }
113    }
114    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
115  }
116
117  @AfterClass
118  public static void tearDown() throws Exception {
119    Closeables.close(CONN, true);
120    UTIL.shutdownMiniCluster();
121  }
122
123  @Before
124  public void setUpBeforeTest() {
125    REQ_COUNT.set(0);
126    ERROR_AT = 0;
127    ERROR = false;
128    DO_NOT_RETRY = false;
129  }
130
131  @Test(expected = DoNotRetryIOException.class)
132  public void testDoNotRetryIOException() throws IOException {
133    ERROR_AT = 1;
134    DO_NOT_RETRY = true;
135    try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(FAMILY)) {
136      scanner.next();
137    }
138  }
139
140  @Test
141  public void testIOException() throws IOException {
142    ERROR = true;
143    try (ResultScanner scanner =
144      CONN.getTableBuilder(TABLE_NAME).setMaxAttempts(3).build().getScanner(FAMILY)) {
145      scanner.next();
146      fail();
147    } catch (RetriesExhaustedException e) {
148      // expected
149      assertThat(e.getCause(), instanceOf(ScannerResetException.class));
150    }
151    assertTrue(REQ_COUNT.get() >= 3);
152  }
153
154  private void count() throws IOException {
155    try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(1))) {
156      for (int i = 0; i < ROW_COUNT; i++) {
157        Result result = scanner.next();
158        assertArrayEquals(Bytes.toBytes(i), result.getRow());
159        assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUAL));
160      }
161    }
162  }
163
164  @Test
165  public void testRecoveryFromScannerResetWhileOpening() throws IOException {
166    ERROR_AT = 1;
167    count();
168    // we should at least request 1 time otherwise the error will not be triggered, and then we
169    // need at least one more request to get the remaining results.
170    assertTrue(REQ_COUNT.get() >= 2);
171  }
172
173  @Test
174  public void testRecoveryFromScannerResetInTheMiddle() throws IOException {
175    ERROR_AT = 2;
176    count();
177    // we should at least request 2 times otherwise the error will not be triggered, and then we
178    // need at least one more request to get the remaining results.
179    assertTrue(REQ_COUNT.get() >= 3);
180  }
181}