/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases that ensure that file system level errors are bubbled up appropriately to clients,
 * rather than swallowed.
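 * <p>
 * The scanner tests wrap the backing file system in a {@link FaultyFileSystem} whose open streams
 * can be told to start throwing IOExceptions from positional reads; the full-cluster test instead
 * kills the underlying DFS datanodes.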
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestFSErrorsExposed {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSErrorsExposed.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSErrorsExposed.class);

  HBaseTestingUtil util = new HBaseTestingUtil();

  @Rule
  public TestName name = new TestName();

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * HFile scanner
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    FileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(util.getConfiguration(),
      fs, writer.getPath(), true);
    HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
    sf.initReader();
    StoreFileReader reader = sf.getReader();
    try (HFileScanner scanner = reader.getScanner(false, true, false)) {
      FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
      assertNotNull(inStream);

      scanner.seekTo();
      // Do at least one successful read
      assertTrue(scanner.next());

      faultyfs.startFaults();

      try {
        int scanned = 0;
        while (scanner.next()) {
          scanned++;
        }
        fail("Scanner didn't throw after faults injected");
      } catch (IOException ioe) {
        LOG.info("Got expected exception", ioe);
        assertTrue(ioe.getMessage().contains("Fault"));
      }
    }
    reader.close(true); // end of test so evictOnClose
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * StoreFileScanner
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    HFileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(util.getConfiguration(),
      fs, writer.getPath(), true);
    HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);

    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
      Collections.singletonList(sf), false, true, false, false,
      // 0 is passed as readpoint because this test operates on HStoreFile directly
      0);
    try {
      KeyValueScanner scanner = scanners.get(0);

      FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
      assertNotNull(inStream);

      scanner.seek(KeyValue.LOWESTKEY);
      // Do at least one successful read
      assertNotNull(scanner.next());
      faultyfs.startFaults();

      try {
        int scanned = 0;
        while (scanner.next() != null) {
          scanned++;
        }
        fail("Scanner didn't throw after faults injected");
      } catch (IOException ioe) {
        LOG.info("Got expected exception", ioe);
        assertTrue(ioe.getMessage().contains("Could not iterate"));
      }
    } finally {
      for (StoreFileScanner scanner : scanners) {
        scanner.close();
      }
    }
  }

  /**
   * Cluster test which starts a region server with a region, then removes the data from HDFS
   * underneath it, and ensures that errors are bubbled to the client.
   */
  @Test
  public void testFullSystemBubblesFSErrors() throws Exception {
    // With short-circuit reads enabled we won't get an error when the datanode
    // is gone; it's a known 'feature'.
    Assume.assumeTrue(!util.isReadShortCircuitOn());

    try {
      // Make it fail faster.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 90000);
      util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
      util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
      util.startMiniCluster(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      byte[] fam = Bytes.toBytes("fam");

      Admin admin = util.getAdmin();
      TableDescriptor tableDescriptor =
        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
          .newBuilder(fam).setMaxVersions(1).setBlockCacheEnabled(false).build()).build();
      admin.createTable(tableDescriptor);

      // Make a new Configuration so it makes a new connection that has the
      // above configuration on it; else we use the old one w/ 10 retries as default.
      try (Table table = util.getConnection().getTable(tableName)) {
        // Load some data
        util.loadTable(table, fam, false);
        util.flush();
        HBaseTestingUtil.countRows(table);

        // Kill the DFS cluster
        util.getDFSCluster().shutdownDataNodes();

        try {
          HBaseTestingUtil.countRows(table);
          fail("Did not fail to count after removing data");
        } catch (Exception e) {
          LOG.info("Got expected error", e);
          assertTrue(e.getMessage().contains("Could not seek"));
        }
      }

      // Restart data nodes so that HBase can shut down cleanly.
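      // (Region close flushes memstore data out to HDFS, so a clean teardown
      // needs live datanodes.)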
      util.getDFSCluster().restartDataNodes();

    } finally {
      SingleProcessHBaseCluster cluster = util.getMiniHBaseCluster();
      if (cluster != null) cluster.killAll();
      util.shutdownMiniCluster();
    }
  }

  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyInputStream>> inStreams = new ArrayList<>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      inStreams.add(new SoftReference<>(faulty));
      return faulty;
    }

    /**
     * Starts to simulate faults on all streams opened so far
     */
    public void startFaults() {
      for (SoftReference<FaultyInputStream> is : inStreams) {
        is.get().startFaults();
      }
    }
  }

  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    public void startFaults() {
      faultsStarted = true;
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
      injectFault();
      return ((PositionedReadable) in).read(position, buffer, offset, length);
    }

    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}