/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
 * Test cases for the atomic load error handling of the bulk load functionality.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBulkLoadHFilesSplitRecovery {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBulkLoadHFilesSplitRecovery.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestBulkLoadHFilesSplitRecovery.class);

  static HBaseTestingUtil util;
  // used by secure subclass
  static boolean useSecure = false;

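  // Each test table uses NUM_CFS column families; every generated HFile holds ROWCOUNT rows
  // under the QUAL qualifier.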
  final static int NUM_CFS = 10;
  final static byte[] QUAL = Bytes.toBytes("qual");
  final static int ROWCOUNT = 100;

  private final static byte[][] families = new byte[NUM_CFS][];

  @Rule
  public TestName name = new TestName();

  static {
    for (int i = 0; i < NUM_CFS; i++) {
      families[i] = Bytes.toBytes(family(i));
    }
  }

  static byte[] rowkey(int i) {
    return Bytes.toBytes(String.format("row_%08d", i));
  }

  static String family(int i) {
    return String.format("family_%04d", i);
  }

  static byte[] value(int i) {
    return Bytes.toBytes(String.format("%010d", i));
  }

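  /**
   * Writes one HFile per column family directory under {@code dir}; each file contains ROWCOUNT
   * rows of the given value.
   */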
  public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
    byte[] val = value(value);
    for (int i = 0; i < NUM_CFS; i++) {
      Path testIn = new Path(dir, family(i));

      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
        Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
    }
  }

  private TableDescriptor createTableDesc(TableName name, int cfs) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
    IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
      .forEachOrdered(builder::setColumnFamily);
    return builder.build();
  }

  /**
   * Creates a table with the given table name and the specified number of column families if the
   * table does not already exist.
   */
  private void setupTable(final Connection connection, TableName table, int cfs)
    throws IOException {
    try {
      LOG.info("Creating table " + table);
      try (Admin admin = connection.getAdmin()) {
        admin.createTable(createTableDesc(table, cfs));
      }
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

  /**
   * Creates a table with the given table name, the specified number of column families, and the
   * given split keys if the table does not already exist.
   */
  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
    throws IOException {
    try {
      LOG.info("Creating table " + table);
      util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
    } catch (TableExistsException tee) {
      LOG.info("Table " + table + " already exists");
    }
  }

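  /**
   * Creates a bulk load directory for the table on the test filesystem and populates it with
   * HFiles carrying the given value.
   */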
  private Path buildBulkFiles(TableName table, int value) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
    Path bulk1 = new Path(dir, table.getNameAsString() + value);
    FileSystem fs = util.getTestFileSystem();
    buildHFiles(fs, bulk1, value);
    return bulk1;
  }

  /**
   * Populate table with known values.
   */
  private void populateTable(final Connection connection, TableName table, int value)
    throws Exception {
    // create HFiles for different column families
    Path dir = buildBulkFiles(table, value);
    BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(table, dir);
  }

  /**
   * Split the known table in half (the split point is hard-coded for this test suite).
   */
  private void forceSplit(TableName table) {
    try {
      // We want the split to complete synchronously, but no synchronous split call is visible
      // here, so request an async split on the region server and poll for completion below.
      HRegionServer hrs = util.getRSForFirstRegionInTable(table);

      for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
        if (hri.getTable().equals(table)) {
          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
          // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
        }
      }

      // verify that split completed.
      int regions;
      do {
        regions = 0;
        for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
          if (hri.getTable().equals(table)) {
            regions++;
          }
        }
        if (regions != 2) {
          LOG.info("Taking some time to complete split...");
          Thread.sleep(250);
        }
      } while (regions != 2);
    } catch (IOException e) {
      e.printStackTrace();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @BeforeClass
  public static void setupCluster() throws Exception {
    util = new HBaseTestingUtil();
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.startMiniCluster(1);
  }

  @AfterClass
  public static void teardownCluster() throws Exception {
    util.shutdownMiniCluster();
  }

  /**
   * Checks that all columns have the expected value and that there is the expected number of rows.
   */
  void assertExpectedTable(TableName table, int count, int value) throws IOException {
    TableDescriptor htd = util.getAdmin().getDescriptor(table);
    assertNotNull(htd);
    try (Table t = util.getConnection().getTable(table);
      ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r; (r = sr.next()) != null;) {
        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
          .forEach(v -> assertArrayEquals(value(value), v));
        i++;
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception");
    }
  }

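  /**
   * Returns a future that has already completed exceptionally with the given error.
   */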
  private static <T> CompletableFuture<T> failedFuture(Throwable error) {
    CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(error);
    return future;
  }

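  /**
   * Wraps the connection in a Mockito spy whose bulkLoad calls always return a failed future, to
   * simulate a bulk load error coming back from the region server side.
   */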
  private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
    AsyncClusterConnection errConn = spy(conn);
    doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn).bulkLoad(
      any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(), anyBoolean());
    return errConn;
  }

  /**
   * Test that an exception thrown from the region server side results in an exception on the bulk
   * load client.
   */
  @Test(expected = IOException.class)
  public void testBulkLoadPhaseFailure() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    final AtomicInteger attemptedCalls = new AtomicInteger();
    Configuration conf = new Configuration(util.getConfiguration());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        AsyncClusterConnection c =
          attemptedCalls.incrementAndGet() == 1 ? mockAndInjectError(conn) : conn;
        super.bulkLoadPhase(c, tableName, queue, regionGroups, copyFiles, item2RegionMap);
      }
    };
    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
  }

  /**
   * Test that an exception thrown from the region server side results in the expected number of
   * retries, as set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}, when
   * {@link BulkLoadHFiles#RETRY_ON_IO_EXCEPTION} is set.
   */
  @Test
  public void testRetryOnIOException() throws Exception {
    TableName table = TableName.valueOf(name.getMethodName());
    AtomicInteger calls = new AtomicInteger(0);
    setupTable(util.getConnection(), table, 10);
    Configuration conf = new Configuration(util.getConfiguration());
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    conf.setBoolean(BulkLoadHFiles.RETRY_ON_IO_EXCEPTION, true);
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        if (
          calls.get() < conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
        ) {
          calls.incrementAndGet();
          super.bulkLoadPhase(mockAndInjectError(conn), tableName, queue, regionGroups, copyFiles,
            item2RegionMap);
        } else {
          super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
        }
      }
    };
    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
    assertEquals(2, calls.get());
  }

  /**
   * This test exercises the path where there is a split after initial validation but before the
   * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
   * split just before the atomic region load.
   */
  @Test
  public void testSplitWhileBulkLoadPhase() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), table, 10);
    populateTable(util.getConnection(), table, 1);
    assertExpectedTable(table, ROWCOUNT, 1);

    // Now let's cause trouble. This will occur after the checks and cause the bulk load files to
    // fail when we attempt to atomically import them. This is recoverable.
    final AtomicInteger attemptedCalls = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        int i = attemptedCalls.incrementAndGet();
        if (i == 1) {
          // On first attempt force a split.
          forceSplit(table);
        }
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);

    // Check that data was loaded. The three expected attempts are 1) the initial failure because
    // a split is needed, 2) loading the top half of the split, and 3) loading the bottom half.
    assertEquals(3, attemptedCalls.get());
    assertExpectedTable(table, ROWCOUNT, 2);
  }

  /**
   * This test splits a table and attempts to bulk load. The bulk load files should be split before
   * being atomically imported.
   */
  @Test
  public void testGroupOrSplitPresplit() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), table, 10);
    populateTable(util.getConnection(), table, 1);
    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 1);
    forceSplit(table);

    final AtomicInteger countedLqis = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
        TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
        List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
        Pair<List<LoadQueueItem>, String> lqis =
          super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
        if (lqis != null && lqis.getFirst() != null) {
          countedLqis.addAndGet(lqis.getFirst().size());
        }
        return lqis;
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);
    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2);
    assertEquals(20, countedLqis.get());
  }

  @Test
  public void testCorrectSplitPoint() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
      Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"), Bytes.toBytes("row_00000070") };
    setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS);

    final AtomicInteger bulkloadRpcTimes = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
        Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
        bulkloadRpcTimes.addAndGet(1);
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
      }
    };

    Path dir = buildBulkFiles(table, 1);
    loader.bulkLoad(table, dir);
    // Before HBASE-25281 the bulk load RPC had to be invoked 8 times; now 4 calls are enough.
    assertEquals(4, bulkloadRpcTimes.get());
  }

  /**
   * This test creates a table with many small regions. The bulk load files have to be split
   * multiple times before all of them can be loaded successfully.
   */
  @Test
  public void testSplitTmpFileCleanUp() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
      Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
      Bytes.toBytes("row_00000050") };
    setupTableWithSplitkeys(table, 10, SPLIT_KEYS);

    BulkLoadHFiles loader = BulkLoadHFiles.create(util.getConfiguration());

    // create HFiles
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);
    // family path
    Path tmpPath = new Path(dir, family(0));
    // TMP_DIR under family path
    tmpPath = new Path(tmpPath, BulkLoadHFilesTool.TMP_DIR);
    FileSystem fs = dir.getFileSystem(util.getConfiguration());
    // The HFiles have been split, so the TMP_DIR exists
    assertTrue(fs.exists(tmpPath));
    // TMP_DIR should have been cleaned up
    assertNull(BulkLoadHFilesTool.TMP_DIR + " should be empty.",
      CommonFSUtils.listStatus(fs, tmpPath));
    assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2);
  }

  /**
   * This simulates a remote exception which should cause the bulk load client to exit with an
   * exception.
   */
  @Test(expected = IOException.class)
  public void testGroupOrSplitFailure() throws Exception {
    final TableName tableName = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), tableName, 10);
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      private int i = 0;

      @Override
      protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
        TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
        List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
        i++;

        if (i == 5) {
          throw new IOException("failure");
        }
        return super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
      }
    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(tableName, 1);
    loader.bulkLoad(tableName, dir);
  }

  /**
   * We are testing a split after initial validation but before the atomic bulk load call. We
   * cannot use presplitting to test this path, so we actually inject a split just before the
   * atomic region load. However, we pass a null item2RegionMap, which should not affect the bulk
   * load behavior.
   */
  @Test
  public void testSplitWhileBulkLoadPhaseWithoutItemMap() throws Exception {
    final TableName table = TableName.valueOf(name.getMethodName());
    setupTable(util.getConnection(), table, 10);
    populateTable(util.getConnection(), table, 1);
    assertExpectedTable(table, ROWCOUNT, 1);

    // Now let's cause trouble. This will occur after the checks and cause the bulk load files to
    // fail when we attempt to atomically import them. This is recoverable.
    final AtomicInteger attemptedCalls = new AtomicInteger();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {

      @Override
      protected void bulkLoadPhase(final AsyncClusterConnection conn, final TableName tableName,
        final Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups,
        final boolean copyFiles, final Map<LoadQueueItem, ByteBuffer> item2RegionMap)
        throws IOException {

        int i = attemptedCalls.incrementAndGet();
        if (i == 1) {
          // On first attempt force a split.
          forceSplit(table);
        }

        // Pass a null item2RegionMap: even without the LoadQueueItem mapping, the bulk load
        // should work as expected.
        super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
      }

    };

    // create HFiles for different column families
    Path dir = buildBulkFiles(table, 2);
    loader.bulkLoad(table, dir);

    // Check that data was loaded. The three expected attempts are 1) the initial failure because
    // a split is needed, 2) loading the top half of the split, and 3) loading the bottom half.
    assertEquals(3, attemptedCalls.get());
    assertExpectedTable(table, ROWCOUNT, 2);
  }

  /**
   * Checks that all columns have the expected value and that there is the expected number of rows.
   */
  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
    throws IOException {
    TableDescriptor htd = util.getAdmin().getDescriptor(table);
    assertNotNull(htd);
    try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
      int i = 0;
      for (Result r; (r = sr.next()) != null;) {
        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
          .forEach(v -> assertArrayEquals(value(value), v));
        i++;
      }
      assertEquals(count, i);
    } catch (IOException e) {
      fail("Failed due to exception");
    }
  }
}