/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test cases that ensure that file system level errors are bubbled up
 * appropriately to clients, rather than swallowed.
 */
@Category({RegionServerTests.class, LargeTests.class})
public class TestFSErrorsExposed {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestFSErrorsExposed.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestFSErrorsExposed.class);

  HBaseTestingUtility util = new HBaseTestingUtility();

  @Rule
  public TestName name = new TestName();

  /**
   * Injects errors into the pread calls of an on-disk file and makes sure
   * those errors bubble up to the HFile scanner.
   */
  @Test
  public void testHFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(new Path(
        util.getDataTestDir("internalScannerExposesErrors"),
        "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem)util.getTestFileSystem();
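    // Wrap the backing filesystem so reads can have faults injected; the store file
    // writer below still uses the plain hfs, while the reader is opened through the
    // faulty wrapper.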
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    FileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(
        util.getConfiguration(), cacheConf, hfs)
            .withOutputDir(hfilePath)
            .withFileContext(meta)
            .build();
    TestHStoreFile.writeStoreFile(
        writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
        BloomType.NONE, true);
    sf.initReader();
    StoreFileReader reader = sf.getReader();
    HFileScanner scanner = reader.getScanner(false, true);

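    // The reader above opened the hfile through the faulty filesystem, so the wrapped
    // stream should have been recorded; grab it to confirm before injecting faults.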
    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seekTo();
    // Do at least one successful read
    assertTrue(scanner.next());

    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next()) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Fault"));
    }
    reader.close(true); // end of test so evictOnClose
  }

  /**
   * Injects errors into the pread calls of an on-disk file and makes sure
   * those errors bubble up to the StoreFileScanner.
   */
  @Test
  public void testStoreFileScannerThrowsErrors() throws IOException {
    Path hfilePath = new Path(new Path(
        util.getDataTestDir("internalScannerExposesErrors"),
        "regionname"), "familyname");
    HFileSystem hfs = (HFileSystem)util.getTestFileSystem();
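    // Same setup as above: the faulty wrapper lets us fail preads once the store file
    // has been opened for reading.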
    FaultyFileSystem faultyfs = new FaultyFileSystem(hfs.getBackingFs());
    HFileSystem fs = new HFileSystem(faultyfs);
    CacheConfig cacheConf = new CacheConfig(util.getConfiguration());
    HFileContext meta = new HFileContextBuilder().withBlockSize(2 * 1024).build();
    StoreFileWriter writer = new StoreFileWriter.Builder(
        util.getConfiguration(), cacheConf, hfs)
            .withOutputDir(hfilePath)
            .withFileContext(meta)
            .build();
    TestHStoreFile.writeStoreFile(
        writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));

    HStoreFile sf = new HStoreFile(fs, writer.getPath(), util.getConfiguration(), cacheConf,
        BloomType.NONE, true);

    List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
        Collections.singletonList(sf), false, true, false, false,
        // 0 is passed as readpoint because this test operates on HStoreFile directly
        0);
    KeyValueScanner scanner = scanners.get(0);

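    // As in the previous test, verify that the faulty stream was captured when the
    // store file was opened.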
    FaultyInputStream inStream = faultyfs.inStreams.get(0).get();
    assertNotNull(inStream);

    scanner.seek(KeyValue.LOWESTKEY);
    // Do at least one successful read
    assertNotNull(scanner.next());
    faultyfs.startFaults();

    try {
      int scanned = 0;
      while (scanner.next() != null) {
        scanned++;
      }
      fail("Scanner didn't throw after faults injected");
    } catch (IOException ioe) {
      LOG.info("Got expected exception", ioe);
      assertTrue(ioe.getMessage().contains("Could not iterate"));
    }
    scanner.close();
  }

  /**
   * Cluster test which starts a region server with a region, then
   * removes the data from HDFS underneath it, and ensures that
   * errors are bubbled up to the client.
   */
  @Test
  public void testFullSystemBubblesFSErrors() throws Exception {
    // With short-circuit reads enabled there is no error when the datanode goes away;
    // it's a known 'feature'.
    Assume.assumeTrue(!util.isReadShortCircuitOn());

    try {
      // Make it fail faster.
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
      util.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 90000);
      util.getConfiguration().setInt("hbase.lease.recovery.timeout", 10000);
      util.getConfiguration().setInt("hbase.lease.recovery.dfs.timeout", 1000);
      util.startMiniCluster(1);
      final TableName tableName = TableName.valueOf(name.getMethodName());
      byte[] fam = Bytes.toBytes("fam");

      Admin admin = util.getAdmin();
      TableDescriptorBuilder.ModifyableTableDescriptor tableDescriptor =
        new TableDescriptorBuilder.ModifyableTableDescriptor(tableName);
      tableDescriptor.setColumnFamily(
        new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(fam)
          .setMaxVersions(1)
          .setBlockCacheEnabled(false)
      );
      admin.createTable(tableDescriptor);

      // Make a new Configuration so it makes a new connection that has the
      // above configuration on it; else we use the old one with 10 as default.
      try (Table table = util.getConnection().getTable(tableName)) {
        // Load some data
        util.loadTable(table, fam, false);
        util.flush();
        util.countRows(table);

        // Kill the DFS cluster
        util.getDFSCluster().shutdownDataNodes();

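        // With the datanodes gone the region server can no longer read the store files,
        // so the scan below must surface the failure to the client instead of swallowing it.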
        try {
          util.countRows(table);
          fail("Did not fail to count after removing data");
        } catch (Exception e) {
          LOG.info("Got expected error", e);
          assertTrue(e.getMessage().contains("Could not seek"));
        }
      }

      // Restart data nodes so that HBase can shut down cleanly.
      util.getDFSCluster().restartDataNodes();

    } finally {
      MiniHBaseCluster cluster = util.getMiniHBaseCluster();
      if (cluster != null) {
        cluster.killAll();
      }
      util.shutdownMiniCluster();
    }
  }

  static class FaultyFileSystem extends FilterFileSystem {
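    // Soft references to every stream handed out, so startFaults() can flip them all
    // while still allowing streams the test no longer uses to be garbage collected.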
    List<SoftReference<FaultyInputStream>> inStreams = new ArrayList<>();

    public FaultyFileSystem(FileSystem testFileSystem) {
      super(testFileSystem);
    }

    @Override
    public FSDataInputStream open(Path p, int bufferSize) throws IOException {
      FSDataInputStream orig = fs.open(p, bufferSize);
      FaultyInputStream faulty = new FaultyInputStream(orig);
      inStreams.add(new SoftReference<>(faulty));
      return faulty;
    }

    /**
     * Starts simulating faults on all streams opened so far.
     */
    public void startFaults() {
      for (SoftReference<FaultyInputStream> is : inStreams) {
        FaultyInputStream stream = is.get();
        if (stream != null) {
          stream.startFaults();
        }
      }
    }
  }

  static class FaultyInputStream extends FSDataInputStream {
    boolean faultsStarted = false;

    public FaultyInputStream(InputStream in) throws IOException {
      super(in);
    }

    public void startFaults() {
      faultsStarted = true;
    }

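    // Only positional reads (preads) are intercepted; sequential reads pass through
    // to the wrapped stream untouched.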
    @Override
    public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
      injectFault();
      return ((PositionedReadable)in).read(position, buffer, offset, length);
    }

    private void injectFault() throws IOException {
      if (faultsStarted) {
        throw new IOException("Fault injected");
      }
    }
  }
}