001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertNull;
024import static org.junit.jupiter.api.Assertions.assertThrows;
025import static org.junit.jupiter.api.Assertions.assertTrue;
026import static org.junit.jupiter.api.Assertions.fail;
027import static org.mockito.ArgumentMatchers.any;
028import static org.mockito.ArgumentMatchers.anyBoolean;
029import static org.mockito.ArgumentMatchers.anyList;
030import static org.mockito.Mockito.doReturn;
031import static org.mockito.Mockito.spy;
032
033import java.io.IOException;
034import java.nio.ByteBuffer;
035import java.util.Deque;
036import java.util.List;
037import java.util.Map;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.atomic.AtomicInteger;
040import java.util.stream.IntStream;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.HBaseTestingUtil;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.TableExistsException;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.Admin;
049import org.apache.hadoop.hbase.client.AsyncClusterConnection;
050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
051import org.apache.hadoop.hbase.client.Connection;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.Result;
054import org.apache.hadoop.hbase.client.ResultScanner;
055import org.apache.hadoop.hbase.client.Scan;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
059import org.apache.hadoop.hbase.regionserver.HRegionServer;
060import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
061import org.apache.hadoop.hbase.util.Bytes;
062import org.apache.hadoop.hbase.util.CommonFSUtils;
063import org.apache.hadoop.hbase.util.Pair;
064import org.junit.jupiter.api.AfterAll;
065import org.junit.jupiter.api.Test;
066import org.junit.jupiter.api.TestInfo;
067import org.slf4j.Logger;
068import org.slf4j.LoggerFactory;
069
070import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
071
072import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
073
074/**
075 * Test cases for the atomic load error handling of the bulk load functionality.
076 */
077public class BulkLoadHFilesSplitRecoveryTestBase {
078
079  private static final Logger LOG =
080    LoggerFactory.getLogger(BulkLoadHFilesSplitRecoveryTestBase.class);
081
082  static HBaseTestingUtil util;
083  // used by secure subclass
084  static boolean useSecure = false;
085
086  final static int NUM_CFS = 10;
087  final static byte[] QUAL = Bytes.toBytes("qual");
088  final static int ROWCOUNT = 100;
089
090  private final static byte[][] families = new byte[NUM_CFS][];
091
092  static {
093    for (int i = 0; i < NUM_CFS; i++) {
094      families[i] = Bytes.toBytes(family(i));
095    }
096  }
097
098  static byte[] rowkey(int i) {
099    return Bytes.toBytes(String.format("row_%08d", i));
100  }
101
102  static String family(int i) {
103    return String.format("family_%04d", i);
104  }
105
106  static byte[] value(int i) {
107    return Bytes.toBytes(String.format("%010d", i));
108  }
109
110  public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
111    byte[] val = value(value);
112    for (int i = 0; i < NUM_CFS; i++) {
113      Path testIn = new Path(dir, family(i));
114
115      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
116        Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
117    }
118  }
119
120  private TableDescriptor createTableDesc(TableName name, int cfs) {
121    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
122    IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
123      .forEachOrdered(builder::setColumnFamily);
124    return builder.build();
125  }
126
127  /**
128   * Creates a table with given table name and specified number of column families if the table does
129   * not already exist.
130   */
131  private void setupTable(final Connection connection, TableName table, int cfs)
132    throws IOException {
133    try {
134      LOG.info("Creating table " + table);
135      try (Admin admin = connection.getAdmin()) {
136        admin.createTable(createTableDesc(table, cfs));
137      }
138    } catch (TableExistsException tee) {
139      LOG.info("Table " + table + " already exists");
140    }
141  }
142
143  /**
144   * Creates a table with given table name,specified number of column families<br>
145   * and splitkeys if the table does not already exist.
146   */
147  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
148    throws IOException {
149    try {
150      LOG.info("Creating table " + table);
151      util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
152    } catch (TableExistsException tee) {
153      LOG.info("Table " + table + " already exists");
154    }
155  }
156
157  private Path buildBulkFiles(TableName table, int value) throws Exception {
158    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
159    Path bulk1 = new Path(dir, table.getNameAsString() + value);
160    FileSystem fs = util.getTestFileSystem();
161    buildHFiles(fs, bulk1, value);
162    return bulk1;
163  }
164
165  /**
166   * Populate table with known values.
167   */
168  private void populateTable(final Connection connection, TableName table, int value)
169    throws Exception {
170    // create HFiles for different column families
171    Path dir = buildBulkFiles(table, value);
172    BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(table, dir);
173  }
174
175  /**
176   * Split the known table in half. (this is hard coded for this test suite)
177   */
178  private void forceSplit(TableName table) {
179    try {
180      // need to call regions server to by synchronous but isn't visible.
181      HRegionServer hrs = util.getRSForFirstRegionInTable(table);
182
183      for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
184        if (hri.getTable().equals(table)) {
185          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
186          // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
187        }
188      }
189
190      // verify that split completed.
191      int regions;
192      do {
193        regions = 0;
194        for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
195          if (hri.getTable().equals(table)) {
196            regions++;
197          }
198        }
199        if (regions != 2) {
200          LOG.info("Taking some time to complete split...");
201          Thread.sleep(250);
202        }
203      } while (regions != 2);
204    } catch (IOException e) {
205      e.printStackTrace();
206    } catch (InterruptedException e) {
207      e.printStackTrace();
208    }
209  }
210
211  @AfterAll
212  public static void teardownCluster() throws Exception {
213    util.shutdownMiniCluster();
214  }
215
216  /**
217   * Checks that all columns have the expected value and that there is the expected number of rows.
218   */
219  void assertExpectedTable(TableName table, int count, int value) throws IOException {
220    TableDescriptor htd = util.getAdmin().getDescriptor(table);
221    assertNotNull(htd);
222    try (Table t = util.getConnection().getTable(table);
223      ResultScanner sr = t.getScanner(new Scan())) {
224      int i = 0;
225      for (Result r; (r = sr.next()) != null;) {
226        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
227          .forEach(v -> assertArrayEquals(value(value), v));
228        i++;
229      }
230      assertEquals(count, i);
231    } catch (IOException e) {
232      fail("Failed due to exception");
233    }
234  }
235
236  private static <T> CompletableFuture<T> failedFuture(Throwable error) {
237    CompletableFuture<T> future = new CompletableFuture<>();
238    future.completeExceptionally(error);
239    return future;
240  }
241
  /**
   * Wraps {@code conn} in a Mockito spy whose {@code bulkLoad} always returns a failed future,
   * simulating a server-side bulk load error. The doReturn/when form (rather than
   * when/thenReturn) avoids invoking the spied connection's real bulkLoad while stubbing.
   */
  private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
    AsyncClusterConnection errConn = spy(conn);
    doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn).bulkLoad(
      any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(), anyBoolean());
    return errConn;
  }
248
249  /**
250   * Test that shows that exception thrown from the RS side will result in an exception on the
251   * LIHFile client.
252   */
253  @Test
254  public void testBulkLoadPhaseFailure(TestInfo testInfo) throws Exception {
255    final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName());
256    final AtomicInteger attemptedCalls = new AtomicInteger();
257    Configuration conf = new Configuration(util.getConfiguration());
258    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
259    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) {
260
261      @Override
262      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
263        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
264        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
265        AsyncClusterConnection c =
266          attemptedCalls.incrementAndGet() == 1 ? mockAndInjectError(conn) : conn;
267        super.bulkLoadPhase(c, tableName, queue, regionGroups, copyFiles, item2RegionMap);
268      }
269    };
270    Path dir = buildBulkFiles(table, 1);
271    assertThrows(IOException.class, () -> loader.bulkLoad(table, dir));
272  }
273
274  /**
275   * Test that shows that exception thrown from the RS side will result in the expected number of
276   * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
277   * ${@link BulkLoadHFiles#RETRY_ON_IO_EXCEPTION} is set
278   */
279  @Test
280  public void testRetryOnIOException(TestInfo testInfo) throws Exception {
281    TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName());
282    AtomicInteger calls = new AtomicInteger(0);
283    setupTable(util.getConnection(), table, 10);
284    Configuration conf = new Configuration(util.getConfiguration());
285    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
286    conf.setBoolean(BulkLoadHFiles.RETRY_ON_IO_EXCEPTION, true);
287    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) {
288
289      @Override
290      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
291        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
292        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
293        if (
294          calls.get() < conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
295            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
296        ) {
297          calls.incrementAndGet();
298          super.bulkLoadPhase(mockAndInjectError(conn), tableName, queue, regionGroups, copyFiles,
299            item2RegionMap);
300        } else {
301          super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
302        }
303      }
304    };
305    Path dir = buildBulkFiles(table, 1);
306    loader.bulkLoad(table, dir);
307    assertEquals(calls.get(), 2);
308  }
309
310  /**
311   * This test exercises the path where there is a split after initial validation but before the
312   * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
313   * split just before the atomic region load.
314   */
315  @Test
316  public void testSplitWhileBulkLoadPhase(TestInfo testInfo) throws Exception {
317    final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName());
318    setupTable(util.getConnection(), table, 10);
319    populateTable(util.getConnection(), table, 1);
320    assertExpectedTable(table, ROWCOUNT, 1);
321
322    // Now let's cause trouble. This will occur after checks and cause bulk
323    // files to fail when attempt to atomically import. This is recoverable.
324    final AtomicInteger attemptedCalls = new AtomicInteger();
325    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
326
327      @Override
328      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
329        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
330        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
331        int i = attemptedCalls.incrementAndGet();
332        if (i == 1) {
333          // On first attempt force a split.
334          forceSplit(table);
335        }
336        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
337      }
338    };
339
340    // create HFiles for different column families
341    Path dir = buildBulkFiles(table, 2);
342    loader.bulkLoad(table, dir);
343
344    // check that data was loaded
345    // The three expected attempts are 1) failure because need to split, 2)
346    // load of split top 3) load of split bottom
347    assertEquals(3, attemptedCalls.get());
348    assertExpectedTable(table, ROWCOUNT, 2);
349  }
350
351  /**
352   * This test splits a table and attempts to bulk load. The bulk import files should be split
353   * before atomically importing.
354   */
355  @Test
356  public void testGroupOrSplitPresplit(TestInfo testInfo) throws Exception {
357    final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName());
358    setupTable(util.getConnection(), table, 10);
359    populateTable(util.getConnection(), table, 1);
360    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 1);
361    forceSplit(table);
362
363    final AtomicInteger countedLqis = new AtomicInteger();
364    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
365
366      @Override
367      protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
368        TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
369        List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
370        Pair<List<LoadQueueItem>, String> lqis =
371          super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
372        if (lqis != null && lqis.getFirst() != null) {
373          countedLqis.addAndGet(lqis.getFirst().size());
374        }
375        return lqis;
376      }
377    };
378
379    // create HFiles for different column families
380    Path dir = buildBulkFiles(table, 2);
381    loader.bulkLoad(table, dir);
382    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2);
383    assertEquals(20, countedLqis.get());
384  }
385
386  @Test
387  public void testCorrectSplitPoint(TestInfo testInfo) throws Exception {
388    final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName());
389    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
390      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
391      Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"), Bytes.toBytes("row_00000070") };
392    setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS);
393
394    final AtomicInteger bulkloadRpcTimes = new AtomicInteger();
395    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
396
397      @Override
398      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
399        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
400        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
401        bulkloadRpcTimes.addAndGet(1);
402        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
403      }
404    };
405
406    Path dir = buildBulkFiles(table, 1);
407    loader.bulkLoad(table, dir);
408    // before HBASE-25281 we need invoke bulkload rpc 8 times
409    assertEquals(4, bulkloadRpcTimes.get());
410  }
411
412  /**
413   * This test creates a table with many small regions. The bulk load files would be splitted
414   * multiple times before all of them can be loaded successfully.
415   */
416  @Test
417  public void testSplitTmpFileCleanUp(TestInfo testInfo) throws Exception {
418    final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName());
419    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
420      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
421      Bytes.toBytes("row_00000050") };
422    setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
423
424    BulkLoadHFiles loader = BulkLoadHFiles.create(util.getConfiguration());
425
426    // create HFiles
427    Path dir = buildBulkFiles(table, 2);
428    loader.bulkLoad(table, dir);
429    // family path
430    Path tmpPath = new Path(dir, family(0));
431    // TMP_DIR under family path
432    tmpPath = new Path(tmpPath, BulkLoadHFilesTool.TMP_DIR);
433    FileSystem fs = dir.getFileSystem(util.getConfiguration());
434    // HFiles have been splitted, there is TMP_DIR
435    assertTrue(fs.exists(tmpPath));
436    // TMP_DIR should have been cleaned-up
437    assertNull(CommonFSUtils.listStatus(fs, tmpPath),
438      BulkLoadHFilesTool.TMP_DIR + " should be empty.");
439    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2);
440  }
441
442  /**
443   * This simulates an remote exception which should cause LIHF to exit with an exception.
444   */
445  @Test
446  public void testGroupOrSplitFailure(TestInfo testInfo) throws Exception {
447    final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
448    setupTable(util.getConnection(), tableName, 10);
449    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
450
451      private int i = 0;
452
453      @Override
454      protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
455        TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
456        List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
457        i++;
458
459        if (i == 5) {
460          throw new IOException("failure");
461        }
462        return super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
463      }
464    };
465
466    // create HFiles for different column families
467    Path dir = buildBulkFiles(tableName, 1);
468    assertThrows(IOException.class, () -> loader.bulkLoad(tableName, dir));
469  }
470
471  /**
472   * We are testing a split after initial validation but before the atomic bulk load call. We cannot
473   * use presplitting to test this path, so we actually inject a split just before the atomic region
474   * load. However, we will pass null item2RegionMap and that should not affect the bulk load
475   * behavior.
476   */
477  @Test
478  public void testSplitWhileBulkLoadPhaseWithoutItemMap(TestInfo testInfo) throws Exception {
479    final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName());
480    setupTable(util.getConnection(), table, 10);
481    populateTable(util.getConnection(), table, 1);
482    assertExpectedTable(table, ROWCOUNT, 1);
483
484    // Now let's cause trouble. This will occur after checks and cause bulk
485    // files to fail when attempt to atomically import. This is recoverable.
486    final AtomicInteger attemptedCalls = new AtomicInteger();
487    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
488
489      @Override
490      protected void bulkLoadPhase(final AsyncClusterConnection conn, final TableName tableName,
491        final Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups,
492        final boolean copyFiles, final Map<LoadQueueItem, ByteBuffer> item2RegionMap)
493        throws IOException {
494
495        int i = attemptedCalls.incrementAndGet();
496        if (i == 1) {
497          // On first attempt force a split.
498          forceSplit(table);
499        }
500
501        // Passing item2RegionMap null
502        // In the absence of LoadQueueItem, bulk load should work as expected
503        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
504      }
505
506    };
507
508    // create HFiles for different column families
509    Path dir = buildBulkFiles(table, 2);
510    loader.bulkLoad(table, dir);
511
512    // check that data was loaded
513    // The three expected attempts are 1) failure because need to split, 2)
514    // load of split top 3) load of split bottom
515    assertEquals(3, attemptedCalls.get());
516    assertExpectedTable(table, ROWCOUNT, 2);
517  }
518
519  /**
520   * Checks that all columns have the expected value and that there is the expected number of rows.
521   */
522  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
523    throws IOException {
524    TableDescriptor htd = util.getAdmin().getDescriptor(table);
525    assertNotNull(htd);
526    try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
527      int i = 0;
528      for (Result r; (r = sr.next()) != null;) {
529        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
530          .forEach(v -> assertArrayEquals(value(value), v));
531        i++;
532      }
533      assertEquals(count, i);
534    } catch (IOException e) {
535      fail("Failed due to exception");
536    }
537  }
538}