001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import static org.hamcrest.CoreMatchers.instanceOf; 021import static org.hamcrest.MatcherAssert.assertThat; 022import static org.junit.Assert.assertEquals; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.util.ArrayList; 028import java.util.List; 029import java.util.NavigableSet; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicLong; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Cell; 036import org.apache.hadoop.hbase.DoNotRetryIOException; 037import org.apache.hadoop.hbase.HBaseClassTestRule; 038import org.apache.hadoop.hbase.HBaseTestingUtility; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.exceptions.ScannerResetException; 042import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner; 043import org.apache.hadoop.hbase.regionserver.HRegion; 044import org.apache.hadoop.hbase.regionserver.HStore; 045import org.apache.hadoop.hbase.regionserver.KeyValueScanner; 046import org.apache.hadoop.hbase.regionserver.RegionServerServices; 047import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner; 048import org.apache.hadoop.hbase.regionserver.ScanInfo; 049import org.apache.hadoop.hbase.regionserver.StoreScanner; 050import org.apache.hadoop.hbase.testclassification.ClientTests; 051import org.apache.hadoop.hbase.testclassification.MediumTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.wal.WAL; 054import org.junit.AfterClass; 055import org.junit.BeforeClass; 056import org.junit.ClassRule; 057import org.junit.Rule; 058import org.junit.Test; 059import org.junit.experimental.categories.Category; 060import org.junit.rules.TestName; 061 062@Category({ MediumTests.class, ClientTests.class }) 063public class TestFromClientSideScanExcpetion { 064 065 @ClassRule 066 public static final HBaseClassTestRule CLASS_RULE = 067 HBaseClassTestRule.forClass(TestFromClientSideScanExcpetion.class); 068 069 protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 070 071 private static byte[] FAMILY = Bytes.toBytes("testFamily"); 072 073 private static int SLAVES = 3; 074 075 @Rule 076 public TestName name = new TestName(); 077 078 @BeforeClass 079 public static void setUpBeforeClass() throws Exception { 080 Configuration conf = TEST_UTIL.getConfiguration(); 081 conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000); 082 conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class); 083 conf.setBoolean("hbase.client.log.scanner.activity", true); 084 // We need more than one region server in this test 085 TEST_UTIL.startMiniCluster(SLAVES); 086 } 087 088 @AfterClass 089 public static void tearDownAfterClass() throws Exception { 090 TEST_UTIL.shutdownMiniCluster(); 091 } 092 093 private static AtomicBoolean ON = new AtomicBoolean(false); 094 private static AtomicLong REQ_COUNT = new AtomicLong(0); 095 private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw 096 // DNRIOE 097 private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once 098 099 private static void reset() { 100 ON.set(false); 101 REQ_COUNT.set(0); 102 IS_DO_NOT_RETRY.set(false); 103 THROW_ONCE.set(true); 104 } 105 106 private static void inject() { 107 ON.set(true); 108 } 109 110 public static final class MyHRegion extends HRegion { 111 112 @SuppressWarnings("deprecation") 113 public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 114 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 115 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 116 } 117 118 @Override 119 protected HStore instantiateHStore(ColumnFamilyDescriptor family, boolean warmup) 120 throws IOException { 121 return new MyHStore(this, family, conf, warmup); 122 } 123 } 124 125 public static final class MyHStore extends HStore { 126 127 public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam, 128 boolean warmup) throws IOException { 129 super(region, family, confParam, warmup); 130 } 131 132 @Override 133 protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, 134 NavigableSet<byte[]> targetCols, long readPt) throws IOException { 135 return scan.isReversed() 136 ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) 137 : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt); 138 } 139 } 140 141 public static final class MyStoreScanner extends StoreScanner { 142 public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, 143 long readPt) throws IOException { 144 super(store, scanInfo, scan, columns, readPt); 145 } 146 147 @Override 148 protected List<KeyValueScanner> selectScannersFrom(HStore store, 149 List<? extends KeyValueScanner> allScanners) { 150 List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners); 151 List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size()); 152 for (KeyValueScanner scanner : scanners) { 153 newScanners.add(new DelegatingKeyValueScanner(scanner) { 154 @Override 155 public boolean reseek(Cell key) throws IOException { 156 if (ON.get()) { 157 REQ_COUNT.incrementAndGet(); 158 if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) { 159 if (IS_DO_NOT_RETRY.get()) { 160 throw new DoNotRetryIOException("Injected exception"); 161 } else { 162 throw new IOException("Injected exception"); 163 } 164 } 165 } 166 return super.reseek(key); 167 } 168 }); 169 } 170 return newScanners; 171 } 172 } 173 174 /** 175 * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving 176 * the server side RegionScanner to be in dirty state. The client has to ensure that the 177 * ClientScanner does not get an exception and also sees all the data. nn 178 */ 179 @Test 180 public void testClientScannerIsResetWhenScanThrowsIOException() 181 throws IOException, InterruptedException { 182 reset(); 183 THROW_ONCE.set(true); // throw exceptions only once 184 TableName tableName = TableName.valueOf(name.getMethodName()); 185 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 186 int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); 187 TEST_UTIL.getAdmin().flush(tableName); 188 inject(); 189 int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 190 assertEquals(rowCount, actualRowCount); 191 } 192 assertTrue(REQ_COUNT.get() > 0); 193 } 194 195 /** 196 * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation 197 * is that the exception will bubble up to the client scanner instead of being retried. 198 */ 199 @Test 200 public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE() 201 throws IOException, InterruptedException { 202 reset(); 203 IS_DO_NOT_RETRY.set(true); 204 TableName tableName = TableName.valueOf(name.getMethodName()); 205 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 206 TEST_UTIL.loadTable(t, FAMILY, false); 207 TEST_UTIL.getAdmin().flush(tableName); 208 inject(); 209 TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 210 fail("Should have thrown an exception"); 211 } catch (DoNotRetryIOException expected) { 212 // expected 213 } 214 assertTrue(REQ_COUNT.get() > 0); 215 } 216 217 /** 218 * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is 219 * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying 220 * indefinitely. 221 */ 222 @Test 223 public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE() 224 throws IOException, InterruptedException { 225 TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); 226 TableName tableName = TableName.valueOf(name.getMethodName()); 227 reset(); 228 THROW_ONCE.set(false); // throw exceptions in every retry 229 try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { 230 TEST_UTIL.loadTable(t, FAMILY, false); 231 TEST_UTIL.getAdmin().flush(tableName); 232 inject(); 233 TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); 234 fail("Should have thrown an exception"); 235 } catch (DoNotRetryIOException expected) { 236 assertThat(expected, instanceOf(ScannerResetException.class)); 237 // expected 238 } 239 assertTrue(REQ_COUNT.get() >= 3); 240 } 241 242}