/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases that ensure that file system level errors are bubbled up appropriately to clients,
 * rather than swallowed.
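 * Faults are injected by wrapping the backing FileSystem in a {@link FaultyFileSystem} whose
 * streams can be switched to throw IOExceptions on positioned reads.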
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestFSErrorsExposed {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSErrorsExposed.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSErrorsExposed.class);

  HBaseTestingUtil util = new HBaseTestingUtil();

  @Rule
  public TestName name = new TestName();

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * HFile scanner
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    FileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
      BloomType.NONE, true);
    sf.initReader();
    StoreFileReader reader = sf.getReader();
    HFileScanner scanner = reader.getScanner(false, true);

    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seekTo();
    // Do at least one successful read
    assertTrue(scanner.next());

    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next()) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Fault"));
    }
    reader.close(true); // end of test so evictOnClose
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * StoreFileScanner
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    HFileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
      BloomType.NONE, true);

    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
      Collections.singletonList(sf), false, true, false, false,
      // 0 is passed as readpoint because this test operates on HStoreFile directly
      0);
    KeyValueScanner scanner = scanners.get(0);

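    // FaultyFileSystem keeps a SoftReference to each stream it has opened; the store file
    // reader's pread stream is the first one recorded.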
    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seek(KeyValue.LOWESTKEY);
    // Do at least one successful read
    assertNotNull(scanner.next());
    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next() != null) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Could not iterate"));
    }
    scanner.close();
  }

  /**
   * Cluster test which starts a region server with a region, then removes the data from HDFS
   * underneath it, and ensures that errors are bubbled up to the client.
   */
  @Test
  public void testFullSystemBubblesFSErrors() throws Exception {
    // With short-circuit reads enabled, we won't get an error when the datanode is gone;
    // it's a known 'feature'.
    Assume.assumeTrue(!util.isReadShortCircuitOn());

    try {
      // Make it fail faster.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 90000);
      util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
      util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
      util.startMiniCluster(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      byte[] fam = Bytes.toBytes("fam");

      Admin admin = util.getAdmin();
      TableDescriptor tableDescriptor =
        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
          .newBuilder(fam).setMaxVersions(1).setBlockCacheEnabled(false).build()).build();
      admin.createTable(tableDescriptor);

      // Make a new Configuration so it makes a new connection that has the
      // above configuration on it; else we use the old one with 10 as the default.
      try (Table table = util.getConnection().getTable(tableName)) {
        // Load some data
        util.loadTable(table, fam, false);
        util.flush();
        util.countRows(table);

        // Kill the DFS cluster
        util.getDFSCluster().shutdownDataNodes();

        try {
          util.countRows(table);
          fail("Did not fail to count after removing data");
        } catch (Exception e) {
          LOG.info("Got expected error", e);
          assertTrue(e.getMessage().contains("Could not seek"));
        }
      }

      // Restart data nodes so that HBase can shut down cleanly.
      util.getDFSCluster().restartDataNodes();

    } finally {
      SingleProcessHBaseCluster cluster = util.getMiniHBaseCluster();
      if (cluster != null) {
        cluster.killAll();
      }
      util.shutdownMiniCluster();
    }
  }

  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyInputStream>> inStreams = new ArrayList<>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      inStreams.add(new SoftReference<>(faulty));
      return faulty;
    }

    /**
     * Starts to simulate faults on all streams opened so far
     */
    public void startFaults() {
      for (SoftReference<FaultyInputStream> is : inStreams) {
        FaultyInputStream stream = is.get();
        // The streams are only softly reachable from here, so skip any that have already
        // been garbage collected rather than NPE'ing.
        if (stream != null) {
          stream.startFaults();
        }
      }
    }
  }

  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    public void startFaults() {
      faultsStarted = true;
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
      injectFault();
      return ((PositionedReadable) in).read(position, buffer, offset, length);
    }

    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}