/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases that ensure that file system level errors are bubbled up appropriately to clients,
 * rather than swallowed.
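 * The first two tests wrap the backing filesystem in a fault-injecting FilterFileSystem; the
 * cluster test shuts down the HDFS datanodes underneath a running mini cluster.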
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestFSErrorsExposed {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSErrorsExposed.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSErrorsExposed.class);

  HBaseTestingUtility util = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * HFile scanner
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    FileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
      BloomType.NONE, true);
    sf.initReader();
    StoreFileReader reader = sf.getReader();
    HFileScanner scanner = reader.getScanner(false, true);

    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seekTo();
    // Do at least one successful read
    assertTrue(scanner.next());

    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next()) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Fault"));
    }
    reader.close(true); // end of test so evictOnClose
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * StoreFileScanner
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    HFileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
      BloomType.NONE, true);

    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
      Collections.singletonList(sf), false, true, false, false,
      // 0 is passed as readpoint because this test operates on HStoreFile directly
      0);
    KeyValueScanner scanner = scanners.get(0);

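    // The faulty filesystem records every stream it opens; confirm the store file's pread stream
    // was captured before faults are switched on.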
    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seek(KeyValue.LOWESTKEY);
    // Do at least one successful read
    assertNotNull(scanner.next());
    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next() != null) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Could not iterate"));
    }
    scanner.close();
  }

  /**
   * Cluster test which starts a region server with a region, then removes the data from HDFS
   * underneath it, and ensures that errors are bubbled up to the client.
   */
  @Test
  public void testFullSystemBubblesFSErrors() throws Exception {
    // We won't see an error when the datanode is gone if short-circuit reads are enabled;
    // it's a known 'feature'.
    Assume.assumeTrue(!util.isReadShortCircuitOn());

    try {
      // Make it fail faster.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 90000);
      util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
      util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
      util.startMiniCluster(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      byte[] fam = Bytes.toBytes("fam");

      Admin admin = util.getAdmin();
      HTableDescriptor desc = new HTableDescriptor(tableName);
      desc.addFamily(new HColumnDescriptor(fam).setMaxVersions(1).setBlockCacheEnabled(false));
      admin.createTable(desc);

      // Make a new Configuration so it makes a new connection that has the
      // above configuration on it; else we use the old one w/ 10 as default.
      try (Table table = util.getConnection().getTable(tableName)) {
        // Load some data
        util.loadTable(table, fam, false);
        util.flush();
        util.countRows(table);

        // Kill the DFS cluster
        util.getDFSCluster().shutdownDataNodes();

        try {
          util.countRows(table);
          fail("Did not fail to count after removing data");
        } catch (Exception e) {
          LOG.info("Got expected error", e);
          assertTrue(e.getMessage().contains("Could not seek"));
        }
      }

      // Restart data nodes so that HBase can shut down cleanly.
      util.getDFSCluster().restartDataNodes();

    } finally {
      MiniHBaseCluster cluster = util.getMiniHBaseCluster();
      if (cluster != null) cluster.killAll();
      util.shutdownMiniCluster();
    }
  }

  /**
   * FilterFileSystem that wraps every stream it opens in a {@link FaultyInputStream} so faults
   * can be injected later via {@link #startFaults()}.
   */
  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyInputStream>> inStreams = new ArrayList<>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      inStreams.add(new SoftReference<>(faulty));
      return faulty;
    }

    /**
     * Starts to simulate faults on all streams opened so far
     */
    public void startFaults() {
      for (SoftReference<FaultyInputStream> is : inStreams) {
        is.get().startFaults();
      }
    }
  }

  /**
   * Stream wrapper whose positional reads throw an IOException once faults have been started.
   */
  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    public void startFaults() {
      faultsStarted = true;
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
      injectFault();
      return ((PositionedReadable) in).read(position, buffer, offset, length);
    }

    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}