/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.CoreMatchers.instanceOf;
021import static org.junit.Assert.assertArrayEquals;
022import static org.junit.Assert.assertThat;
023import static org.junit.Assert.assertTrue;
024import static org.junit.Assert.fail;
025
026import java.io.IOException;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.coprocessor.ObserverContext;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
037import org.apache.hadoop.hbase.coprocessor.RegionObserver;
038import org.apache.hadoop.hbase.exceptions.ScannerResetException;
039import org.apache.hadoop.hbase.regionserver.InternalScanner;
040import org.apache.hadoop.hbase.testclassification.ClientTests;
041import org.apache.hadoop.hbase.testclassification.MediumTests;
042import org.apache.hadoop.hbase.util.Bytes;
043import org.junit.AfterClass;
044import org.junit.Before;
045import org.junit.BeforeClass;
046import org.junit.ClassRule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049
050import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
051
052@Category({ MediumTests.class, ClientTests.class })
053public class TestAsyncTableScanException {
054
055  @ClassRule
056  public static final HBaseClassTestRule CLASS_RULE =
057    HBaseClassTestRule.forClass(TestAsyncTableScanException.class);
058
059  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
060
061  private static TableName TABLE_NAME = TableName.valueOf("scan");
062
063  private static byte[] FAMILY = Bytes.toBytes("family");
064
065  private static byte[] QUAL = Bytes.toBytes("qual");
066
067  private static AsyncConnection CONN;
068
069  private static AtomicInteger REQ_COUNT = new AtomicInteger();
070
071  private static volatile int ERROR_AT;
072
073  private static volatile boolean ERROR;
074
075  private static volatile boolean DO_NOT_RETRY;
076
077  public static final class ErrorCP implements RegionObserver, RegionCoprocessor {
078
079    @Override
080    public Optional<RegionObserver> getRegionObserver() {
081      return Optional.of(this);
082    }
083
084    @Override
085    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
086        InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
087      REQ_COUNT.incrementAndGet();
088      if ((ERROR_AT == REQ_COUNT.get()) || ERROR) {
089        if (DO_NOT_RETRY) {
090          throw new DoNotRetryIOException("Injected exception");
091        } else {
092          throw new IOException("Injected exception");
093        }
094      }
095      return RegionObserver.super.postScannerNext(c, s, result, limit, hasNext);
096    }
097
098  }
099
100  @BeforeClass
101  public static void setUp() throws Exception {
102    UTIL.startMiniCluster(3);
103    UTIL.getAdmin()
104      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
105        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
106        .setCoprocessor(ErrorCP.class.getName()).build());
107    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
108      for (int i = 0; i < 100; i++) {
109        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i)));
110      }
111    }
112    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
113  }
114
115  @AfterClass
116  public static void tearDown() throws Exception {
117    Closeables.close(CONN, true);
118    UTIL.shutdownMiniCluster();
119  }
120
121  @Before
122  public void setUpBeforeTest() {
123    REQ_COUNT.set(0);
124    ERROR_AT = 0;
125    ERROR = false;
126    DO_NOT_RETRY = false;
127  }
128
129  @Test(expected = DoNotRetryIOException.class)
130  public void testDoNotRetryIOException() throws IOException {
131    ERROR_AT = 1;
132    DO_NOT_RETRY = true;
133    try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(FAMILY)) {
134      scanner.next();
135    }
136  }
137
138  @Test
139  public void testIOException() throws IOException {
140    ERROR = true;
141    try (ResultScanner scanner =
142      CONN.getTableBuilder(TABLE_NAME).setMaxAttempts(3).build().getScanner(FAMILY)) {
143      scanner.next();
144      fail();
145    } catch (RetriesExhaustedException e) {
146      // expected
147      assertThat(e.getCause(), instanceOf(ScannerResetException.class));
148    }
149    assertTrue(REQ_COUNT.get() >= 3);
150  }
151
152  private void count() throws IOException {
153    try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(1))) {
154      for (int i = 0; i < 100; i++) {
155        Result result = scanner.next();
156        assertArrayEquals(Bytes.toBytes(i), result.getRow());
157        assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUAL));
158      }
159    }
160  }
161
162  @Test
163  public void testRecoveryFromScannerResetWhileOpening() throws IOException {
164    ERROR_AT = 1;
165    count();
166    // we should at least request 1 time otherwise the error will not be triggered, and then we
167    // need at least one more request to get the remaining results.
168    assertTrue(REQ_COUNT.get() >= 2);
169  }
170
171  @Test
172  public void testRecoveryFromScannerResetInTheMiddle() throws IOException {
173    ERROR_AT = 2;
174    count();
175    // we should at least request 2 times otherwise the error will not be triggered, and then we
176    // need at least one more request to get the remaining results.
177    assertTrue(REQ_COUNT.get() >= 3);
178  }
179}