001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
021import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
022import static org.hamcrest.Matchers.greaterThan;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertThrows;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.net.InetAddress;
031import java.nio.ByteBuffer;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.List;
035import java.util.Locale;
036import java.util.Map;
037import java.util.TreeMap;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.atomic.AtomicInteger;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FSDataOutputStream;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtil;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.HRegionLocation;
049import org.apache.hadoop.hbase.NamespaceDescriptor;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.TableNotFoundException;
052import org.apache.hadoop.hbase.client.AsyncClusterConnection;
053import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
059import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
060import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
061import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
062import org.apache.hadoop.hbase.io.hfile.CacheConfig;
063import org.apache.hadoop.hbase.io.hfile.HFile;
064import org.apache.hadoop.hbase.io.hfile.HFileScanner;
065import org.apache.hadoop.hbase.regionserver.BloomType;
066import org.apache.hadoop.hbase.testclassification.LargeTests;
067import org.apache.hadoop.hbase.testclassification.MiscTests;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.CommonFSUtils;
070import org.apache.hadoop.hbase.util.FutureUtils;
071import org.apache.hadoop.hbase.util.HFileTestUtil;
072import org.apache.hadoop.hdfs.DistributedFileSystem;
073import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
074import org.apache.hadoop.hdfs.protocol.LocatedBlock;
075import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
076import org.hamcrest.MatcherAssert;
077import org.junit.AfterClass;
078import org.junit.BeforeClass;
079import org.junit.ClassRule;
080import org.junit.Rule;
081import org.junit.Test;
082import org.junit.experimental.categories.Category;
083import org.junit.rules.TestName;
084
085import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
086
087/**
088 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
089 * faster than the full MR cluster tests in TestHFileOutputFormat
090 */
091@Category({ MiscTests.class, LargeTests.class })
092public class TestBulkLoadHFiles {
093
094  @ClassRule
095  public static final HBaseClassTestRule CLASS_RULE =
096    HBaseClassTestRule.forClass(TestBulkLoadHFiles.class);
097
098  @Rule
099  public TestName tn = new TestName();
100
101  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
102  private static final byte[] FAMILY = Bytes.toBytes("myfam");
103  private static final String NAMESPACE = "bulkNS";
104
105  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
106  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;
107
108  private static final byte[][] SPLIT_KEYS =
109    new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };
110
111  static HBaseTestingUtil util = new HBaseTestingUtil();
112
113  @BeforeClass
114  public static void setUpBeforeClass() throws Exception {
115    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
116    util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
117      MAX_FILES_PER_REGION_PER_FAMILY);
118    // change default behavior so that tag values are returned with normal rpcs
119    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
120      KeyValueCodecWithTags.class.getCanonicalName());
121    util.startMiniCluster();
122
123    setupNamespace();
124  }
125
126  protected static void setupNamespace() throws Exception {
127    util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
128  }
129
130  @AfterClass
131  public static void tearDownAfterClass() throws Exception {
132    util.shutdownMiniCluster();
133  }
134
135  @Test
136  public void testSimpleLoadWithMap() throws Exception {
137    runTest("testSimpleLoadWithMap", BloomType.NONE,
138      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
139        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
140      true);
141  }
142
143  /**
144   * Test case that creates some regions and loads HFiles that fit snugly inside those regions
145   */
146  @Test
147  public void testSimpleLoad() throws Exception {
148    runTest("testSimpleLoad", BloomType.NONE,
149      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
150        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
151  }
152
153  @Test
154  public void testSimpleLoadWithFileCopy() throws Exception {
155    String testName = tn.getMethodName();
156    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
157    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null,
158      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
159        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
160      false, true, 2);
161  }
162
163  /**
164   * Test case that creates some regions and loads HFiles that cross the boundaries of those regions
165   */
166  @Test
167  public void testRegionCrossingLoad() throws Exception {
168    runTest("testRegionCrossingLoad", BloomType.NONE,
169      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
170        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
171  }
172
173  /**
174   * Test loading into a column family that has a ROW bloom filter.
175   */
176  @Test
177  public void testRegionCrossingRowBloom() throws Exception {
178    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
179      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
180        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
181  }
182
183  /**
184   * Test loading into a column family that has a ROWCOL bloom filter.
185   */
186  @Test
187  public void testRegionCrossingRowColBloom() throws Exception {
188    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
189      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
190        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
191  }
192
193  /**
194   * Test case that creates some regions and loads HFiles that have different region boundaries than
195   * the table pre-split.
196   */
197  @Test
198  public void testSimpleHFileSplit() throws Exception {
199    runTest("testHFileSplit", BloomType.NONE,
200      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
201        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
202      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
203        new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
204  }
205
206  /**
207   * Test case that creates some regions and loads HFiles that cross the boundaries and have
208   * different region boundaries than the table pre-split.
209   */
210  @Test
211  public void testRegionCrossingHFileSplit() throws Exception {
212    testRegionCrossingHFileSplit(BloomType.NONE);
213  }
214
215  /**
216   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom
217   * filter and a different region boundaries than the table pre-split.
218   */
219  @Test
220  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
221    testRegionCrossingHFileSplit(BloomType.ROW);
222  }
223
224  /**
225   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL
226   * bloom filter and a different region boundaries than the table pre-split.
227   */
228  @Test
229  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
230    testRegionCrossingHFileSplit(BloomType.ROWCOL);
231  }
232
233  @Test
234  public void testSplitALot() throws Exception {
235    runTest("testSplitALot", BloomType.NONE,
236      new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
237        Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
238        Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
239        Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
240        Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
241        Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
242      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
243  }
244
245  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
246    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
247      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
248        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
249      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
250        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
251  }
252
253  private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
254    return TableDescriptorBuilder.newBuilder(tableName)
255      .setColumnFamily(
256        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
257      .build();
258  }
259
260  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
261    throws Exception {
262    runTest(testName, bloomType, null, hfileRanges);
263  }
264
265  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
266    throws Exception {
267    runTest(testName, bloomType, null, hfileRanges, useMap);
268  }
269
270  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
271    byte[][][] hfileRanges) throws Exception {
272    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
273  }
274
275  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
276    byte[][][] hfileRanges, boolean useMap) throws Exception {
277    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
278    final boolean preCreateTable = tableSplitKeys != null;
279
280    // Run the test bulkloading the table to the default namespace
281    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
282    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
283      useMap, 2);
284
285    /*
286     * Run the test bulkloading the table from a depth of 3 directory structure is now baseDirectory
287     * -- regionDir -- familyDir -- storeFileDir
288     */
289    if (preCreateTable) {
290      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false,
291        3);
292    }
293
294    // Run the test bulkloading the table to the specified namespace
295    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
296    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap,
297      2);
298  }
299
300  private void runTest(String testName, TableName tableName, BloomType bloomType,
301    boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
302    int depth) throws Exception {
303    TableDescriptor htd = buildHTD(tableName, bloomType);
304    runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
305  }
306
307  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtil util,
308    byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
309    byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount,
310    int factor) throws Exception {
311    return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges,
312      useMap, deleteFile, copyFiles, initRowCount, factor, 2);
313  }
314
315  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtil util,
316    byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
317    byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount,
318    int factor, int depth) throws Exception {
319    Path baseDirectory = util.getDataTestDirOnTestFS(testName);
320    FileSystem fs = util.getTestFileSystem();
321    baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory());
322    Path parentDir = baseDirectory;
323    if (depth == 3) {
324      assert !useMap;
325      parentDir = new Path(baseDirectory, "someRegion");
326    }
327    Path familyDir = new Path(parentDir, Bytes.toString(fam));
328
329    int hfileIdx = 0;
330    Map<byte[], List<Path>> map = null;
331    List<Path> list = null;
332    if (useMap || copyFiles) {
333      list = new ArrayList<>();
334    }
335    if (useMap) {
336      map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
337      map.put(fam, list);
338    }
339    Path last = null;
340    for (byte[][] range : hfileRanges) {
341      byte[] from = range[0];
342      byte[] to = range[1];
343      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
344      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
345      if (useMap) {
346        last = path;
347        list.add(path);
348      }
349    }
350    int expectedRows = hfileIdx * factor;
351
352    TableName tableName = htd.getTableName();
353    if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
354      if (tableSplitKeys != null) {
355        util.getAdmin().createTable(htd, tableSplitKeys);
356      } else {
357        util.getAdmin().createTable(htd);
358      }
359    }
360
361    Configuration conf = util.getConfiguration();
362    if (copyFiles) {
363      conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);
364    }
365    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
366    List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
367    if (depth == 3) {
368      args.add("-loadTable");
369    }
370
371    if (useMap) {
372      if (deleteFile) {
373        fs.delete(last, true);
374      }
375      Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map);
376      if (deleteFile) {
377        expectedRows -= 1000;
378        for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) {
379          if (item.getFilePath().getName().equals(last.getName())) {
380            fail(last + " should be missing");
381          }
382        }
383      }
384    } else {
385      loader.run(args.toArray(new String[] {}));
386    }
387
388    if (copyFiles) {
389      for (Path p : list) {
390        assertTrue(p + " should exist", fs.exists(p));
391      }
392    }
393
394    try (Table table = util.getConnection().getTable(tableName)) {
395      assertEquals(initRowCount + expectedRows, countRows(table));
396    }
397
398    return expectedRows;
399  }
400
401  private void runTest(String testName, TableDescriptor htd, boolean preCreateTable,
402    byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth)
403    throws Exception {
404    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
405      useMap, true, copyFiles, 0, 1000, depth);
406
407    final TableName tableName = htd.getTableName();
408    // verify staging folder has been cleaned up
409    Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()),
410      HConstants.BULKLOAD_STAGING_DIR_NAME);
411    FileSystem fs = util.getTestFileSystem();
412    if (fs.exists(stagingBasePath)) {
413      FileStatus[] files = fs.listStatus(stagingBasePath);
414      for (FileStatus file : files) {
415        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
416          file.getPath().getName() != "DONOTERASE");
417      }
418    }
419
420    util.deleteTable(tableName);
421  }
422
423  /**
424   * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the
425   * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the
426   * responses.
427   */
428  @Test
429  public void testTagsSurviveBulkLoadSplit() throws Exception {
430    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
431    FileSystem fs = util.getTestFileSystem();
432    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
433    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
434    // table has these split points
435    byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
436      Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
437
438    // creating an hfile that has values that span the split points.
439    byte[] from = Bytes.toBytes("ddd");
440    byte[] to = Bytes.toBytes("ooo");
441    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
442      new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
443    int expectedRows = 1000;
444
445    TableName tableName = TableName.valueOf(tn.getMethodName());
446    TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
447    util.getAdmin().createTable(htd, tableSplitKeys);
448
449    BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir);
450
451    Table table = util.getConnection().getTable(tableName);
452    try {
453      assertEquals(expectedRows, countRows(table));
454      HFileTestUtil.verifyTags(table);
455    } finally {
456      table.close();
457    }
458
459    util.deleteTable(tableName);
460  }
461
462  /**
463   * Test loading into a column family that does not exist.
464   */
465  @Test
466  public void testNonexistentColumnFamilyLoad() throws Exception {
467    String testName = tn.getMethodName();
468    byte[][][] hFileRanges =
469      new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
470        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };
471
472    byte[] TABLE = Bytes.toBytes("mytable_" + testName);
473    // set real family name to upper case in purpose to simulate the case that
474    // family name in HFiles is invalid
475    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
476      .setColumnFamily(ColumnFamilyDescriptorBuilder
477        .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
478      .build();
479
480    try {
481      runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2);
482      assertTrue("Loading into table with non-existent family should have failed", false);
483    } catch (Exception e) {
484      assertTrue("IOException expected", e instanceof IOException);
485      // further check whether the exception message is correct
486      String errMsg = e.getMessage();
487      assertTrue(
488        "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY
489          + "], current message: [" + errMsg + "]",
490        errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
491    }
492  }
493
494  @Test
495  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
496    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
497  }
498
499  @Test
500  public void testNonHfileFolder() throws Exception {
501    testNonHfileFolder("testNonHfileFolder", false);
502  }
503
504  /**
505   * Write a random data file and a non-file in a dir with a valid family name but not part of the
506   * table families. we should we able to bulkload without getting the unmatched family exception.
507   * HBASE-13037/HBASE-13227
508   */
509  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
510    Path dir = util.getDataTestDirOnTestFS(tableName);
511    FileSystem fs = util.getTestFileSystem();
512    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
513
514    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
515    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
516      QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
517    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
518
519    final String NON_FAMILY_FOLDER = "_logs";
520    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
521    fs.mkdirs(nonFamilyDir);
522    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
523    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
524
525    Table table = null;
526    try {
527      if (preCreateTable) {
528        table = util.createTable(TableName.valueOf(tableName), FAMILY);
529      } else {
530        table = util.getConnection().getTable(TableName.valueOf(tableName));
531      }
532      BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(TableName.valueOf(tableName), dir);
533      assertEquals(500, countRows(table));
534    } finally {
535      if (table != null) {
536        table.close();
537      }
538      fs.delete(dir, true);
539    }
540  }
541
542  private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
543    FSDataOutputStream stream = fs.create(path);
544    try {
545      byte[] data = new byte[1024];
546      for (int i = 0; i < data.length; ++i) {
547        data[i] = (byte) (i & 0xff);
548      }
549      while (size >= data.length) {
550        stream.write(data, 0, data.length);
551        size -= data.length;
552      }
553      if (size > 0) {
554        stream.write(data, 0, size);
555      }
556    } finally {
557      stream.close();
558    }
559  }
560
561  @Test
562  public void testSplitStoreFile() throws IOException {
563    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
564    FileSystem fs = util.getTestFileSystem();
565    Path testIn = new Path(dir, "testhfile");
566    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
567    String tableName = tn.getMethodName();
568    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
569    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
570      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
571
572    Path bottomOut = new Path(dir, "bottom.out");
573    Path topOut = new Path(dir, "top.out");
574
575    BulkLoadHFilesTool.splitStoreFile(
576      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
577      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
578
579    int rowCount = verifyHFile(bottomOut);
580    rowCount += verifyHFile(topOut);
581    assertEquals(1000, rowCount);
582  }
583
584  /**
585   * Test hfile splits with the favored nodes
586   */
587  @Test
588  public void testSplitStoreFileWithFavoriteNodes() throws IOException {
589
590    Path dir = new Path(util.getDefaultRootDirPath(), "testhfile");
591    FileSystem fs = util.getDFSCluster().getFileSystem();
592
593    Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes");
594    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
595    String tableName = tn.getMethodName();
596    Table table = util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
597    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
598      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
599
600    Path bottomOut = new Path(dir, "bottom.out");
601    Path topOut = new Path(dir, "top.out");
602
603    final AsyncTableRegionLocator regionLocator =
604      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName));
605    BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc,
606      Bytes.toBytes("ggg"), bottomOut, topOut);
607    verifyHFileFavoriteNode(topOut, regionLocator, fs);
608    verifyHFileFavoriteNode(bottomOut, regionLocator, fs);
609    int rowCount = verifyHFile(bottomOut);
610    rowCount += verifyHFile(topOut);
611    assertEquals(1000, rowCount);
612  }
613
614  @Test
615  public void testSplitStoreFileWithCreateTimeTS() throws IOException {
616    Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS");
617    FileSystem fs = util.getTestFileSystem();
618    Path testIn = new Path(dir, "testhfile");
619    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
620    String tableName = tn.getMethodName();
621    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
622    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
623      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
624
625    Path bottomOut = new Path(dir, "bottom.out");
626    Path topOut = new Path(dir, "top.out");
627
628    BulkLoadHFilesTool.splitStoreFile(
629      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
630      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
631
632    verifyHFileCreateTimeTS(bottomOut);
633    verifyHFileCreateTimeTS(topOut);
634  }
635
636  @Test
637  public void testSplitStoreFileWithNoneToNone() throws IOException {
638    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
639  }
640
641  @Test
642  public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
643    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
644  }
645
646  @Test
647  public void testSplitStoreFileWithEncodedToNone() throws IOException {
648    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
649  }
650
651  @Test
652  public void testSplitStoreFileWithNoneToEncoded() throws IOException {
653    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
654  }
655
656  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
657    DataBlockEncoding cfEncoding) throws IOException {
658    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
659    FileSystem fs = util.getTestFileSystem();
660    Path testIn = new Path(dir, "testhfile");
661    ColumnFamilyDescriptor familyDesc =
662      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
663    String tableName = tn.getMethodName();
664    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
665    HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
666      bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
667
668    Path bottomOut = new Path(dir, "bottom.out");
669    Path topOut = new Path(dir, "top.out");
670
671    BulkLoadHFilesTool.splitStoreFile(
672      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
673      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
674
675    int rowCount = verifyHFile(bottomOut);
676    rowCount += verifyHFile(topOut);
677    assertEquals(1000, rowCount);
678  }
679
680  private int verifyHFile(Path p) throws IOException {
681    Configuration conf = util.getConfiguration();
682    HFile.Reader reader =
683      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
684    HFileScanner scanner = reader.getScanner(conf, false, false);
685    scanner.seekTo();
686    int count = 0;
687    do {
688      count++;
689    } while (scanner.next());
690    assertTrue(count > 0);
691    reader.close();
692    return count;
693  }
694
695  private void verifyHFileCreateTimeTS(Path p) throws IOException {
696    Configuration conf = util.getConfiguration();
697
698    try (HFile.Reader reader =
699      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf)) {
700      long fileCreateTime = reader.getHFileInfo().getHFileContext().getFileCreateTime();
701      MatcherAssert.assertThat(fileCreateTime, greaterThan(0L));
702    }
703  }
704
705  /**
706   * test split storefile with favorite node information
707   */
708  private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs)
709    throws IOException {
710    Configuration conf = util.getConfiguration();
711
712    try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);) {
713
714      final byte[] firstRowkey = reader.getFirstRowKey().get();
715      final HRegionLocation hRegionLocation =
716        FutureUtils.get(regionLocator.getRegionLocation(firstRowkey));
717
718      final String targetHostName = hRegionLocation.getHostname();
719
720      if (fs instanceof DistributedFileSystem) {
721        String pathStr = p.toUri().getPath();
722        LocatedBlocks blocks =
723          ((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L);
724
725        boolean isFavoriteNode = false;
726        List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks();
727        int index = 0;
728        do {
729          if (index > 0) {
730            assertTrue("failed use favored nodes", isFavoriteNode);
731          }
732          isFavoriteNode = false;
733          final LocatedBlock block = locatedBlocks.get(index);
734
735          final DatanodeInfo[] locations = getLocatedBlockLocations(block);
736          for (DatanodeInfo location : locations) {
737
738            final String hostName = location.getHostName();
739            if (
740              targetHostName.equals(hostName.equals("127.0.0.1")
741                ? InetAddress.getLocalHost().getHostName()
742                : "127.0.0.1") || targetHostName.equals(hostName)
743            ) {
744              isFavoriteNode = true;
745              break;
746            }
747          }
748
749          index++;
750        } while (index < locatedBlocks.size());
751        if (index > 0) {
752          assertTrue("failed use favored nodes", isFavoriteNode);
753        }
754
755      }
756
757    }
758  }
759
760  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
761    Integer value = map.containsKey(first) ? map.get(first) : 0;
762    map.put(first, value + 1);
763
764    value = map.containsKey(last) ? map.get(last) : 0;
765    map.put(last, value - 1);
766  }
767
768  @Test
769  public void testInferBoundaries() {
770    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
771
772    /*
773     * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s
774     * u----w Should be inferred as: a-----------------k m-------------q r--------------t
775     * u---------x The output should be (m,r,u)
776     */
777
778    String first;
779    String last;
780
781    first = "a";
782    last = "e";
783    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
784
785    first = "r";
786    last = "s";
787    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
788
789    first = "o";
790    last = "p";
791    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
792
793    first = "g";
794    last = "k";
795    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
796
797    first = "v";
798    last = "x";
799    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
800
801    first = "c";
802    last = "i";
803    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
804
805    first = "m";
806    last = "q";
807    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
808
809    first = "s";
810    last = "t";
811    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
812
813    first = "u";
814    last = "w";
815    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
816
817    byte[][] keysArray = BulkLoadHFilesTool.inferBoundaries(map);
818    byte[][] compare = new byte[3][];
819    compare[0] = Bytes.toBytes("m");
820    compare[1] = Bytes.toBytes("r");
821    compare[2] = Bytes.toBytes("u");
822
823    assertEquals(3, keysArray.length);
824
825    for (int row = 0; row < keysArray.length; row++) {
826      assertArrayEquals(keysArray[row], compare[row]);
827    }
828  }
829
830  @Test
831  public void testLoadTooMayHFiles() throws Exception {
832    Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
833    FileSystem fs = util.getTestFileSystem();
834    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
835    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
836
837    byte[] from = Bytes.toBytes("begin");
838    byte[] to = Bytes.toBytes("end");
839    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
840      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i),
841        FAMILY, QUALIFIER, from, to, 1000);
842    }
843
844    try {
845      BulkLoadHFiles.create(util.getConfiguration())
846        .bulkLoad(TableName.valueOf("mytable_testLoadTooMayHFiles"), dir);
847      fail("Bulk loading too many files should fail");
848    } catch (IOException ie) {
849      assertTrue(ie.getMessage()
850        .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
851    }
852  }
853
854  @Test(expected = TableNotFoundException.class)
855  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
856    Configuration conf = util.getConfiguration();
857    conf.set(BulkLoadHFiles.CREATE_TABLE_CONF_KEY, "no");
858    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
859    String[] args = { "directory", "nonExistingTable" };
860    loader.run(args);
861  }
862
863  @Test
864  public void testTableWithCFNameStartWithUnderScore() throws Exception {
865    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
866    FileSystem fs = util.getTestFileSystem();
867    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
868    String family = "_cf";
869    Path familyDir = new Path(dir, family);
870
871    byte[] from = Bytes.toBytes("begin");
872    byte[] to = Bytes.toBytes("end");
873    Configuration conf = util.getConfiguration();
874    String tableName = tn.getMethodName();
875    try (Table table = util.createTable(TableName.valueOf(tableName), family)) {
876      HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
877        QUALIFIER, from, to, 1000);
878      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), dir);
879      assertEquals(1000, countRows(table));
880    }
881  }
882
883  @Test
884  public void testBulkLoadByFamily() throws Exception {
885    Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily");
886    FileSystem fs = util.getTestFileSystem();
887    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
888    String tableName = tn.getMethodName();
889    String[] families = { "cf1", "cf2", "cf3" };
890    for (int i = 0; i < families.length; i++) {
891      byte[] from = Bytes.toBytes(i + "begin");
892      byte[] to = Bytes.toBytes(i + "end");
893      Path familyDir = new Path(dir, families[i]);
894      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"),
895        Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000);
896    }
897    Table table = util.createTable(TableName.valueOf(tableName), families);
898    final AtomicInteger attmptedCalls = new AtomicInteger();
899    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
900    BulkLoadHFiles loader = new BulkLoadHFilesTool(util.getConfiguration()) {
901      @Override
902      protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
903        final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
904        final byte[] first, Collection<LoadQueueItem> lqis) {
905        attmptedCalls.incrementAndGet();
906        return super.tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis);
907      }
908    };
909    try {
910      loader.bulkLoad(table.getName(), dir);
911      assertEquals(families.length, attmptedCalls.get());
912      assertEquals(1000 * families.length, HBaseTestingUtil.countRows(table));
913    } finally {
914      if (null != table) {
915        table.close();
916      }
917      util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
918    }
919  }
920
921  @Test
922  public void testFailIfNeedSplitHFile() throws IOException {
923    TableName tableName = TableName.valueOf(tn.getMethodName());
924    Table table = util.createTable(tableName, FAMILY);
925
926    util.loadTable(table, FAMILY);
927
928    FileSystem fs = util.getTestFileSystem();
929    Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file"));
930    HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER,
931      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
932
933    util.getAdmin().split(tableName);
934    util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1);
935
936    Configuration config = new Configuration(util.getConfiguration());
937    config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true);
938    BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config);
939
940    String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() };
941    assertThrows(IOException.class, () -> tool.run(args));
942    util.getHBaseCluster().getRegions(tableName)
943      .forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size()));
944  }
945}