001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertThat; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.NavigableSet; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicLong; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.DoNotRetryIOException; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.exceptions.ScannerResetException; 042import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner; 043import org.apache.hadoop.hbase.regionserver.HRegion; 044import org.apache.hadoop.hbase.regionserver.HStore; 045import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 046import org.apache.hadoop.hbase.regionserver.RegionServerServices; 047import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner; 048import org.apache.hadoop.hbase.regionserver.ScanInfo; 049import org.apache.hadoop.hbase.regionserver.StoreScanner; 050import org.apache.hadoop.hbase.testclassification.ClientTests; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.wal.WAL; 054import org.junit.AfterClass; 055import org.junit.BeforeClass; 056import org.junit.ClassRule; 057import org.junit.Rule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.rules.TestName; 061 062@Category({ MediumTests.class, ClientTests.class }) 063public class TestFromClientSideScanExcpetion { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestFromClientSideScanExcpetion.class); 068 069 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 070 071 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 072 073 private static int SLAVES = 3; 074 075 @Rule 076 public TestName name = new TestName(); 077 078 @BeforeClass 079 public static void setUpBeforeClass() throws Exception { 080 Configuration conf = TEST_UTIL.getConfiguration(); 081 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000); 082 conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class); 083 conf.setBoolean("hbase.client.log.scanner.activity", true); 084 // We need more than one region server in this test 085 TEST_UTIL.startMiniCluster(SLAVES); 086 } 087 088 @AfterClass 089 public static void tearDownAfterClass() throws Exception { 090 TEST_UTIL.shutdownMiniCluster(); 091 } 092 093 private static AtomicBoolean ON = new AtomicBoolean(false); 094 private static AtomicLong REQ_COUNT = new AtomicLong(0); 095 private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw 096 // DNRIOE 097 private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once 098 099 private static void reset() { 100 ON.set(false); 101 REQ_COUNT.set(0); 102 IS_DO_NOT_RETRY.set(false); 103 THROW_ONCE.set(true); 104 } 105 106 private static void inject() { 107 ON.set(true); 108 } 109 110 public static final class MyHRegion extends HRegion { 111 112 @SuppressWarnings("deprecation") 113 public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 114 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 115 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 116 } 117 118 @Override 119 protected HStore instantiateHStore(ColumnFamilyDescriptor family, boolean warmup) 120 throws IOException { 121 return new MyHStore(this, family, conf, warmup); 122 } 123 } 124 125 public static final class MyHStore extends HStore { 126 127 public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam, 128 boolean warmup) throws IOException { 129 super(region, family, confParam, warmup); 130 } 131 132 @Override 133 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 134 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 135 return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) 136 : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt); 137 } 138 } 139 140 public static final class MyStoreScanner extends StoreScanner { 141 public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 142 long readPt) throws IOException { 143 super(store, scanInfo, scan, columns, readPt); 144 } 145 146 @Override 147 protected List<KeyValueScanner> selectScannersFrom(HStore store, 148 List<? extends KeyValueScanner> allScanners) { 149 List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners); 150 List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size()); 151 for (KeyValueScanner scanner : scanners) { 152 newScanners.add(new DelegatingKeyValueScanner(scanner) { 153 @Override 154 public boolean reseek(Cell key) throws IOException { 155 if (ON.get()) { 156 REQ_COUNT.incrementAndGet(); 157 if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) { 158 if (IS_DO_NOT_RETRY.get()) { 159 throw new DoNotRetryIOException("Injected exception"); 160 } else { 161 throw new IOException("Injected exception"); 162 } 163 } 164 } 165 return super.reseek(key); 166 } 167 }); 168 } 169 return newScanners; 170 } 171 } 172 173 /** 174 * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving 175 * the server side RegionScanner to be in dirty state. The client has to ensure that the 176 * ClientScanner does not get an exception and also sees all the data. 177 * @throws IOException 178 * @throws InterruptedException 179 */ 180 @Test 181 public void testClientScannerIsResetWhenScanThrowsIOException() 182 throws IOException, InterruptedException { 183 reset(); 184 THROW_ONCE.set(true); // throw exceptions only once 185 TableName tableName = TableName.valueOf(name.getMethodName()); 186 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 187 int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); 188 TEST_UTIL.getAdmin().flush(tableName); 189 inject(); 190 int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 191 assertEquals(rowCount, actualRowCount); 192 } 193 assertTrue(REQ_COUNT.get() > 0); 194 } 195 196 /** 197 * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation 198 * is that the exception will bubble up to the client scanner instead of being retried. 199 */ 200 @Test 201 public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE() 202 throws IOException, InterruptedException { 203 reset(); 204 IS_DO_NOT_RETRY.set(true); 205 TableName tableName = TableName.valueOf(name.getMethodName()); 206 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 207 TEST_UTIL.loadTable(t, FAMILY, false); 208 TEST_UTIL.getAdmin().flush(tableName); 209 inject(); 210 TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 211 fail("Should have thrown an exception"); 212 } catch (DoNotRetryIOException expected) { 213 // expected 214 } 215 assertTrue(REQ_COUNT.get() > 0); 216 } 217 218 /** 219 * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is 220 * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying 221 * indefinitely. 222 */ 223 @Test 224 public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE() 225 throws IOException, InterruptedException { 226 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); 227 TableName tableName = TableName.valueOf(name.getMethodName()); 228 reset(); 229 THROW_ONCE.set(false); // throw exceptions in every retry 230 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 231 TEST_UTIL.loadTable(t, FAMILY, false); 232 TEST_UTIL.getAdmin().flush(tableName); 233 inject(); 234 TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 235 fail("Should have thrown an exception"); 236 } catch (DoNotRetryIOException expected) { 237 assertThat(expected, instanceOf(ScannerResetException.class)); 238 // expected 239 } 240 assertTrue(REQ_COUNT.get() >= 3); 241 } 242 243}