/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases that ensure that file system level errors are bubbled up appropriately to clients,
 * rather than swallowed.
 */
@Tag(RegionServerTests.TAG)
@Tag(LargeTests.TAG)
public class TestFSErrorsExposed {

  private static final Logger LOG = LoggerFactory.getLogger(TestFSErrorsExposed.class);

  HBaseTestingUtil util = new HBaseTestingUtil();

  /** Name of the currently running test method; used as the table name in the cluster test. */
  private String name;

  /**
   * Records the name of the test method that is about to run.
   * @param testInfo JUnit-provided description of the current test
   */
  @BeforeEach
  public void setTestName(TestInfo testInfo) {
    this.name = testInfo.getTestMethod()
      .orElseThrow(() -> new IllegalStateException("No test method available in " + testInfo))
      .getName();
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * HFile scanner
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    FileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    // The file is written through the healthy file system and read back through the faulty one.
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(util.getConfiguration(),
      fs, writer.getPath(), true);
    HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
    sf.initReader();
    StoreFileReader reader = sf.getReader();
    try (HFileScanner scanner = reader.getScanner(false, true, false)) {
      FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
      assertNotNull(inStream);

      scanner.seekTo();
      // Do at least one successful read
      assertTrue(scanner.next());

      faultyfs.startFaults();

      try {
        while (scanner.next()) {
          // Keep reading until a pread hits the injected fault.
        }
        fail("Scanner didn't throw after faults injected");
      } catch (IOException ioe) {
        LOG.info("Got expected exception", ioe);
        assertMessageContains(ioe, "Fault");
      }
    } finally {
      // end of test so evictOnClose; done in finally so a failed assertion does not leak the reader
      reader.close(true);
    }
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * StoreFileScanner
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    HFileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    // The file is written through the healthy file system and read back through the faulty one.
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(util.getConfiguration(),
      fs, writer.getPath(), true);
    HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);

    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
      Collections.singletonList(sf), false, true, false, false,
      // 0 is passed as readpoint because this test operates on HStoreFile directly
      0);
    try {
      KeyValueScanner scanner = scanners.get(0);

      FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
      assertNotNull(inStream);

      scanner.seek(KeyValue.LOWESTKEY);
      // Do at least one successful read
      assertNotNull(scanner.next());
      faultyfs.startFaults();

      try {
        while (scanner.next() != null) {
          // Keep reading until a pread hits the injected fault.
        }
        fail("Scanner didn't throw after faults injected");
      } catch (IOException ioe) {
        LOG.info("Got expected exception", ioe);
        assertMessageContains(ioe, "Could not iterate");
      }
    } finally {
      for (StoreFileScanner scanner : scanners) {
        scanner.close();
      }
    }
  }

  /**
   * Cluster test which starts a region server with a region, then shuts down the HDFS data nodes
   * underneath it, and ensures that errors are bubbled to the client.
   */
  @Test
  public void testFullSystemBubblesFSErrors() throws Exception {
    // We won't have an error if the datanode is not there if we use short circuit
    // it's a known 'feature'.
    Assumptions.assumeFalse(util.isReadShortCircuitOn());

    try {
      // Make it fail faster.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 90000);
      util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
      util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
      util.startMiniCluster(1);
      final TableName tableName = TableName.valueOf(name);
      byte[] fam = Bytes.toBytes("fam");

      Admin admin = util.getAdmin();
      TableDescriptor tableDescriptor =
        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
          .newBuilder(fam).setMaxVersions(1).setBlockCacheEnabled(false).build()).build();
      admin.createTable(tableDescriptor);

      // Open the table through the testing utility's connection; the cluster was started after
      // the settings above were applied to the utility's configuration.
      try (Table table = util.getConnection().getTable(tableName)) {
        // Load some data
        util.loadTable(table, fam, false);
        util.flush();
        HBaseTestingUtil.countRows(table);

        // Kill the DFS cluster
        util.getDFSCluster().shutdownDataNodes();

        try {
          HBaseTestingUtil.countRows(table);
          fail("Did not fail to count after removing data");
        } catch (Exception e) {
          LOG.info("Got expected error", e);
          assertMessageContains(e, "Could not seek");
        }
      }

      // Restart data nodes so that HBase can shut down cleanly.
      util.getDFSCluster().restartDataNodes();

    } finally {
      SingleProcessHBaseCluster cluster = util.getMiniHBaseCluster();
      if (cluster != null) {
        cluster.killAll();
      }
      util.shutdownMiniCluster();
    }
  }

  /**
   * Asserts that the message of {@code t} contains {@code expected}. A {@code null} message is
   * reported as an assertion failure rather than surfacing as a NullPointerException.
   * @param t        the throwable whose message is inspected
   * @param expected the substring the message must contain
   */
  private static void assertMessageContains(Throwable t, String expected) {
    String message = t.getMessage();
    assertTrue(message != null && message.contains(expected),
      "Expected exception message to contain '" + expected + "' but was: " + message);
  }

  /**
   * File system wrapper that hands out {@link FaultyInputStream}s from
   * {@link #open(Path, int)} and remembers them (via soft references) so that faults can later be
   * switched on for every stream opened so far.
   */
  static class FaultyFileSystem extends FilterFileSystem {
    final List<SoftReference<FaultyInputStream>> inStreams = new ArrayList<>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

    /**
     * Opens {@code p} on the wrapped file system and wraps the resulting stream in a
     * {@link FaultyInputStream}, which is also recorded in {@link #inStreams}.
     */
    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      inStreams.add(new SoftReference<>(faulty));
      return faulty;
    }

    /**
     * Starts to simulate faults on all streams opened so far
     */
    public void startFaults() {
      for (SoftReference<FaultyInputStream> ref : inStreams) {
        FaultyInputStream stream = ref.get();
        // A soft reference may have been cleared by the garbage collector; skip those entries
        // instead of failing with a NullPointerException.
        if (stream != null) {
          stream.startFaults();
        }
      }
    }
  }

  /**
   * Input stream whose positional read throws an {@link IOException} once
   * {@link #startFaults()} has been called. Only
   * {@link #read(long, byte[], int, int)} is intercepted.
   */
  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    /** Makes every subsequent positional read on this stream fail. */
    public void startFaults() {
      faultsStarted = true;
    }

    /**
     * Positional read that fails with an injected fault once faults have been started, and
     * otherwise delegates to the wrapped stream.
     */
    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
      injectFault();
      return ((PositionedReadable) in).read(position, buffer, offset, length);
    }

    /**
     * Throws the simulated I/O failure if faults are switched on.
     * @throws IOException with message "Fault injected" when faults have been started
     */
    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}