001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.NavigableSet; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicLong; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.DoNotRetryIOException; 036import org.apache.hadoop.hbase.ExtendedCell; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtil; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.exceptions.ScannerResetException; 042import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner; 043import org.apache.hadoop.hbase.regionserver.HRegion; 044import org.apache.hadoop.hbase.regionserver.HStore; 045import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 046import org.apache.hadoop.hbase.regionserver.RegionServerServices; 047import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner; 048import org.apache.hadoop.hbase.regionserver.ScanInfo; 049import org.apache.hadoop.hbase.regionserver.StoreScanner; 050import org.apache.hadoop.hbase.testclassification.ClientTests; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.wal.WAL; 054import org.junit.AfterClass; 055import org.junit.BeforeClass; 056import org.junit.ClassRule; 057import org.junit.Rule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.rules.TestName; 061 062@Category({ MediumTests.class, ClientTests.class }) 063public class TestFromClientSideScanExcpetion { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestFromClientSideScanExcpetion.class); 068 069 protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); 070 071 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 072 073 private static int SLAVES = 3; 074 075 @Rule 076 public TestName name = new TestName(); 077 078 @BeforeClass 079 public static void setUpBeforeClass() throws Exception { 080 Configuration conf = TEST_UTIL.getConfiguration(); 081 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); 082 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000); 083 conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class); 084 conf.setBoolean("hbase.client.log.scanner.activity", true); 085 // We need more than one region server in this test 086 TEST_UTIL.startMiniCluster(SLAVES); 087 } 088 089 @AfterClass 090 public static void tearDownAfterClass() throws Exception { 091 TEST_UTIL.shutdownMiniCluster(); 092 } 093 094 private static AtomicBoolean ON = new AtomicBoolean(false); 095 private static AtomicLong REQ_COUNT = new AtomicLong(0); 096 private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw 097 // DNRIOE 098 private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once 099 100 private static void reset() { 101 ON.set(false); 102 REQ_COUNT.set(0); 103 IS_DO_NOT_RETRY.set(false); 104 THROW_ONCE.set(true); 105 } 106 107 private static void inject() { 108 ON.set(true); 109 } 110 111 public static final class MyHRegion extends HRegion { 112 113 @SuppressWarnings("deprecation") 114 public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 115 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 116 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 117 } 118 119 @Override 120 protected HStore instantiateHStore(ColumnFamilyDescriptor family, boolean warmup) 121 throws IOException { 122 return new MyHStore(this, family, conf, warmup); 123 } 124 } 125 126 public static final class MyHStore extends HStore { 127 128 public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam, 129 boolean warmup) throws IOException { 130 super(region, family, confParam, warmup); 131 } 132 133 @Override 134 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 135 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 136 return scan.isReversed() 137 ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) 138 : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt); 139 } 140 } 141 142 public static final class MyStoreScanner extends StoreScanner { 143 public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 144 long readPt) throws IOException { 145 super(store, scanInfo, scan, columns, readPt); 146 } 147 148 @Override 149 protected List<KeyValueScanner> selectScannersFrom(HStore store, 150 List<? extends KeyValueScanner> allScanners) { 151 List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners); 152 List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size()); 153 for (KeyValueScanner scanner : scanners) { 154 newScanners.add(new DelegatingKeyValueScanner(scanner) { 155 @Override 156 public boolean reseek(ExtendedCell key) throws IOException { 157 if (ON.get()) { 158 REQ_COUNT.incrementAndGet(); 159 if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) { 160 if (IS_DO_NOT_RETRY.get()) { 161 throw new DoNotRetryIOException("Injected exception"); 162 } else { 163 throw new IOException("Injected exception"); 164 } 165 } 166 } 167 return super.reseek(key); 168 } 169 }); 170 } 171 return newScanners; 172 } 173 } 174 175 /** 176 * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving 177 * the server side RegionScanner to be in dirty state. The client has to ensure that the 178 * ClientScanner does not get an exception and also sees all the data. 179 */ 180 @Test 181 public void testClientScannerIsResetWhenScanThrowsIOException() 182 throws IOException, InterruptedException { 183 reset(); 184 THROW_ONCE.set(true); // throw exceptions only once 185 TableName tableName = TableName.valueOf(name.getMethodName()); 186 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 187 int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); 188 TEST_UTIL.getAdmin().flush(tableName); 189 inject(); 190 int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 191 assertEquals(rowCount, actualRowCount); 192 } 193 assertTrue(REQ_COUNT.get() > 0); 194 } 195 196 /** 197 * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation 198 * is that the exception will bubble up to the client scanner instead of being retried. 199 */ 200 @Test 201 public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE() 202 throws IOException, InterruptedException { 203 reset(); 204 IS_DO_NOT_RETRY.set(true); 205 TableName tableName = TableName.valueOf(name.getMethodName()); 206 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 207 TEST_UTIL.loadTable(t, FAMILY, false); 208 TEST_UTIL.getAdmin().flush(tableName); 209 inject(); 210 TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 211 fail("Should have thrown an exception"); 212 } catch (DoNotRetryIOException expected) { 213 // expected 214 } 215 assertTrue(REQ_COUNT.get() > 0); 216 } 217 218 /** 219 * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is 220 * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying 221 * indefinitely. 222 */ 223 @Test 224 public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE() 225 throws IOException, InterruptedException { 226 TableName tableName = TableName.valueOf(name.getMethodName()); 227 reset(); 228 THROW_ONCE.set(false); // throw exceptions in every retry 229 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 230 TEST_UTIL.loadTable(t, FAMILY, false); 231 TEST_UTIL.getAdmin().flush(tableName); 232 inject(); 233 TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 234 fail("Should have thrown an exception"); 235 } catch (ScannerResetException expected) { 236 // expected 237 } catch (RetriesExhaustedException e) { 238 // expected 239 assertThat(e.getCause(), instanceOf(ScannerResetException.class)); 240 } 241 assertTrue(REQ_COUNT.get() >= 3); 242 } 243}