001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertArrayEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.hbase.DoNotRetryIOException; 031import org.apache.hadoop.hbase.HBaseClassTestRule; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.TableName; 034import org.apache.hadoop.hbase.coprocessor.ObserverContext; 035import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 036import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 037import org.apache.hadoop.hbase.coprocessor.RegionObserver; 038import org.apache.hadoop.hbase.exceptions.ScannerResetException; 039import org.apache.hadoop.hbase.regionserver.InternalScanner; 040import org.apache.hadoop.hbase.testclassification.ClientTests; 041import org.apache.hadoop.hbase.testclassification.MediumTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.junit.AfterClass; 044import org.junit.Before; 045import org.junit.BeforeClass; 046import org.junit.ClassRule; 047import org.junit.Test; 048import org.junit.experimental.categories.Category; 049 050import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 051 052@Category({ MediumTests.class, ClientTests.class }) 053public class TestAsyncTableScanException { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestAsyncTableScanException.class); 058 059 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 060 061 private static TableName TABLE_NAME = TableName.valueOf("scan"); 062 063 private static byte[] FAMILY = Bytes.toBytes("family"); 064 065 private static byte[] QUAL = Bytes.toBytes("qual"); 066 067 private static AsyncConnection CONN; 068 069 private static AtomicInteger REQ_COUNT = new AtomicInteger(); 070 071 private static volatile int ERROR_AT; 072 073 private static volatile boolean ERROR; 074 075 private static volatile boolean DO_NOT_RETRY; 076 077 private static final int ROW_COUNT = 100; 078 079 public static final class ErrorCP implements RegionObserver, RegionCoprocessor { 080 081 @Override 082 public Optional<RegionObserver> getRegionObserver() { 083 return Optional.of(this); 084 } 085 086 @Override 087 public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c, 088 InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException { 089 REQ_COUNT.incrementAndGet(); 090 if ((ERROR_AT == REQ_COUNT.get()) || ERROR) { 091 if (DO_NOT_RETRY) { 092 throw new DoNotRetryIOException("Injected exception"); 093 } else { 094 throw new IOException("Injected exception"); 095 } 096 } 097 return RegionObserver.super.postScannerNext(c, s, result, limit, hasNext); 098 } 099 100 } 101 102 @BeforeClass 103 public static void setUp() throws Exception { 104 UTIL.startMiniCluster(1); 105 UTIL.getAdmin() 106 .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME) 107 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 108 .setCoprocessor(ErrorCP.class.getName()).build()); 109 try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { 110 for (int i = 0; i < ROW_COUNT; i++) { 111 table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i))); 112 } 113 } 114 CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); 115 } 116 117 @AfterClass 118 public static void tearDown() throws Exception { 119 Closeables.close(CONN, true); 120 UTIL.shutdownMiniCluster(); 121 } 122 123 @Before 124 public void setUpBeforeTest() { 125 REQ_COUNT.set(0); 126 ERROR_AT = 0; 127 ERROR = false; 128 DO_NOT_RETRY = false; 129 } 130 131 @Test(expected = DoNotRetryIOException.class) 132 public void testDoNotRetryIOException() throws IOException { 133 ERROR_AT = 1; 134 DO_NOT_RETRY = true; 135 try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(FAMILY)) { 136 scanner.next(); 137 } 138 } 139 140 @Test 141 public void testIOException() throws IOException { 142 ERROR = true; 143 try (ResultScanner scanner = 144 CONN.getTableBuilder(TABLE_NAME).setMaxAttempts(3).build().getScanner(FAMILY)) { 145 scanner.next(); 146 fail(); 147 } catch (RetriesExhaustedException e) { 148 // expected 149 assertThat(e.getCause(), instanceOf(ScannerResetException.class)); 150 } 151 assertTrue(REQ_COUNT.get() >= 3); 152 } 153 154 private void count() throws IOException { 155 try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(1))) { 156 for (int i = 0; i < ROW_COUNT; i++) { 157 Result result = scanner.next(); 158 assertArrayEquals(Bytes.toBytes(i), result.getRow()); 159 assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUAL)); 160 } 161 } 162 } 163 164 @Test 165 public void testRecoveryFromScannerResetWhileOpening() throws IOException { 166 ERROR_AT = 1; 167 count(); 168 // we should at least request 1 time otherwise the error will not be triggered, and then we 169 // need at least one more request to get the remaining results. 170 assertTrue(REQ_COUNT.get() >= 2); 171 } 172 173 @Test 174 public void testRecoveryFromScannerResetInTheMiddle() throws IOException { 175 ERROR_AT = 2; 176 count(); 177 // we should at least request 2 times otherwise the error will not be triggered, and then we 178 // need at least one more request to get the remaining results. 179 assertTrue(REQ_COUNT.get() >= 3); 180 } 181}