001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
021import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
022import static org.hamcrest.Matchers.greaterThan;
023import static org.junit.jupiter.api.Assertions.assertArrayEquals;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.assertThrows;
026import static org.junit.jupiter.api.Assertions.assertTrue;
027import static org.junit.jupiter.api.Assertions.fail;
028
029import java.io.IOException;
030import java.net.InetAddress;
031import java.nio.ByteBuffer;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.List;
035import java.util.Locale;
036import java.util.Map;
037import java.util.TreeMap;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.atomic.AtomicInteger;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FSDataOutputStream;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.HBaseTestingUtil;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.HRegionLocation;
048import org.apache.hadoop.hbase.NamespaceDescriptor;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.TableNotFoundException;
051import org.apache.hadoop.hbase.client.AsyncClusterConnection;
052import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
055import org.apache.hadoop.hbase.client.Table;
056import org.apache.hadoop.hbase.client.TableDescriptor;
057import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
058import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
059import org.apache.hadoop.hbase.io.hfile.CacheConfig;
060import org.apache.hadoop.hbase.io.hfile.HFile;
061import org.apache.hadoop.hbase.io.hfile.HFileScanner;
062import org.apache.hadoop.hbase.regionserver.BloomType;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.apache.hadoop.hbase.util.CommonFSUtils;
065import org.apache.hadoop.hbase.util.FutureUtils;
066import org.apache.hadoop.hbase.util.HFileTestUtil;
067import org.apache.hadoop.hdfs.DistributedFileSystem;
068import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
069import org.apache.hadoop.hdfs.protocol.LocatedBlock;
070import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
071import org.hamcrest.MatcherAssert;
072import org.junit.jupiter.api.AfterAll;
073import org.junit.jupiter.api.Test;
074import org.junit.jupiter.api.TestInfo;
075
076import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
077
078/**
 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
 * faster than the full MapReduce cluster tests in TestHFileOutputFormat.
081 */
082public class BulkLoadHFilesTestBase {
083
  // Qualifier written for every cell of the generated test HFiles.
  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  // The single column family used by the test tables and the generated HFiles.
  private static final byte[] FAMILY = Bytes.toBytes("myfam");
  // Namespace in which the namespaced variants of the load tests create their tables.
  private static final String NAMESPACE = "bulkNS";

  // Substring expected in the error raised when HFiles reference a family the table does not have.
  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
  // NOTE(review): not referenced in the visible part of this class; presumably used by later tests
  // or subclasses — confirm.
  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;

  // Two pre-split points, i.e. a table with three regions.
  private static final byte[][] SPLIT_KEYS =
    new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };

  // Shared mini-cluster testing utility; shut down in tearDownAfterClass().
  static HBaseTestingUtil util = new HBaseTestingUtil();
095
096  protected static void setupNamespace() throws Exception {
097    util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
098  }
099
100  @AfterAll
101  public static void tearDownAfterClass() throws Exception {
102    util.shutdownMiniCluster();
103  }
104
105  @Test
106  public void testSimpleLoadWithMap() throws Exception {
107    runTest("testSimpleLoadWithMap", BloomType.NONE,
108      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
109        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
110      true);
111  }
112
113  /**
114   * Test case that creates some regions and loads HFiles that fit snugly inside those regions
115   */
116  @Test
117  public void testSimpleLoad() throws Exception {
118    runTest("testSimpleLoad", BloomType.NONE,
119      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
120        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
121  }
122
123  @Test
124  public void testSimpleLoadWithFileCopy(TestInfo testInfo) throws Exception {
125    String testName = testInfo.getTestMethod().get().getName();
126    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
127    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null,
128      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
129        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
130      false, true, 2);
131  }
132
133  /**
134   * Test case that creates some regions and loads HFiles that cross the boundaries of those regions
135   */
136  @Test
137  public void testRegionCrossingLoad() throws Exception {
138    runTest("testRegionCrossingLoad", BloomType.NONE,
139      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
140        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
141  }
142
143  /**
144   * Test loading into a column family that has a ROW bloom filter.
145   */
146  @Test
147  public void testRegionCrossingRowBloom() throws Exception {
148    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
149      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
150        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
151  }
152
153  /**
154   * Test loading into a column family that has a ROWCOL bloom filter.
155   */
156  @Test
157  public void testRegionCrossingRowColBloom() throws Exception {
158    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
159      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
160        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
161  }
162
163  /**
164   * Test case that creates some regions and loads HFiles that have different region boundaries than
165   * the table pre-split.
166   */
167  @Test
168  public void testSimpleHFileSplit() throws Exception {
169    runTest("testHFileSplit", BloomType.NONE,
170      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
171        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
172      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
173        new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
174  }
175
176  /**
177   * Test case that creates some regions and loads HFiles that cross the boundaries and have
178   * different region boundaries than the table pre-split.
179   */
180  @Test
181  public void testRegionCrossingHFileSplit() throws Exception {
182    testRegionCrossingHFileSplit(BloomType.NONE);
183  }
184
185  /**
186   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom
187   * filter and a different region boundaries than the table pre-split.
188   */
189  @Test
190  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
191    testRegionCrossingHFileSplit(BloomType.ROW);
192  }
193
194  /**
195   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL
196   * bloom filter and a different region boundaries than the table pre-split.
197   */
198  @Test
199  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
200    testRegionCrossingHFileSplit(BloomType.ROWCOL);
201  }
202
203  @Test
204  public void testSplitALot() throws Exception {
205    runTest("testSplitALot", BloomType.NONE,
206      new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
207        Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
208        Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
209        Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
210        Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
211        Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
212      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
213  }
214
215  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
216    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
217      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
218        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
219      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
220        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
221  }
222
223  private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
224    return TableDescriptorBuilder.newBuilder(tableName)
225      .setColumnFamily(
226        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
227      .build();
228  }
229
230  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
231    throws Exception {
232    runTest(testName, bloomType, null, hfileRanges);
233  }
234
235  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
236    throws Exception {
237    runTest(testName, bloomType, null, hfileRanges, useMap);
238  }
239
240  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
241    byte[][][] hfileRanges) throws Exception {
242    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
243  }
244
245  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
246    byte[][][] hfileRanges, boolean useMap) throws Exception {
247    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
248    final boolean preCreateTable = tableSplitKeys != null;
249
250    // Run the test bulkloading the table to the default namespace
251    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
252    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
253      useMap, 2);
254
255    /*
256     * Run the test bulkloading the table from a depth of 3 directory structure is now baseDirectory
257     * -- regionDir -- familyDir -- storeFileDir
258     */
259    if (preCreateTable) {
260      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false,
261        3);
262    }
263
264    // Run the test bulkloading the table to the specified namespace
265    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
266    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap,
267      2);
268  }
269
270  private void runTest(String testName, TableName tableName, BloomType bloomType,
271    boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
272    int depth) throws Exception {
273    TableDescriptor htd = buildHTD(tableName, bloomType);
274    runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
275  }
276
277  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtil util,
278    byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
279    byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount,
280    int factor, int depth) throws Exception {
281    Path baseDirectory = util.getDataTestDirOnTestFS(testName);
282    FileSystem fs = util.getTestFileSystem();
283    baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory());
284    Path parentDir = baseDirectory;
285    if (depth == 3) {
286      assert !useMap;
287      parentDir = new Path(baseDirectory, "someRegion");
288    }
289    Path familyDir = new Path(parentDir, Bytes.toString(fam));
290
291    int hfileIdx = 0;
292    Map<byte[], List<Path>> map = null;
293    List<Path> list = null;
294    if (useMap || copyFiles) {
295      list = new ArrayList<>();
296    }
297    if (useMap) {
298      map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
299      map.put(fam, list);
300    }
301    Path last = null;
302    for (byte[][] range : hfileRanges) {
303      byte[] from = range[0];
304      byte[] to = range[1];
305      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
306      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
307      if (useMap) {
308        last = path;
309        list.add(path);
310      }
311    }
312    int expectedRows = hfileIdx * factor;
313
314    TableName tableName = htd.getTableName();
315    if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
316      if (tableSplitKeys != null) {
317        util.getAdmin().createTable(htd, tableSplitKeys);
318      } else {
319        util.getAdmin().createTable(htd);
320      }
321    }
322
323    Configuration conf = util.getConfiguration();
324    if (copyFiles) {
325      conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);
326    }
327    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
328    List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
329    if (depth == 3) {
330      args.add("-loadTable");
331    }
332
333    if (useMap) {
334      if (deleteFile) {
335        fs.delete(last, true);
336      }
337      Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map);
338      if (deleteFile) {
339        expectedRows -= 1000;
340        for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) {
341          if (item.getFilePath().getName().equals(last.getName())) {
342            fail(last + " should be missing");
343          }
344        }
345      }
346    } else {
347      loader.run(args.toArray(new String[] {}));
348    }
349
350    if (copyFiles) {
351      for (Path p : list) {
352        assertTrue(fs.exists(p), p + " should exist");
353      }
354    }
355
356    try (Table table = util.getConnection().getTable(tableName)) {
357      assertEquals(initRowCount + expectedRows, countRows(table));
358    }
359
360    return expectedRows;
361  }
362
363  private void runTest(String testName, TableDescriptor htd, boolean preCreateTable,
364    byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth)
365    throws Exception {
366    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
367      useMap, true, copyFiles, 0, 1000, depth);
368
369    final TableName tableName = htd.getTableName();
370    // verify staging folder has been cleaned up
371    Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()),
372      HConstants.BULKLOAD_STAGING_DIR_NAME);
373    FileSystem fs = util.getTestFileSystem();
374    if (fs.exists(stagingBasePath)) {
375      FileStatus[] files = fs.listStatus(stagingBasePath);
376      for (FileStatus file : files) {
377        assertTrue(file.getPath().getName() != "DONOTERASE",
378          "Folder=" + file.getPath() + " is not cleaned up.");
379      }
380    }
381
382    util.deleteTable(tableName);
383  }
384
385  /**
386   * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the
387   * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the
388   * responses.
389   */
390  @Test
391  public void testTagsSurviveBulkLoadSplit(TestInfo testInfo) throws Exception {
392    String name = testInfo.getTestMethod().get().getName();
393    Path dir = util.getDataTestDirOnTestFS(name);
394    FileSystem fs = util.getTestFileSystem();
395    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
396    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
397    // table has these split points
398    byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
399      Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
400
401    // creating an hfile that has values that span the split points.
402    byte[] from = Bytes.toBytes("ddd");
403    byte[] to = Bytes.toBytes("ooo");
404    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
405      new Path(familyDir, name + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
406    int expectedRows = 1000;
407
408    TableName tableName = TableName.valueOf(name);
409    TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
410    util.getAdmin().createTable(htd, tableSplitKeys);
411
412    BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir);
413
414    Table table = util.getConnection().getTable(tableName);
415    try {
416      assertEquals(expectedRows, countRows(table));
417      HFileTestUtil.verifyTags(table);
418    } finally {
419      table.close();
420    }
421
422    util.deleteTable(tableName);
423  }
424
425  /**
426   * Test loading into a column family that does not exist.
427   */
428  @Test
429  public void testNonexistentColumnFamilyLoad(TestInfo testInfo) throws Exception {
430    String testName = testInfo.getTestMethod().get().getName();
431    byte[][][] hFileRanges =
432      new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
433        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };
434
435    byte[] TABLE = Bytes.toBytes("mytable_" + testName);
436    // set real family name to upper case in purpose to simulate the case that
437    // family name in HFiles is invalid
438    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
439      .setColumnFamily(ColumnFamilyDescriptorBuilder
440        .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
441      .build();
442
443    try {
444      runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2);
445      assertTrue(false, "Loading into table with non-existent family should have failed");
446    } catch (Exception e) {
447      assertTrue(e instanceof IOException, "IOException expected");
448      // further check whether exception message is correct
449      String errMsg = e.getMessage();
450      assertTrue(errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY),
451        "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY
452          + "], current message: [" + errMsg + "]");
453    }
454  }
455
456  @Test
457  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
458    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
459  }
460
461  @Test
462  public void testNonHfileFolder() throws Exception {
463    testNonHfileFolder("testNonHfileFolder", false);
464  }
465
466  /**
467   * Write a random data file and a non-file in a dir with a valid family name but not part of the
468   * table families. we should we able to bulkload without getting the unmatched family exception.
469   * HBASE-13037/HBASE-13227
470   */
471  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
472    Path dir = util.getDataTestDirOnTestFS(tableName);
473    FileSystem fs = util.getTestFileSystem();
474    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
475
476    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
477    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
478      QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
479    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
480
481    final String NON_FAMILY_FOLDER = "_logs";
482    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
483    fs.mkdirs(nonFamilyDir);
484    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
485    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
486
487    Table table = null;
488    try {
489      if (preCreateTable) {
490        table = util.createTable(TableName.valueOf(tableName), FAMILY);
491      } else {
492        table = util.getConnection().getTable(TableName.valueOf(tableName));
493      }
494      BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(TableName.valueOf(tableName), dir);
495      assertEquals(500, countRows(table));
496    } finally {
497      if (table != null) {
498        table.close();
499      }
500      fs.delete(dir, true);
501    }
502  }
503
504  private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
505    FSDataOutputStream stream = fs.create(path);
506    try {
507      byte[] data = new byte[1024];
508      for (int i = 0; i < data.length; ++i) {
509        data[i] = (byte) (i & 0xff);
510      }
511      while (size >= data.length) {
512        stream.write(data, 0, data.length);
513        size -= data.length;
514      }
515      if (size > 0) {
516        stream.write(data, 0, size);
517      }
518    } finally {
519      stream.close();
520    }
521  }
522
523  @Test
524  public void testSplitStoreFile(TestInfo testInfo) throws IOException {
525    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
526    FileSystem fs = util.getTestFileSystem();
527    Path testIn = new Path(dir, "testhfile");
528    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
529    String tableName = testInfo.getTestMethod().get().getName();
530    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
531    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
532      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
533
534    Path bottomOut = new Path(dir, "bottom.out");
535    Path topOut = new Path(dir, "top.out");
536
537    BulkLoadHFilesTool.splitStoreFile(
538      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
539      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
540
541    int rowCount = verifyHFile(bottomOut);
542    rowCount += verifyHFile(topOut);
543    assertEquals(1000, rowCount);
544  }
545
546  /**
547   * Test hfile splits with the favored nodes
548   */
549  @Test
550  public void testSplitStoreFileWithFavoriteNodes(TestInfo testInfo) throws IOException {
551
552    Path dir = new Path(util.getDefaultRootDirPath(), "testhfile");
553    FileSystem fs = util.getDFSCluster().getFileSystem();
554
555    Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes");
556    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
557    String tableName = testInfo.getTestMethod().get().getName();
558    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
559    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
560      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
561
562    Path bottomOut = new Path(dir, "bottom.out");
563    Path topOut = new Path(dir, "top.out");
564
565    final AsyncTableRegionLocator regionLocator =
566      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName));
567    BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc,
568      Bytes.toBytes("ggg"), bottomOut, topOut);
569    verifyHFileFavoriteNode(topOut, regionLocator, fs);
570    verifyHFileFavoriteNode(bottomOut, regionLocator, fs);
571    int rowCount = verifyHFile(bottomOut);
572    rowCount += verifyHFile(topOut);
573    assertEquals(1000, rowCount);
574  }
575
576  @Test
577  public void testSplitStoreFileWithCreateTimeTS(TestInfo testInfo) throws IOException {
578    Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS");
579    FileSystem fs = util.getTestFileSystem();
580    Path testIn = new Path(dir, "testhfile");
581    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
582    String tableName = testInfo.getTestMethod().get().getName();
583    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
584    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
585      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
586
587    Path bottomOut = new Path(dir, "bottom.out");
588    Path topOut = new Path(dir, "top.out");
589
590    BulkLoadHFilesTool.splitStoreFile(
591      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
592      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
593
594    verifyHFileCreateTimeTS(bottomOut);
595    verifyHFileCreateTimeTS(topOut);
596  }
597
598  @Test
599  public void testSplitStoreFileWithNoneToNone(TestInfo testInfo) throws IOException {
600    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE,
601      testInfo);
602  }
603
604  @Test
605  public void testSplitStoreFileWithEncodedToEncoded(TestInfo testInfo) throws IOException {
606    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF,
607      testInfo);
608  }
609
610  @Test
611  public void testSplitStoreFileWithEncodedToNone(TestInfo testInfo) throws IOException {
612    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE,
613      testInfo);
614  }
615
616  @Test
617  public void testSplitStoreFileWithNoneToEncoded(TestInfo testInfo) throws IOException {
618    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF,
619      testInfo);
620  }
621
622  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
623    DataBlockEncoding cfEncoding, TestInfo testInfo) throws IOException {
624    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
625    FileSystem fs = util.getTestFileSystem();
626    Path testIn = new Path(dir, "testhfile");
627    ColumnFamilyDescriptor familyDesc =
628      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
629    String tableName = testInfo.getTestMethod().get().getName();
630    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
631    HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
632      bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
633
634    Path bottomOut = new Path(dir, "bottom.out");
635    Path topOut = new Path(dir, "top.out");
636
637    BulkLoadHFilesTool.splitStoreFile(
638      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
639      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
640
641    int rowCount = verifyHFile(bottomOut);
642    rowCount += verifyHFile(topOut);
643    assertEquals(1000, rowCount);
644  }
645
646  private int verifyHFile(Path p) throws IOException {
647    Configuration conf = util.getConfiguration();
648    HFile.Reader reader =
649      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
650    HFileScanner scanner = reader.getScanner(conf, false, false);
651    scanner.seekTo();
652    int count = 0;
653    do {
654      count++;
655    } while (scanner.next());
656    assertTrue(count > 0);
657    reader.close();
658    return count;
659  }
660
661  private void verifyHFileCreateTimeTS(Path p) throws IOException {
662    Configuration conf = util.getConfiguration();
663
664    try (HFile.Reader reader =
665      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf)) {
666      long fileCreateTime = reader.getHFileInfo().getHFileContext().getFileCreateTime();
667      MatcherAssert.assertThat(fileCreateTime, greaterThan(0L));
668    }
669  }
670
671  /**
672   * test split storefile with favorite node information
673   */
674  private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs)
675    throws IOException {
676    Configuration conf = util.getConfiguration();
677
678    try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);) {
679
680      final byte[] firstRowkey = reader.getFirstRowKey().get();
681      final HRegionLocation hRegionLocation =
682        FutureUtils.get(regionLocator.getRegionLocation(firstRowkey));
683
684      final String targetHostName = hRegionLocation.getHostname();
685
686      if (fs instanceof DistributedFileSystem) {
687        String pathStr = p.toUri().getPath();
688        LocatedBlocks blocks =
689          ((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L);
690
691        boolean isFavoriteNode = false;
692        List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks();
693        int index = 0;
694        do {
695          if (index > 0) {
696            assertTrue(isFavoriteNode, "failed use favored nodes");
697          }
698          isFavoriteNode = false;
699          final LocatedBlock block = locatedBlocks.get(index);
700
701          final DatanodeInfo[] locations = getLocatedBlockLocations(block);
702          for (DatanodeInfo location : locations) {
703
704            final String hostName = location.getHostName();
705            if (
706              targetHostName.equals(hostName.equals("127.0.0.1")
707                ? InetAddress.getLocalHost().getHostName()
708                : "127.0.0.1") || targetHostName.equals(hostName)
709            ) {
710              isFavoriteNode = true;
711              break;
712            }
713          }
714
715          index++;
716        } while (index < locatedBlocks.size());
717        if (index > 0) {
718          assertTrue(isFavoriteNode, "failed use favored nodes");
719        }
720
721      }
722
723    }
724  }
725
726  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
727    Integer value = map.containsKey(first) ? map.get(first) : 0;
728    map.put(first, value + 1);
729
730    value = map.containsKey(last) ? map.get(last) : 0;
731    map.put(last, value - 1);
732  }
733
734  @Test
735  public void testInferBoundaries() {
736    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
737
738    /*
739     * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s
740     * u----w Should be inferred as: a-----------------k m-------------q r--------------t
741     * u---------x The output should be (m,r,u)
742     */
743
744    String first;
745    String last;
746
747    first = "a";
748    last = "e";
749    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
750
751    first = "r";
752    last = "s";
753    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
754
755    first = "o";
756    last = "p";
757    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
758
759    first = "g";
760    last = "k";
761    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
762
763    first = "v";
764    last = "x";
765    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
766
767    first = "c";
768    last = "i";
769    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
770
771    first = "m";
772    last = "q";
773    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
774
775    first = "s";
776    last = "t";
777    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
778
779    first = "u";
780    last = "w";
781    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
782
783    byte[][] keysArray = BulkLoadHFilesTool.inferBoundaries(map);
784    byte[][] compare = new byte[3][];
785    compare[0] = Bytes.toBytes("m");
786    compare[1] = Bytes.toBytes("r");
787    compare[2] = Bytes.toBytes("u");
788
789    assertEquals(3, keysArray.length);
790
791    for (int row = 0; row < keysArray.length; row++) {
792      assertArrayEquals(keysArray[row], compare[row]);
793    }
794  }
795
796  @Test
797  public void testLoadTooMayHFiles() throws Exception {
798    Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
799    FileSystem fs = util.getTestFileSystem();
800    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
801    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
802
803    byte[] from = Bytes.toBytes("begin");
804    byte[] to = Bytes.toBytes("end");
805    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
806      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i),
807        FAMILY, QUALIFIER, from, to, 1000);
808    }
809
810    try {
811      BulkLoadHFiles.create(util.getConfiguration())
812        .bulkLoad(TableName.valueOf("mytable_testLoadTooMayHFiles"), dir);
813      fail("Bulk loading too many files should fail");
814    } catch (IOException ie) {
815      assertTrue(ie.getMessage()
816        .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
817    }
818  }
819
820  @Test
821  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
822    Configuration conf = util.getConfiguration();
823    conf.set(BulkLoadHFiles.CREATE_TABLE_CONF_KEY, "no");
824    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
825    String[] args = { "directory", "nonExistingTable" };
826    assertThrows(TableNotFoundException.class, () -> loader.run(args));
827  }
828
829  @Test
830  public void testTableWithCFNameStartWithUnderScore(TestInfo testInfo) throws Exception {
831    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
832    FileSystem fs = util.getTestFileSystem();
833    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
834    String family = "_cf";
835    Path familyDir = new Path(dir, family);
836
837    byte[] from = Bytes.toBytes("begin");
838    byte[] to = Bytes.toBytes("end");
839    Configuration conf = util.getConfiguration();
840    String tableName = testInfo.getTestMethod().get().getName();
841    try (Table table = util.createTable(TableName.valueOf(tableName), family)) {
842      HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
843        QUALIFIER, from, to, 1000);
844      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), dir);
845      assertEquals(1000, countRows(table));
846    }
847  }
848
849  @Test
850  public void testBulkLoadByFamily(TestInfo testInfo) throws Exception {
851    Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily");
852    FileSystem fs = util.getTestFileSystem();
853    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
854    String tableName = testInfo.getTestMethod().get().getName();
855    String[] families = { "cf1", "cf2", "cf3" };
856    for (int i = 0; i < families.length; i++) {
857      byte[] from = Bytes.toBytes(i + "begin");
858      byte[] to = Bytes.toBytes(i + "end");
859      Path familyDir = new Path(dir, families[i]);
860      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"),
861        Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000);
862    }
863    Table table = util.createTable(TableName.valueOf(tableName), families);
864    final AtomicInteger attemptedCalls = new AtomicInteger();
865    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
866    BulkLoadHFiles loader = new BulkLoadHFilesTool(util.getConfiguration()) {
867      @Override
868      protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
869        final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
870        final byte[] first, Collection<LoadQueueItem> lqis) {
871        attemptedCalls.incrementAndGet();
872        return super.tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis);
873      }
874    };
875    try {
876      loader.bulkLoad(table.getName(), dir);
877      assertEquals(families.length, attemptedCalls.get());
878      assertEquals(1000 * families.length, HBaseTestingUtil.countRows(table));
879    } finally {
880      if (null != table) {
881        table.close();
882      }
883      util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
884    }
885  }
886
887  @Test
888  public void testFailIfNeedSplitHFile(TestInfo testInfo) throws IOException {
889    TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
890    Table table = util.createTable(tableName, FAMILY);
891
892    util.loadTable(table, FAMILY);
893
894    FileSystem fs = util.getTestFileSystem();
895    Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file"));
896    HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER,
897      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
898
899    util.getAdmin().split(tableName);
900    util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1);
901
902    Configuration config = new Configuration(util.getConfiguration());
903    config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true);
904    BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config);
905
906    String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() };
907    assertThrows(IOException.class, () -> tool.run(args));
908    util.getHBaseCluster().getRegions(tableName)
909      .forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size()));
910  }
911}