001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.greaterThanOrEqualTo;
022import static org.hamcrest.Matchers.instanceOf;
023import static org.junit.jupiter.api.Assertions.assertArrayEquals;
024import static org.junit.jupiter.api.Assertions.assertThrows;
025
026import java.io.IOException;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.hbase.DoNotRetryIOException;
031import org.apache.hadoop.hbase.HBaseTestingUtil;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.coprocessor.ObserverContext;
034import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
036import org.apache.hadoop.hbase.coprocessor.RegionObserver;
037import org.apache.hadoop.hbase.exceptions.ScannerResetException;
038import org.apache.hadoop.hbase.regionserver.InternalScanner;
039import org.apache.hadoop.hbase.testclassification.ClientTests;
040import org.apache.hadoop.hbase.testclassification.MediumTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.junit.jupiter.api.AfterAll;
043import org.junit.jupiter.api.BeforeAll;
044import org.junit.jupiter.api.BeforeEach;
045import org.junit.jupiter.api.Tag;
046import org.junit.jupiter.api.Test;
047
048import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
049
050@Tag(MediumTests.TAG)
051@Tag(ClientTests.TAG)
052public class TestAsyncTableScanException {
053
054  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
055
056  private static TableName TABLE_NAME = TableName.valueOf("scan");
057
058  private static byte[] FAMILY = Bytes.toBytes("family");
059
060  private static byte[] QUAL = Bytes.toBytes("qual");
061
062  private static AsyncConnection CONN;
063
064  private static AtomicInteger REQ_COUNT = new AtomicInteger();
065
066  private static volatile int ERROR_AT;
067
068  private static volatile boolean ERROR;
069
070  private static volatile boolean DO_NOT_RETRY;
071
072  private static final int ROW_COUNT = 100;
073
074  public static final class ErrorCP implements RegionObserver, RegionCoprocessor {
075
076    @Override
077    public Optional<RegionObserver> getRegionObserver() {
078      return Optional.of(this);
079    }
080
081    @Override
082    public boolean postScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c,
083      InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
084      REQ_COUNT.incrementAndGet();
085      if ((ERROR_AT == REQ_COUNT.get()) || ERROR) {
086        if (DO_NOT_RETRY) {
087          throw new DoNotRetryIOException("Injected exception");
088        } else {
089          throw new IOException("Injected exception");
090        }
091      }
092      return RegionObserver.super.postScannerNext(c, s, result, limit, hasNext);
093    }
094
095  }
096
097  @BeforeAll
098  public static void setUp() throws Exception {
099    UTIL.startMiniCluster(1);
100    UTIL.getAdmin()
101      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
102        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
103        .setCoprocessor(ErrorCP.class.getName()).build());
104    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
105      for (int i = 0; i < ROW_COUNT; i++) {
106        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i)));
107      }
108    }
109    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
110  }
111
112  @AfterAll
113  public static void tearDown() throws Exception {
114    Closeables.close(CONN, true);
115    UTIL.shutdownMiniCluster();
116  }
117
118  @BeforeEach
119  public void setUpBeforeTest() {
120    REQ_COUNT.set(0);
121    ERROR_AT = 0;
122    ERROR = false;
123    DO_NOT_RETRY = false;
124  }
125
126  @Test
127  public void testDoNotRetryIOException() throws IOException {
128    ERROR_AT = 1;
129    DO_NOT_RETRY = true;
130    try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(FAMILY)) {
131      assertThrows(DoNotRetryIOException.class, () -> scanner.next());
132    }
133  }
134
135  @Test
136  public void testIOException() throws IOException {
137    ERROR = true;
138    try (ResultScanner scanner =
139      CONN.getTableBuilder(TABLE_NAME).setMaxAttempts(3).build().getScanner(FAMILY)) {
140      RetriesExhaustedException e =
141        assertThrows(RetriesExhaustedException.class, () -> scanner.next());
142      assertThat(e.getCause(), instanceOf(ScannerResetException.class));
143    }
144    assertThat(REQ_COUNT.get(), greaterThanOrEqualTo(3));
145  }
146
147  private void count() throws IOException {
148    try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(1))) {
149      for (int i = 0; i < ROW_COUNT; i++) {
150        Result result = scanner.next();
151        assertArrayEquals(Bytes.toBytes(i), result.getRow());
152        assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUAL));
153      }
154    }
155  }
156
157  @Test
158  public void testRecoveryFromScannerResetWhileOpening() throws IOException {
159    ERROR_AT = 1;
160    count();
161    // we should at least request 1 time otherwise the error will not be triggered, and then we
162    // need at least one more request to get the remaining results.
163    assertThat(REQ_COUNT.get(), greaterThanOrEqualTo(2));
164  }
165
166  @Test
167  public void testRecoveryFromScannerResetInTheMiddle() throws IOException {
168    ERROR_AT = 2;
169    count();
170    // we should at least request 2 times otherwise the error will not be triggered, and then we
171    // need at least one more request to get the remaining results.
172    assertThat(REQ_COUNT.get(), greaterThanOrEqualTo(3));
173  }
174}