001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertTrue; 024import static org.junit.jupiter.api.Assertions.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.NavigableSet; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicLong; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.DoNotRetryIOException; 036import org.apache.hadoop.hbase.ExtendedCell; 037import org.apache.hadoop.hbase.HBaseTestingUtil; 038import org.apache.hadoop.hbase.HConstants; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.TableNameTestExtension; 041import org.apache.hadoop.hbase.exceptions.ScannerResetException; 042import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner; 043import org.apache.hadoop.hbase.regionserver.HRegion; 044import org.apache.hadoop.hbase.regionserver.HStore; 045import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 046import org.apache.hadoop.hbase.regionserver.RegionServerServices; 047import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner; 048import org.apache.hadoop.hbase.regionserver.ScanInfo; 049import org.apache.hadoop.hbase.regionserver.StoreScanner; 050import org.apache.hadoop.hbase.util.Bytes; 051import org.apache.hadoop.hbase.wal.WAL; 052import org.junit.jupiter.api.AfterAll; 053import org.junit.jupiter.api.Test; 054import org.junit.jupiter.api.extension.RegisterExtension; 055 056public class FromClientSideScanExcpetionTestBase { 057 058 protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 059 060 protected static byte[] FAMILY = Bytes.toBytes("testFamily"); 061 062 protected static int SLAVES = 3; 063 064 @RegisterExtension 065 private TableNameTestExtension name = new TableNameTestExtension(); 066 067 @AfterAll 068 public static void tearDownAfterClass() throws Exception { 069 TEST_UTIL.shutdownMiniCluster(); 070 } 071 072 private static AtomicBoolean ON = new AtomicBoolean(false); 073 private static AtomicLong REQ_COUNT = new AtomicLong(0); 074 private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw 075 // DNRIOE 076 private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once 077 078 private static void reset() { 079 ON.set(false); 080 REQ_COUNT.set(0); 081 IS_DO_NOT_RETRY.set(false); 082 THROW_ONCE.set(true); 083 } 084 085 private static void inject() { 086 ON.set(true); 087 } 088 089 protected static void startCluster() throws Exception { 090 Configuration conf = TEST_UTIL.getConfiguration(); 091 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); 092 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000); 093 conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class); 094 conf.setBoolean("hbase.client.log.scanner.activity", true); 095 // We need more than one region server in this test 096 TEST_UTIL.startMiniCluster(SLAVES); 097 } 098 099 public static final class MyHRegion extends HRegion { 100 101 @SuppressWarnings("deprecation") 102 public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 103 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 104 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 105 } 106 107 @Override 108 protected HStore instantiateHStore(ColumnFamilyDescriptor family, boolean warmup) 109 throws IOException { 110 return new MyHStore(this, family, conf, warmup); 111 } 112 } 113 114 public static final class MyHStore extends HStore { 115 116 public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam, 117 boolean warmup) throws IOException { 118 super(region, family, confParam, warmup); 119 } 120 121 @Override 122 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 123 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 124 return scan.isReversed() 125 ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) 126 : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt); 127 } 128 } 129 130 public static final class MyStoreScanner extends StoreScanner { 131 public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 132 long readPt) throws IOException { 133 super(store, scanInfo, scan, columns, readPt); 134 } 135 136 @Override 137 protected List<KeyValueScanner> selectScannersFrom(HStore store, 138 List<? extends KeyValueScanner> allScanners) { 139 List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners); 140 List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size()); 141 for (KeyValueScanner scanner : scanners) { 142 newScanners.add(new DelegatingKeyValueScanner(scanner) { 143 @Override 144 public boolean reseek(ExtendedCell key) throws IOException { 145 if (ON.get()) { 146 REQ_COUNT.incrementAndGet(); 147 if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) { 148 if (IS_DO_NOT_RETRY.get()) { 149 throw new DoNotRetryIOException("Injected exception"); 150 } else { 151 throw new IOException("Injected exception"); 152 } 153 } 154 } 155 return super.reseek(key); 156 } 157 }); 158 } 159 return newScanners; 160 } 161 } 162 163 /** 164 * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving 165 * the server side RegionScanner to be in dirty state. The client has to ensure that the 166 * ClientScanner does not get an exception and also sees all the data. 167 */ 168 @Test 169 public void testClientScannerIsResetWhenScanThrowsIOException() 170 throws IOException, InterruptedException { 171 reset(); 172 THROW_ONCE.set(true); // throw exceptions only once 173 TableName tableName = name.getTableName(); 174 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 175 int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); 176 TEST_UTIL.getAdmin().flush(tableName); 177 inject(); 178 int actualRowCount = HBaseTestingUtil.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 179 assertEquals(rowCount, actualRowCount); 180 } 181 assertTrue(REQ_COUNT.get() > 0); 182 } 183 184 /** 185 * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation 186 * is that the exception will bubble up to the client scanner instead of being retried. 187 */ 188 @Test 189 public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE() 190 throws IOException, InterruptedException { 191 reset(); 192 IS_DO_NOT_RETRY.set(true); 193 TableName tableName = name.getTableName(); 194 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 195 TEST_UTIL.loadTable(t, FAMILY, false); 196 TEST_UTIL.getAdmin().flush(tableName); 197 inject(); 198 HBaseTestingUtil.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 199 fail("Should have thrown an exception"); 200 } catch (DoNotRetryIOException expected) { 201 // expected 202 } 203 assertTrue(REQ_COUNT.get() > 0); 204 } 205 206 /** 207 * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is 208 * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying 209 * indefinitely. 210 */ 211 @Test 212 public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE() 213 throws IOException, InterruptedException { 214 TableName tableName = name.getTableName(); 215 reset(); 216 THROW_ONCE.set(false); // throw exceptions in every retry 217 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 218 TEST_UTIL.loadTable(t, FAMILY, false); 219 TEST_UTIL.getAdmin().flush(tableName); 220 inject(); 221 HBaseTestingUtil.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 222 fail("Should have thrown an exception"); 223 } catch (ScannerResetException expected) { 224 // expected 225 } catch (RetriesExhaustedException e) { 226 // expected 227 assertThat(e.getCause(), instanceOf(ScannerResetException.class)); 228 } 229 assertTrue(REQ_COUNT.get() >= 3); 230 } 231}