001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertNotNull;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022import static org.junit.jupiter.api.Assertions.fail;
023
024import java.io.IOException;
025import java.io.InputStream;
026import java.lang.ref.SoftReference;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.List;
030import org.apache.hadoop.fs.FSDataInputStream;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.FilterFileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.fs.PositionedReadable;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.client.TableDescriptor;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.fs.HFileSystem;
046import org.apache.hadoop.hbase.io.hfile.CacheConfig;
047import org.apache.hadoop.hbase.io.hfile.HFileContext;
048import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
049import org.apache.hadoop.hbase.io.hfile.HFileScanner;
050import org.apache.hadoop.hbase.testclassification.LargeTests;
051import org.apache.hadoop.hbase.testclassification.RegionServerTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.junit.jupiter.api.Assumptions;
054import org.junit.jupiter.api.BeforeEach;
055import org.junit.jupiter.api.Tag;
056import org.junit.jupiter.api.Test;
057import org.junit.jupiter.api.TestInfo;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * Test cases that ensure that file system level errors are bubbled up appropriately to clients,
063 * rather than swallowed.
064 */
065@Tag(RegionServerTests.TAG)
066@Tag(LargeTests.TAG)
067public class TestFSErrorsExposed {
068
069  private static final Logger LOG = LoggerFactory.getLogger(TestFSErrorsExposed.class);
070
071  HBaseTestingUtil util = new HBaseTestingUtil();
072  private String name;
073
074  @BeforeEach
075  public void setTestName(TestInfo testInfo) {
076    this.name = testInfo.getTestMethod().get().getName();
077  }
078
079  /**
080   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
081   * HFile scanner
082   */
083  @Test
084  public void testHFileScannerThrowsErrors() throws IOException {
085    Path hfilePath = new Path(
086      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
087    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
088    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
089    FileSystem fs = new HFileSystem(faultyfs);
090    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
091    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
092    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
093      .withOutputDir(hfilePath).withFileContext(meta).build();
094    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
095
096    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(util.getConfiguration(),
097      fs, writer.getPath(), true);
098    HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
099    sf.initReader();
100    StoreFileReader reader = sf.getReader();
101    try (HFileScanner scanner = reader.getScanner(false, true, false)) {
102      FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
103      assertNotNull(inStream);
104
105      scanner.seekTo();
106      // Do at least one successful read
107      assertTrue(scanner.next());
108
109      faultyfs.startFaults();
110
111      try {
112        int scanned = 0;
113        while (scanner.next()) {
114          scanned++;
115        }
116        fail("Scanner didn't throw after faults injected");
117      } catch (IOException ioe) {
118        LOG.info("Got expected exception", ioe);
119        assertTrue(ioe.getMessage().contains("Fault"));
120      }
121    }
122    reader.close(true); // end of test so evictOnClose
123  }
124
125  /**
126   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
127   * StoreFileScanner
128   */
129  @Test
130  public void testStoreFileScannerThrowsErrors() throws IOException {
131    Path hfilePath = new Path(
132      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
133    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
134    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
135    HFileSystem fs = new HFileSystem(faultyfs);
136    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
137    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
138    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
139      .withOutputDir(hfilePath).withFileContext(meta).build();
140    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
141
142    StoreFileInfo storeFileInfo = StoreFileInfo.createStoreFileInfoForHFile(util.getConfiguration(),
143      fs, writer.getPath(), true);
144    HStoreFile sf = new HStoreFile(storeFileInfo, BloomType.NONE, cacheConf);
145
146    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
147      Collections.singletonList(sf), false, true, false, false,
148      // 0 is passed as readpoint because this test operates on HStoreFile directly
149      0);
150    try {
151      KeyValueScanner scanner = scanners.get(0);
152
153      FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
154      assertNotNull(inStream);
155
156      scanner.seek(KeyValue.LOWESTKEY);
157      // Do at least one successful read
158      assertNotNull(scanner.next());
159      faultyfs.startFaults();
160
161      try {
162        int scanned = 0;
163        while (scanner.next() != null) {
164          scanned++;
165        }
166        fail("Scanner didn't throw after faults injected");
167      } catch (IOException ioe) {
168        LOG.info("Got expected exception", ioe);
169        assertTrue(ioe.getMessage().contains("Could not iterate"));
170      }
171    } finally {
172      for (StoreFileScanner scanner : scanners) {
173        scanner.close();
174      }
175    }
176  }
177
178  /**
179   * Cluster test which starts a region server with a region, then removes the data from HDFS
180   * underneath it, and ensures that errors are bubbled to the client.
181   */
182  @Test
183  public void testFullSystemBubblesFSErrors() throws Exception {
184    // We won't have an error if the datanode is not there if we use short circuit
185    // it's a known 'feature'.
186    Assumptions.assumeTrue(!util.isReadShortCircuitOn());
187
188    try {
189      // Make it fail faster.
190      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
191      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 90000);
192      util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
193      util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
194      util.startMiniCluster(1);
195      final TableName tableName = TableName.valueOf(name);
196      byte[] fam = Bytes.toBytes("fam");
197
198      Admin admin = util.getAdmin();
199      TableDescriptor tableDescriptor =
200        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
201          .newBuilder(fam).setMaxVersions(1).setBlockCacheEnabled(false).build()).build();
202      admin.createTable(tableDescriptor);
203
204      // Make a new Configuration so it makes a new connection that has the
205      // above configuration on it; else we use the old one w/ 10 as default.
206      try (Table table = util.getConnection().getTable(tableName)) {
207        // Load some data
208        util.loadTable(table, fam, false);
209        util.flush();
210        HBaseTestingUtil.countRows(table);
211
212        // Kill the DFS cluster
213        util.getDFSCluster().shutdownDataNodes();
214
215        try {
216          HBaseTestingUtil.countRows(table);
217          fail("Did not fail to count after removing data");
218        } catch (Exception e) {
219          LOG.info("Got expected error", e);
220          assertTrue(e.getMessage().contains("Could not seek"));
221        }
222      }
223
224      // Restart data nodes so that HBase can shut down cleanly.
225      util.getDFSCluster().restartDataNodes();
226
227    } finally {
228      SingleProcessHBaseCluster cluster = util.getMiniHBaseCluster();
229      if (cluster != null) cluster.killAll();
230      util.shutdownMiniCluster();
231    }
232  }
233
234  static class FaultyFileSystem extends FilterFileSystem {
235    List<SoftReference<FaultyInputStream>> inStreams = new ArrayList<>();
236
237    public FaultyFileSystem(FileSystem testFileSystem) {
238      super(testFileSystem);
239    }
240
241    @Override
242    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
243      FSDataInputStream orig = fs.open(p, bufferSize);
244      FaultyInputStream faulty = new FaultyInputStream(orig);
245      inStreams.add(new SoftReference<>(faulty));
246      return faulty;
247    }
248
249    /**
250     * Starts to simulate faults on all streams opened so far
251     */
252    public void startFaults() {
253      for (SoftReference<FaultyInputStream> is : inStreams) {
254        is.get().startFaults();
255      }
256    }
257  }
258
259  static class FaultyInputStream extends FSDataInputStream {
260    boolean faultsStarted = false;
261
262    public FaultyInputStream(InputStream in) throws IOException {
263      super(in);
264    }
265
266    public void startFaults() {
267      faultsStarted = true;
268    }
269
270    @Override
271    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
272      injectFault();
273      return ((PositionedReadable) in).read(position, buffer, offset, length);
274    }
275
276    private void injectFault() throws IOException {
277      if (faultsStarted) {
278        throw new IOException("Fault injected");
279      }
280    }
281  }
282}