001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.NavigableSet; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicLong; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.DoNotRetryIOException; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.exceptions.ScannerResetException; 042import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner; 043import org.apache.hadoop.hbase.regionserver.HRegion; 044import org.apache.hadoop.hbase.regionserver.HStore; 045import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 046import org.apache.hadoop.hbase.regionserver.RegionServerServices; 047import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner; 048import org.apache.hadoop.hbase.regionserver.ScanInfo; 049import org.apache.hadoop.hbase.regionserver.StoreScanner; 050import org.apache.hadoop.hbase.testclassification.ClientTests; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.wal.WAL; 054import org.junit.AfterClass; 055import org.junit.BeforeClass; 056import org.junit.ClassRule; 057import org.junit.Rule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.rules.TestName; 061 062@Category({ MediumTests.class, ClientTests.class }) 063public class TestFromClientSideScanExcpetion { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestFromClientSideScanExcpetion.class); 068 069 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 070 071 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 072 073 private static int SLAVES = 3; 074 075 @Rule 076 public TestName name = new TestName(); 077 078 @BeforeClass 079 public static void setUpBeforeClass() throws Exception { 080 Configuration conf = TEST_UTIL.getConfiguration(); 081 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); 082 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000); 083 conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class); 084 conf.setBoolean("hbase.client.log.scanner.activity", true); 085 // We need more than one region server in this test 086 TEST_UTIL.startMiniCluster(SLAVES); 087 } 088 089 @AfterClass 090 public static void tearDownAfterClass() throws Exception { 091 TEST_UTIL.shutdownMiniCluster(); 092 } 093 094 private static AtomicBoolean ON = new AtomicBoolean(false); 095 private static AtomicLong REQ_COUNT = new AtomicLong(0); 096 private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw 097 // DNRIOE 098 private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once 099 100 private static void reset() { 101 ON.set(false); 102 REQ_COUNT.set(0); 103 IS_DO_NOT_RETRY.set(false); 104 THROW_ONCE.set(true); 105 } 106 107 private static void inject() { 108 ON.set(true); 109 } 110 111 public static final class MyHRegion extends HRegion { 112 113 @SuppressWarnings("deprecation") 114 public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 115 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 116 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 117 } 118 119 @Override 120 protected HStore instantiateHStore(ColumnFamilyDescriptor family, boolean warmup) 121 throws IOException { 122 return new MyHStore(this, family, conf, warmup); 123 } 124 } 125 126 public static final class MyHStore extends HStore { 127 128 public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam, 129 boolean warmup) throws IOException { 130 super(region, family, confParam, warmup); 131 } 132 133 @Override 134 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 135 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 136 return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) 137 : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt); 138 } 139 } 140 141 public static final class MyStoreScanner extends StoreScanner { 142 public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 143 long readPt) throws IOException { 144 super(store, scanInfo, scan, columns, readPt); 145 } 146 147 @Override 148 protected List<KeyValueScanner> selectScannersFrom(HStore store, 149 List<? extends KeyValueScanner> allScanners) { 150 List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners); 151 List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size()); 152 for (KeyValueScanner scanner : scanners) { 153 newScanners.add(new DelegatingKeyValueScanner(scanner) { 154 @Override 155 public boolean reseek(Cell key) throws IOException { 156 if (ON.get()) { 157 REQ_COUNT.incrementAndGet(); 158 if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) { 159 if (IS_DO_NOT_RETRY.get()) { 160 throw new DoNotRetryIOException("Injected exception"); 161 } else { 162 throw new IOException("Injected exception"); 163 } 164 } 165 } 166 return super.reseek(key); 167 } 168 }); 169 } 170 return newScanners; 171 } 172 } 173 174 /** 175 * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving 176 * the server side RegionScanner to be in dirty state. The client has to ensure that the 177 * ClientScanner does not get an exception and also sees all the data. 178 * @throws IOException 179 * @throws InterruptedException 180 */ 181 @Test 182 public void testClientScannerIsResetWhenScanThrowsIOException() 183 throws IOException, InterruptedException { 184 reset(); 185 THROW_ONCE.set(true); // throw exceptions only once 186 TableName tableName = TableName.valueOf(name.getMethodName()); 187 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 188 int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); 189 TEST_UTIL.getAdmin().flush(tableName); 190 inject(); 191 int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 192 assertEquals(rowCount, actualRowCount); 193 } 194 assertTrue(REQ_COUNT.get() > 0); 195 } 196 197 /** 198 * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation 199 * is that the exception will bubble up to the client scanner instead of being retried. 200 */ 201 @Test 202 public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE() 203 throws IOException, InterruptedException { 204 reset(); 205 IS_DO_NOT_RETRY.set(true); 206 TableName tableName = TableName.valueOf(name.getMethodName()); 207 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 208 TEST_UTIL.loadTable(t, FAMILY, false); 209 TEST_UTIL.getAdmin().flush(tableName); 210 inject(); 211 TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 212 fail("Should have thrown an exception"); 213 } catch (DoNotRetryIOException expected) { 214 // expected 215 } 216 assertTrue(REQ_COUNT.get() > 0); 217 } 218 219 /** 220 * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is 221 * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying 222 * indefinitely. 223 */ 224 @Test 225 public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE() 226 throws IOException, InterruptedException { 227 TableName tableName = TableName.valueOf(name.getMethodName()); 228 reset(); 229 THROW_ONCE.set(false); // throw exceptions in every retry 230 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 231 TEST_UTIL.loadTable(t, FAMILY, false); 232 TEST_UTIL.getAdmin().flush(tableName); 233 inject(); 234 TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 235 fail("Should have thrown an exception"); 236 } catch (ScannerResetException expected) { 237 // expected 238 } catch (RetriesExhaustedException e) { 239 // expected 240 assertThat(e.getCause(), instanceOf(ScannerResetException.class)); 241 } 242 assertTrue(REQ_COUNT.get() >= 3); 243 } 244}