001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Optional;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.TableName;
032import org.apache.hadoop.hbase.client.Durability;
033import org.apache.hadoop.hbase.client.Put;
034import org.apache.hadoop.hbase.client.Result;
035import org.apache.hadoop.hbase.client.ResultScanner;
036import org.apache.hadoop.hbase.client.Scan;
037import org.apache.hadoop.hbase.client.Table;
038import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
039import org.apache.hadoop.hbase.coprocessor.ObserverContext;
040import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
041import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
042import org.apache.hadoop.hbase.coprocessor.RegionObserver;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.testclassification.RegionServerTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.junit.jupiter.api.AfterAll;
047import org.junit.jupiter.api.BeforeAll;
048import org.junit.jupiter.api.Tag;
049import org.junit.jupiter.api.Test;
050import org.junit.jupiter.api.TestInfo;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054@Tag(RegionServerTests.TAG)
055@Tag(MediumTests.TAG)
056public class TestScannerRetriableFailure {
057
058  private static final Logger LOG = LoggerFactory.getLogger(TestScannerRetriableFailure.class);
059
060  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
061
062  private static final String FAMILY_NAME_STR = "f";
063  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
064
065  public static class FaultyScannerObserver implements RegionCoprocessor, RegionObserver {
066    private int faults = 0;
067
068    @Override
069    public Optional<RegionObserver> getRegionObserver() {
070      return Optional.of(this);
071    }
072
073    @Override
074    public boolean preScannerNext(final ObserverContext<? extends RegionCoprocessorEnvironment> e,
075      final InternalScanner s, final List<Result> results, final int limit, final boolean hasMore)
076      throws IOException {
077      final TableName tableName = e.getEnvironment().getRegionInfo().getTable();
078      if (!tableName.isSystemTable() && (faults++ % 2) == 0) {
079        LOG.debug(" Injecting fault in table=" + tableName + " scanner");
080        throw new IOException("injected fault");
081      }
082      return hasMore;
083    }
084  }
085
086  private static void setupConf(Configuration conf) {
087    conf.setLong("hbase.hstore.compaction.min", 20);
088    conf.setLong("hbase.hstore.compaction.max", 39);
089    conf.setLong("hbase.hstore.blockingStoreFiles", 40);
090
091    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, FaultyScannerObserver.class.getName());
092  }
093
094  @BeforeAll
095  public static void setup() throws Exception {
096    setupConf(UTIL.getConfiguration());
097    UTIL.startMiniCluster(1);
098  }
099
100  @AfterAll
101  public static void tearDown() throws Exception {
102    try {
103      UTIL.shutdownMiniCluster();
104    } catch (Exception e) {
105      LOG.warn("failure shutting down cluster", e);
106    }
107  }
108
109  @Test
110  public void testFaultyScanner(TestInfo testInfo) throws Exception {
111    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
112    Table table = UTIL.createTable(tableName, FAMILY_NAME);
113    try {
114      final int NUM_ROWS = 100;
115      loadTable(table, NUM_ROWS);
116      checkTableRows(table, NUM_ROWS);
117    } finally {
118      table.close();
119    }
120  }
121
122  // ==========================================================================
123  // Helpers
124  // ==========================================================================
125  private FileSystem getFileSystem() {
126    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
127  }
128
129  private Path getRootDir() {
130    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
131  }
132
133  public void loadTable(final Table table, int numRows) throws IOException {
134    List<Put> puts = new ArrayList<>(numRows);
135    for (int i = 0; i < numRows; ++i) {
136      byte[] row = Bytes.toBytes(String.format("%09d", i));
137      Put put = new Put(row);
138      put.setDurability(Durability.SKIP_WAL);
139      put.addColumn(FAMILY_NAME, null, row);
140      table.put(put);
141    }
142  }
143
144  private void checkTableRows(final Table table, int numRows) throws Exception {
145    Scan scan = new Scan();
146    scan.setCaching(1);
147    scan.setCacheBlocks(false);
148    ResultScanner scanner = table.getScanner(scan);
149    try {
150      int count = 0;
151      for (int i = 0; i < numRows; ++i) {
152        byte[] row = Bytes.toBytes(String.format("%09d", i));
153        Result result = scanner.next();
154        assertTrue(result != null);
155        assertTrue(Bytes.equals(row, result.getRow()));
156        count++;
157      }
158
159      while (true) {
160        Result result = scanner.next();
161        if (result == null) break;
162        count++;
163      }
164      assertEquals(numRows, count);
165    } finally {
166      scanner.close();
167    }
168  }
169}