/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases that ensure that file system level errors are bubbled up appropriately to clients,
 * rather than swallowed.
 */
@Category({ RegionServerTests.class, LargeTests.class })
public class TestFSErrorsExposed {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestFSErrorsExposed.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSErrorsExposed.class);

  HBaseTestingUtility util = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * HFile scanner
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
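    // Wrap the backing filesystem in a fault injector: reads that go through 'fs' can later be
    // made to fail, while the writer below still writes through the healthy 'hfs'.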
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    FileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
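    // The shared helper writes many small cells; with the 2 KB block size above, the data should
    // span several blocks, so scanning issues multiple reads.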
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
      BloomType.NONE, true);
    sf.initReader();
    StoreFileReader reader = sf.getReader();
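    // cacheBlocks=false, pread=true: positional reads are the path FaultyInputStream instruments.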
    HFileScanner scanner = reader.getScanner(false, true);

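    // initReader() opened the file via the faulty filesystem, registering a wrapped stream.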
    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seekTo();
    // Do at least one successful read
    assertTrue(scanner.next());

    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next()) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Fault"));
    }
    reader.close(true); // end of test so evictOnClose
  }

  /**
   * Injects errors into the pread calls of an on-disk file, and makes sure those bubble up to the
   * StoreFileScanner
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(
      new Path(util.getDataTestDir("internalScannerExposesErrors"), "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem) util.getTestFileSystem();
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    HFileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(util.getConfiguration(), cacheConf, hfs)
      .withOutputDir(hfilePath).withFileContext(meta).build();
    TestHStoreFile.writeStoreFile(writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
      BloomType.NONE, true);

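    // The second and third arguments are cacheBlocks=false and usePread=true, so scanning goes
    // through the fault-injected positional-read path.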
    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
      Collections.singletonList(sf), false, true, false, false,
      // 0 is passed as readpoint because this test operates on HStoreFile directly
      0);
    KeyValueScanner scanner = scanners.get(0);

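    // As in the HFile scanner test, opening the store file recorded a faulty stream to trip later.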
    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seek(KeyValue.LOWESTKEY);
    // Do at least one successful read
    assertNotNull(scanner.next());
    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next() != null) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Could not iterate"));
    }
    scanner.close();
  }

  /**
   * Cluster test which starts a region server with a region, then removes the data from HDFS
   * underneath it, and ensures that errors are bubbled to the client.
   */
  @Test
  public void testFullSystemBubblesFSErrors() throws Exception {
    // With short-circuit reads enabled, losing the datanodes does not produce an error; this is a
    // known 'feature', so skip the test in that case.
    Assume.assumeTrue(!util.isReadShortCircuitOn());

    try {
      // Make it fail faster.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 90000);
      util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
      util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
      util.startMiniCluster(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      byte[] fam = Bytes.toBytes("fam");

      Admin admin = util.getAdmin();
      HTableDescriptor desc = new HTableDescriptor(tableName);
      desc.addFamily(new HColumnDescriptor(fam).setMaxVersions(1).setBlockCacheEnabled(false));
      admin.createTable(desc);
      // Use the utility's connection, which was created from the configuration tweaked above, so
      // the single-retry setting applies to this table rather than the much larger default.
      try (Table table = util.getConnection().getTable(tableName)) {
        // Load some data
        util.loadTable(table, fam, false);
        util.flush();
        util.countRows(table);

        // Kill the DFS cluster
        util.getDFSCluster().shutdownDataNodes();

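        // The family was created with the block cache disabled, so this count must re-read from
        // HDFS and therefore hit the dead datanodes.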
        try {
          util.countRows(table);
          fail("Did not fail to count after removing data");
        } catch (Exception e) {
          LOG.info("Got expected error", e);
          assertTrue(e.getMessage().contains("Could not seek"));
        }
      }

      // Restart data nodes so that HBase can shut down cleanly.
      util.getDFSCluster().restartDataNodes();

    } finally {
      MiniHBaseCluster cluster = util.getMiniHBaseCluster();
      if (cluster != null) cluster.killAll();
      util.shutdownMiniCluster();
    }
  }

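  /**
   * FilterFileSystem that wraps every stream returned by {@link #open(Path, int)} in a
   * {@link FaultyInputStream}, so positional reads can be made to fail on demand via
   * {@link #startFaults()}.
   */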
  static class FaultyFileSystem extends FilterFileSystem {
    List<SoftReference<FaultyInputStream>> inStreams = new ArrayList<>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      inStreams.add(new SoftReference<>(faulty));
      return faulty;
    }

    /**
     * Starts to simulate faults on all streams opened so far
     */
    public void startFaults() {
      for (SoftReference<FaultyInputStream> is : inStreams) {
        is.get().startFaults();
      }
    }
  }

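  /**
   * FSDataInputStream wrapper whose positional reads throw an IOException once
   * {@link #startFaults()} has been called.
   */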
  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    public void startFaults() {
      faultsStarted = true;
    }

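    // Only the positional (pread) overload is instrumented; other read methods pass through to
    // the wrapped stream unchanged.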
    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
      injectFault();
      return ((PositionedReadable) in).read(position, buffer, offset, length);
    }

    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}