001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Optional; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.fs.FileSystem; 029import org.apache.hadoop.fs.Path; 030import org.apache.hadoop.hbase.HBaseClassTestRule; 031import org.apache.hadoop.hbase.HBaseTestingUtility; 032import org.apache.hadoop.hbase.TableName; 033import org.apache.hadoop.hbase.TableNameTestRule; 034import org.apache.hadoop.hbase.client.Durability; 035import org.apache.hadoop.hbase.client.Put; 036import org.apache.hadoop.hbase.client.Result; 037import org.apache.hadoop.hbase.client.ResultScanner; 038import org.apache.hadoop.hbase.client.Scan; 039import org.apache.hadoop.hbase.client.Table; 040import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 041import org.apache.hadoop.hbase.coprocessor.ObserverContext; 042import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 043import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 044import org.apache.hadoop.hbase.coprocessor.RegionObserver; 045import org.apache.hadoop.hbase.testclassification.MediumTests; 046import org.apache.hadoop.hbase.testclassification.RegionServerTests; 047import org.apache.hadoop.hbase.util.Bytes; 048import org.junit.AfterClass; 049import org.junit.BeforeClass; 050import org.junit.ClassRule; 051import org.junit.Rule; 052import org.junit.Test; 053import org.junit.experimental.categories.Category; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057@Category({RegionServerTests.class, MediumTests.class}) 058public class TestScannerRetriableFailure { 059 060 @ClassRule 061 public static final HBaseClassTestRule CLASS_RULE = 062 HBaseClassTestRule.forClass(TestScannerRetriableFailure.class); 063 064 private static final Logger LOG = LoggerFactory.getLogger(TestScannerRetriableFailure.class); 065 066 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 067 068 private static final String FAMILY_NAME_STR = "f"; 069 private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR); 070 071 @Rule public TableNameTestRule testTable = new TableNameTestRule(); 072 073 public static class FaultyScannerObserver implements RegionCoprocessor, RegionObserver { 074 private int faults = 0; 075 076 @Override 077 public Optional<RegionObserver> getRegionObserver() { 078 return Optional.of(this); 079 } 080 081 @Override 082 public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, 083 final InternalScanner s, final List<Result> results, 084 final int limit, final boolean hasMore) throws IOException { 085 final TableName tableName = e.getEnvironment().getRegionInfo().getTable(); 086 if (!tableName.isSystemTable() && (faults++ % 2) == 0) { 087 LOG.debug(" Injecting fault in table=" + tableName + " scanner"); 088 throw new IOException("injected fault"); 089 } 090 return hasMore; 091 } 092 } 093 094 private static void setupConf(Configuration conf) { 095 conf.setLong("hbase.hstore.compaction.min", 20); 096 conf.setLong("hbase.hstore.compaction.max", 39); 097 conf.setLong("hbase.hstore.blockingStoreFiles", 40); 098 099 conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, FaultyScannerObserver.class.getName()); 100 } 101 102 @BeforeClass 103 public static void setup() throws Exception { 104 setupConf(UTIL.getConfiguration()); 105 UTIL.startMiniCluster(1); 106 } 107 108 @AfterClass 109 public static void tearDown() throws Exception { 110 try { 111 UTIL.shutdownMiniCluster(); 112 } catch (Exception e) { 113 LOG.warn("failure shutting down cluster", e); 114 } 115 } 116 117 @Test 118 public void testFaultyScanner() throws Exception { 119 TableName tableName = testTable.getTableName(); 120 Table table = UTIL.createTable(tableName, FAMILY_NAME); 121 try { 122 final int NUM_ROWS = 100; 123 loadTable(table, NUM_ROWS); 124 checkTableRows(table, NUM_ROWS); 125 } finally { 126 table.close(); 127 } 128 } 129 130 // ========================================================================== 131 // Helpers 132 // ========================================================================== 133 private FileSystem getFileSystem() { 134 return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem(); 135 } 136 137 private Path getRootDir() { 138 return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir(); 139 } 140 141 public void loadTable(final Table table, int numRows) throws IOException { 142 List<Put> puts = new ArrayList<>(numRows); 143 for (int i = 0; i < numRows; ++i) { 144 byte[] row = Bytes.toBytes(String.format("%09d", i)); 145 Put put = new Put(row); 146 put.setDurability(Durability.SKIP_WAL); 147 put.addColumn(FAMILY_NAME, null, row); 148 table.put(put); 149 } 150 } 151 152 private void checkTableRows(final Table table, int numRows) throws Exception { 153 Scan scan = new Scan(); 154 scan.setCaching(1); 155 scan.setCacheBlocks(false); 156 ResultScanner scanner = table.getScanner(scan); 157 try { 158 int count = 0; 159 for (int i = 0; i < numRows; ++i) { 160 byte[] row = Bytes.toBytes(String.format("%09d", i)); 161 Result result = scanner.next(); 162 assertTrue(result != null); 163 assertTrue(Bytes.equals(row, result.getRow())); 164 count++; 165 } 166 167 while (true) { 168 Result result = scanner.next(); 169 if (result == null) break; 170 count++; 171 } 172 assertEquals(numRows, count); 173 } finally { 174 scanner.close(); 175 } 176 } 177}