/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases that ensure that file system level errors are bubbled up
 * appropriately to clients, rather than swallowed.
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestFSErrorsExposed {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestFSErrorsExposed.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSErrorsExposed.class);

  HBaseTestingUtility util = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  /**
   * Injects errors into the pread calls of an on-disk file, and makes
   * sure those bubble up to the HFile scanner
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(new Path(
        util.getDataTestDir("internalScannerExposesErrors"),
        "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    FileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(
        util.getConfiguration(), cacheConf, hfs)
            .withOutputDir(hfilePath)
            .withFileContext(meta)
            .build();
    TestHStoreFile.writeStoreFile(
        writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
        BloomType.NONE, true);
    sf.initReader();
    StoreFileReader reader = sf.getReader();
    HFileScanner scanner = reader.getScanner(false, true);

    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seekTo();
    // Do at least one successful read
    assertTrue(scanner.next());

    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next()) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Fault"));
    }
    reader.close(true); // end of test so evictOnClose
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes
   * sure those bubble up to the StoreFileScanner
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(new Path(
        util.getDataTestDir("internalScannerExposesErrors"),
        "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    HFileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(
        util.getConfiguration(), cacheConf, hfs)
            .withOutputDir(hfilePath)
            .withFileContext(meta)
            .build();
    TestHStoreFile.writeStoreFile(
        writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
        BloomType.NONE, true);

    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
        Collections.singletonList(sf), false, true, false, false,
        // 0 is passed as readpoint because this test operates on HStoreFile directly
        0);
    KeyValueScanner scanner = scanners.get(0);

    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seek(KeyValue.LOWESTKEY);
    // Do at least one successful read
    assertNotNull(scanner.next());
    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next() != null) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Could not iterate"));
    }
    scanner.close();
  }

  /**
   * Cluster test which starts a region server with a region, then
   * removes the data from HDFS underneath it, and ensures that
   * errors are bubbled to the client.
   */
  @Test
  public void testFullSystemBubblesFSErrors() throws Exception {
    // With short-circuit reads enabled we won't see an error when the datanode
    // goes away; that's a known 'feature', so skip the test in that case.
    Assume.assumeTrue(!util.isReadShortCircuitOn());

    try {
      // Make it fail faster.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 90000);
      util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
      util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
      util.startMiniCluster(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      byte[] fam = Bytes.toBytes("fam");

      Admin admin = util.getAdmin();
      HTableDescriptor desc = new HTableDescriptor(tableName);
      desc.addFamily(new HColumnDescriptor(fam)
          .setMaxVersions(1)
          .setBlockCacheEnabled(false)
      );
      admin.createTable(desc);

      // Make a new Configuration so it makes a new connection that has the
      // above configuration on it; else we use the old one w/ 10 as default.
      try (Table table = util.getConnection().getTable(tableName)) {
        // Load some data
        util.loadTable(table, fam, false);
        util.flush();
        util.countRows(table);

        // Kill the DFS cluster
        util.getDFSCluster().shutdownDataNodes();

        try {
          util.countRows(table);
          fail("Did not fail to count after removing data");
        } catch (Exception e) {
          LOG.info("Got expected error", e);
          assertTrue(e.getMessage().contains("Could not seek"));
        }
      }

      // Restart data nodes so that HBase can shut down cleanly.
      util.getDFSCluster().restartDataNodes();

    } finally {
      MiniHBaseCluster cluster = util.getMiniHBaseCluster();
      if (cluster != null) cluster.killAll();
      util.shutdownMiniCluster();
    }
  }

  /**
   * FilterFileSystem that wraps every stream it opens in a {@link FaultyInputStream},
   * so tests can switch those streams into a faulty mode on demand.
   */
  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyInputStream>> inStreams = new ArrayList<>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      // Remember the stream so startFaults() can flip it into faulty mode later.
      inStreams.add(new SoftReference<>(faulty));
      return faulty;
    }

    /**
     * Starts to simulate faults on all streams opened so far
     */
    public void startFaults() {
      for (SoftReference<FaultyInputStream> is : inStreams) {
        is.get().startFaults();
      }
    }
  }

  /**
   * FSDataInputStream wrapper that throws an IOException on positional reads
   * once {@link #startFaults()} has been called.
   */
  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    public void startFaults() {
      faultsStarted = true;
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length)
        throws IOException {
      injectFault();
      return ((PositionedReadable) in).read(position, buffer, offset, length);
    }

    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}