001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.tool;
019
020import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
021import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
022import static org.hamcrest.Matchers.greaterThan;
023import static org.junit.Assert.assertArrayEquals;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertThrows;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.net.InetAddress;
031import java.nio.ByteBuffer;
032import java.util.ArrayList;
033import java.util.Collection;
034import java.util.List;
035import java.util.Locale;
036import java.util.Map;
037import java.util.TreeMap;
038import java.util.concurrent.CompletableFuture;
039import java.util.concurrent.atomic.AtomicInteger;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FSDataOutputStream;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.HBaseClassTestRule;
046import org.apache.hadoop.hbase.HBaseTestingUtil;
047import org.apache.hadoop.hbase.HConstants;
048import org.apache.hadoop.hbase.HRegionLocation;
049import org.apache.hadoop.hbase.NamespaceDescriptor;
050import org.apache.hadoop.hbase.TableName;
051import org.apache.hadoop.hbase.TableNotFoundException;
052import org.apache.hadoop.hbase.client.AsyncClusterConnection;
053import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
055import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
056import org.apache.hadoop.hbase.client.Table;
057import org.apache.hadoop.hbase.client.TableDescriptor;
058import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
059import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
060import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
061import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
062import org.apache.hadoop.hbase.io.hfile.CacheConfig;
063import org.apache.hadoop.hbase.io.hfile.HFile;
064import org.apache.hadoop.hbase.io.hfile.HFileScanner;
065import org.apache.hadoop.hbase.regionserver.BloomType;
066import org.apache.hadoop.hbase.testclassification.LargeTests;
067import org.apache.hadoop.hbase.testclassification.MiscTests;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.CommonFSUtils;
070import org.apache.hadoop.hbase.util.FutureUtils;
071import org.apache.hadoop.hbase.util.HFileTestUtil;
072import org.apache.hadoop.hdfs.DistributedFileSystem;
073import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
074import org.apache.hadoop.hdfs.protocol.LocatedBlock;
075import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
076import org.hamcrest.MatcherAssert;
077import org.junit.AfterClass;
078import org.junit.BeforeClass;
079import org.junit.ClassRule;
080import org.junit.Rule;
081import org.junit.Test;
082import org.junit.experimental.categories.Category;
083import org.junit.rules.TestName;
084
085import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
086
087/**
088 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
089 * faster than the full MR cluster tests in TestHFileOutputFormat
090 */
091@Category({ MiscTests.class, LargeTests.class })
092public class TestBulkLoadHFiles {
093
  // JUnit class rule obtained from HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBulkLoadHFiles.class);

  // Exposes the running test method's name; tests use it to derive table and directory names.
  @Rule
  public TestName tn = new TestName();

  // Qualifier and column family the tests in this class pass to the HFile creation helpers.
  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  private static final byte[] FAMILY = Bytes.toBytes("myfam");
  // Namespace created by setupNamespace() and used for the namespaced bulk load runs.
  private static final String NAMESPACE = "bulkNS";

  // Substring expected in the exception message by testNonexistentColumnFamilyLoad.
  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
  // Value written to the BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY setting before startup.
  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;

  // Split points used by testNonexistentColumnFamilyLoad when the table is pre-created.
  private static final byte[][] SPLIT_KEYS =
    new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };

  // Shared testing utility; its mini cluster is started and stopped by the class-level fixtures.
  static HBaseTestingUtil util = new HBaseTestingUtil();
112
113  @BeforeClass
114  public static void setUpBeforeClass() throws Exception {
115    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
116    util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
117      MAX_FILES_PER_REGION_PER_FAMILY);
118    // change default behavior so that tag values are returned with normal rpcs
119    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
120      KeyValueCodecWithTags.class.getCanonicalName());
121    util.startMiniCluster();
122
123    setupNamespace();
124  }
125
126  protected static void setupNamespace() throws Exception {
127    util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
128  }
129
  /**
   * Shuts down the mini cluster once all tests of this class have run.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }
134
135  @Test
136  public void testSimpleLoadWithMap() throws Exception {
137    runTest("testSimpleLoadWithMap", BloomType.NONE,
138      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
139        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
140      true);
141  }
142
143  /**
144   * Test case that creates some regions and loads HFiles that fit snugly inside those regions
145   */
146  @Test
147  public void testSimpleLoad() throws Exception {
148    runTest("testSimpleLoad", BloomType.NONE,
149      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
150        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
151  }
152
153  @Test
154  public void testSimpleLoadWithFileCopy() throws Exception {
155    String testName = tn.getMethodName();
156    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
157    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null,
158      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
159        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
160      false, true, 2);
161  }
162
163  /**
164   * Test case that creates some regions and loads HFiles that cross the boundaries of those regions
165   */
166  @Test
167  public void testRegionCrossingLoad() throws Exception {
168    runTest("testRegionCrossingLoad", BloomType.NONE,
169      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
170        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
171  }
172
173  /**
174   * Test loading into a column family that has a ROW bloom filter.
175   */
176  @Test
177  public void testRegionCrossingRowBloom() throws Exception {
178    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
179      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
180        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
181  }
182
183  /**
184   * Test loading into a column family that has a ROWCOL bloom filter.
185   */
186  @Test
187  public void testRegionCrossingRowColBloom() throws Exception {
188    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
189      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
190        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
191  }
192
193  /**
194   * Test case that creates some regions and loads HFiles that have different region boundaries than
195   * the table pre-split.
196   */
197  @Test
198  public void testSimpleHFileSplit() throws Exception {
199    runTest("testHFileSplit", BloomType.NONE,
200      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
201        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
202      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
203        new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
204  }
205
206  /**
207   * Test case that creates some regions and loads HFiles that cross the boundaries and have
208   * different region boundaries than the table pre-split.
209   */
210  @Test
211  public void testRegionCrossingHFileSplit() throws Exception {
212    testRegionCrossingHFileSplit(BloomType.NONE);
213  }
214
215  /**
216   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom
217   * filter and a different region boundaries than the table pre-split.
218   */
219  @Test
220  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
221    testRegionCrossingHFileSplit(BloomType.ROW);
222  }
223
224  /**
225   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL
226   * bloom filter and a different region boundaries than the table pre-split.
227   */
228  @Test
229  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
230    testRegionCrossingHFileSplit(BloomType.ROWCOL);
231  }
232
233  @Test
234  public void testSplitALot() throws Exception {
235    runTest("testSplitALot", BloomType.NONE,
236      new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
237        Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
238        Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
239        Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
240        Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
241        Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
242      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
243  }
244
245  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
246    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
247      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
248        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
249      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
250        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
251  }
252
253  private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
254    return TableDescriptorBuilder.newBuilder(tableName)
255      .setColumnFamily(
256        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
257      .build();
258  }
259
260  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
261    throws Exception {
262    runTest(testName, bloomType, null, hfileRanges);
263  }
264
265  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
266    throws Exception {
267    runTest(testName, bloomType, null, hfileRanges, useMap);
268  }
269
270  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
271    byte[][][] hfileRanges) throws Exception {
272    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
273  }
274
275  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
276    byte[][][] hfileRanges, boolean useMap) throws Exception {
277    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
278    final boolean preCreateTable = tableSplitKeys != null;
279
280    // Run the test bulkloading the table to the default namespace
281    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
282    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
283      useMap, 2);
284
285    /*
286     * Run the test bulkloading the table from a depth of 3 directory structure is now baseDirectory
287     * -- regionDir -- familyDir -- storeFileDir
288     */
289    if (preCreateTable) {
290      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false,
291        3);
292    }
293
294    // Run the test bulkloading the table to the specified namespace
295    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
296    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap,
297      2);
298  }
299
300  private void runTest(String testName, TableName tableName, BloomType bloomType,
301    boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
302    int depth) throws Exception {
303    TableDescriptor htd = buildHTD(tableName, bloomType);
304    runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
305  }
306
307  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtil util,
308    byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
309    byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount,
310    int factor, int depth) throws Exception {
311    Path baseDirectory = util.getDataTestDirOnTestFS(testName);
312    FileSystem fs = util.getTestFileSystem();
313    baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory());
314    Path parentDir = baseDirectory;
315    if (depth == 3) {
316      assert !useMap;
317      parentDir = new Path(baseDirectory, "someRegion");
318    }
319    Path familyDir = new Path(parentDir, Bytes.toString(fam));
320
321    int hfileIdx = 0;
322    Map<byte[], List<Path>> map = null;
323    List<Path> list = null;
324    if (useMap || copyFiles) {
325      list = new ArrayList<>();
326    }
327    if (useMap) {
328      map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
329      map.put(fam, list);
330    }
331    Path last = null;
332    for (byte[][] range : hfileRanges) {
333      byte[] from = range[0];
334      byte[] to = range[1];
335      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
336      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
337      if (useMap) {
338        last = path;
339        list.add(path);
340      }
341    }
342    int expectedRows = hfileIdx * factor;
343
344    TableName tableName = htd.getTableName();
345    if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
346      if (tableSplitKeys != null) {
347        util.getAdmin().createTable(htd, tableSplitKeys);
348      } else {
349        util.getAdmin().createTable(htd);
350      }
351    }
352
353    Configuration conf = util.getConfiguration();
354    if (copyFiles) {
355      conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);
356    }
357    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
358    List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
359    if (depth == 3) {
360      args.add("-loadTable");
361    }
362
363    if (useMap) {
364      if (deleteFile) {
365        fs.delete(last, true);
366      }
367      Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map);
368      if (deleteFile) {
369        expectedRows -= 1000;
370        for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) {
371          if (item.getFilePath().getName().equals(last.getName())) {
372            fail(last + " should be missing");
373          }
374        }
375      }
376    } else {
377      loader.run(args.toArray(new String[] {}));
378    }
379
380    if (copyFiles) {
381      for (Path p : list) {
382        assertTrue(p + " should exist", fs.exists(p));
383      }
384    }
385
386    try (Table table = util.getConnection().getTable(tableName)) {
387      assertEquals(initRowCount + expectedRows, countRows(table));
388    }
389
390    return expectedRows;
391  }
392
393  private void runTest(String testName, TableDescriptor htd, boolean preCreateTable,
394    byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth)
395    throws Exception {
396    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
397      useMap, true, copyFiles, 0, 1000, depth);
398
399    final TableName tableName = htd.getTableName();
400    // verify staging folder has been cleaned up
401    Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()),
402      HConstants.BULKLOAD_STAGING_DIR_NAME);
403    FileSystem fs = util.getTestFileSystem();
404    if (fs.exists(stagingBasePath)) {
405      FileStatus[] files = fs.listStatus(stagingBasePath);
406      for (FileStatus file : files) {
407        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
408          file.getPath().getName() != "DONOTERASE");
409      }
410    }
411
412    util.deleteTable(tableName);
413  }
414
415  /**
416   * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the
417   * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the
418   * responses.
419   */
420  @Test
421  public void testTagsSurviveBulkLoadSplit() throws Exception {
422    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
423    FileSystem fs = util.getTestFileSystem();
424    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
425    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
426    // table has these split points
427    byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
428      Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
429
430    // creating an hfile that has values that span the split points.
431    byte[] from = Bytes.toBytes("ddd");
432    byte[] to = Bytes.toBytes("ooo");
433    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
434      new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
435    int expectedRows = 1000;
436
437    TableName tableName = TableName.valueOf(tn.getMethodName());
438    TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
439    util.getAdmin().createTable(htd, tableSplitKeys);
440
441    BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir);
442
443    Table table = util.getConnection().getTable(tableName);
444    try {
445      assertEquals(expectedRows, countRows(table));
446      HFileTestUtil.verifyTags(table);
447    } finally {
448      table.close();
449    }
450
451    util.deleteTable(tableName);
452  }
453
454  /**
455   * Test loading into a column family that does not exist.
456   */
457  @Test
458  public void testNonexistentColumnFamilyLoad() throws Exception {
459    String testName = tn.getMethodName();
460    byte[][][] hFileRanges =
461      new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
462        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };
463
464    byte[] TABLE = Bytes.toBytes("mytable_" + testName);
465    // set real family name to upper case in purpose to simulate the case that
466    // family name in HFiles is invalid
467    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
468      .setColumnFamily(ColumnFamilyDescriptorBuilder
469        .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
470      .build();
471
472    try {
473      runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2);
474      assertTrue("Loading into table with non-existent family should have failed", false);
475    } catch (Exception e) {
476      assertTrue("IOException expected", e instanceof IOException);
477      // further check whether the exception message is correct
478      String errMsg = e.getMessage();
479      assertTrue(
480        "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY
481          + "], current message: [" + errMsg + "]",
482        errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
483    }
484  }
485
486  @Test
487  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
488    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
489  }
490
491  @Test
492  public void testNonHfileFolder() throws Exception {
493    testNonHfileFolder("testNonHfileFolder", false);
494  }
495
496  /**
497   * Write a random data file and a non-file in a dir with a valid family name but not part of the
498   * table families. we should we able to bulkload without getting the unmatched family exception.
499   * HBASE-13037/HBASE-13227
500   */
501  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
502    Path dir = util.getDataTestDirOnTestFS(tableName);
503    FileSystem fs = util.getTestFileSystem();
504    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
505
506    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
507    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
508      QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
509    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
510
511    final String NON_FAMILY_FOLDER = "_logs";
512    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
513    fs.mkdirs(nonFamilyDir);
514    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
515    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
516
517    Table table = null;
518    try {
519      if (preCreateTable) {
520        table = util.createTable(TableName.valueOf(tableName), FAMILY);
521      } else {
522        table = util.getConnection().getTable(TableName.valueOf(tableName));
523      }
524      BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(TableName.valueOf(tableName), dir);
525      assertEquals(500, countRows(table));
526    } finally {
527      if (table != null) {
528        table.close();
529      }
530      fs.delete(dir, true);
531    }
532  }
533
534  private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
535    FSDataOutputStream stream = fs.create(path);
536    try {
537      byte[] data = new byte[1024];
538      for (int i = 0; i < data.length; ++i) {
539        data[i] = (byte) (i & 0xff);
540      }
541      while (size >= data.length) {
542        stream.write(data, 0, data.length);
543        size -= data.length;
544      }
545      if (size > 0) {
546        stream.write(data, 0, size);
547      }
548    } finally {
549      stream.close();
550    }
551  }
552
553  @Test
554  public void testSplitStoreFile() throws IOException {
555    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
556    FileSystem fs = util.getTestFileSystem();
557    Path testIn = new Path(dir, "testhfile");
558    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
559    String tableName = tn.getMethodName();
560    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
561    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
562      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
563
564    Path bottomOut = new Path(dir, "bottom.out");
565    Path topOut = new Path(dir, "top.out");
566
567    BulkLoadHFilesTool.splitStoreFile(
568      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
569      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
570
571    int rowCount = verifyHFile(bottomOut);
572    rowCount += verifyHFile(topOut);
573    assertEquals(1000, rowCount);
574  }
575
576  /**
577   * Test hfile splits with the favored nodes
578   */
579  @Test
580  public void testSplitStoreFileWithFavoriteNodes() throws IOException {
581
582    Path dir = new Path(util.getDefaultRootDirPath(), "testhfile");
583    FileSystem fs = util.getDFSCluster().getFileSystem();
584
585    Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes");
586    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
587    String tableName = tn.getMethodName();
588    Table table = util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
589    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
590      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
591
592    Path bottomOut = new Path(dir, "bottom.out");
593    Path topOut = new Path(dir, "top.out");
594
595    final AsyncTableRegionLocator regionLocator =
596      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName));
597    BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc,
598      Bytes.toBytes("ggg"), bottomOut, topOut);
599    verifyHFileFavoriteNode(topOut, regionLocator, fs);
600    verifyHFileFavoriteNode(bottomOut, regionLocator, fs);
601    int rowCount = verifyHFile(bottomOut);
602    rowCount += verifyHFile(topOut);
603    assertEquals(1000, rowCount);
604  }
605
606  @Test
607  public void testSplitStoreFileWithCreateTimeTS() throws IOException {
608    Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS");
609    FileSystem fs = util.getTestFileSystem();
610    Path testIn = new Path(dir, "testhfile");
611    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
612    String tableName = tn.getMethodName();
613    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
614    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
615      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
616
617    Path bottomOut = new Path(dir, "bottom.out");
618    Path topOut = new Path(dir, "top.out");
619
620    BulkLoadHFilesTool.splitStoreFile(
621      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
622      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
623
624    verifyHFileCreateTimeTS(bottomOut);
625    verifyHFileCreateTimeTS(topOut);
626  }
627
628  @Test
629  public void testSplitStoreFileWithNoneToNone() throws IOException {
630    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
631  }
632
633  @Test
634  public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
635    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
636  }
637
638  @Test
639  public void testSplitStoreFileWithEncodedToNone() throws IOException {
640    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
641  }
642
643  @Test
644  public void testSplitStoreFileWithNoneToEncoded() throws IOException {
645    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
646  }
647
648  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
649    DataBlockEncoding cfEncoding) throws IOException {
650    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
651    FileSystem fs = util.getTestFileSystem();
652    Path testIn = new Path(dir, "testhfile");
653    ColumnFamilyDescriptor familyDesc =
654      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
655    String tableName = tn.getMethodName();
656    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
657    HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
658      bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
659
660    Path bottomOut = new Path(dir, "bottom.out");
661    Path topOut = new Path(dir, "top.out");
662
663    BulkLoadHFilesTool.splitStoreFile(
664      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
665      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);
666
667    int rowCount = verifyHFile(bottomOut);
668    rowCount += verifyHFile(topOut);
669    assertEquals(1000, rowCount);
670  }
671
672  private int verifyHFile(Path p) throws IOException {
673    Configuration conf = util.getConfiguration();
674    HFile.Reader reader =
675      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
676    HFileScanner scanner = reader.getScanner(conf, false, false);
677    scanner.seekTo();
678    int count = 0;
679    do {
680      count++;
681    } while (scanner.next());
682    assertTrue(count > 0);
683    reader.close();
684    return count;
685  }
686
687  private void verifyHFileCreateTimeTS(Path p) throws IOException {
688    Configuration conf = util.getConfiguration();
689
690    try (HFile.Reader reader =
691      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf)) {
692      long fileCreateTime = reader.getHFileInfo().getHFileContext().getFileCreateTime();
693      MatcherAssert.assertThat(fileCreateTime, greaterThan(0L));
694    }
695  }
696
697  /**
698   * test split storefile with favorite node information
699   */
700  private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs)
701    throws IOException {
702    Configuration conf = util.getConfiguration();
703
704    try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);) {
705
706      final byte[] firstRowkey = reader.getFirstRowKey().get();
707      final HRegionLocation hRegionLocation =
708        FutureUtils.get(regionLocator.getRegionLocation(firstRowkey));
709
710      final String targetHostName = hRegionLocation.getHostname();
711
712      if (fs instanceof DistributedFileSystem) {
713        String pathStr = p.toUri().getPath();
714        LocatedBlocks blocks =
715          ((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L);
716
717        boolean isFavoriteNode = false;
718        List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks();
719        int index = 0;
720        do {
721          if (index > 0) {
722            assertTrue("failed use favored nodes", isFavoriteNode);
723          }
724          isFavoriteNode = false;
725          final LocatedBlock block = locatedBlocks.get(index);
726
727          final DatanodeInfo[] locations = getLocatedBlockLocations(block);
728          for (DatanodeInfo location : locations) {
729
730            final String hostName = location.getHostName();
731            if (
732              targetHostName.equals(hostName.equals("127.0.0.1")
733                ? InetAddress.getLocalHost().getHostName()
734                : "127.0.0.1") || targetHostName.equals(hostName)
735            ) {
736              isFavoriteNode = true;
737              break;
738            }
739          }
740
741          index++;
742        } while (index < locatedBlocks.size());
743        if (index > 0) {
744          assertTrue("failed use favored nodes", isFavoriteNode);
745        }
746
747      }
748
749    }
750  }
751
752  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
753    Integer value = map.containsKey(first) ? map.get(first) : 0;
754    map.put(first, value + 1);
755
756    value = map.containsKey(last) ? map.get(last) : 0;
757    map.put(last, value - 1);
758  }
759
760  @Test
761  public void testInferBoundaries() {
762    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
763
764    /*
765     * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s
766     * u----w Should be inferred as: a-----------------k m-------------q r--------------t
767     * u---------x The output should be (m,r,u)
768     */
769
770    String first;
771    String last;
772
773    first = "a";
774    last = "e";
775    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
776
777    first = "r";
778    last = "s";
779    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
780
781    first = "o";
782    last = "p";
783    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
784
785    first = "g";
786    last = "k";
787    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
788
789    first = "v";
790    last = "x";
791    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
792
793    first = "c";
794    last = "i";
795    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
796
797    first = "m";
798    last = "q";
799    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
800
801    first = "s";
802    last = "t";
803    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
804
805    first = "u";
806    last = "w";
807    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
808
809    byte[][] keysArray = BulkLoadHFilesTool.inferBoundaries(map);
810    byte[][] compare = new byte[3][];
811    compare[0] = Bytes.toBytes("m");
812    compare[1] = Bytes.toBytes("r");
813    compare[2] = Bytes.toBytes("u");
814
815    assertEquals(3, keysArray.length);
816
817    for (int row = 0; row < keysArray.length; row++) {
818      assertArrayEquals(keysArray[row], compare[row]);
819    }
820  }
821
822  @Test
823  public void testLoadTooMayHFiles() throws Exception {
824    Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
825    FileSystem fs = util.getTestFileSystem();
826    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
827    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
828
829    byte[] from = Bytes.toBytes("begin");
830    byte[] to = Bytes.toBytes("end");
831    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
832      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i),
833        FAMILY, QUALIFIER, from, to, 1000);
834    }
835
836    try {
837      BulkLoadHFiles.create(util.getConfiguration())
838        .bulkLoad(TableName.valueOf("mytable_testLoadTooMayHFiles"), dir);
839      fail("Bulk loading too many files should fail");
840    } catch (IOException ie) {
841      assertTrue(ie.getMessage()
842        .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
843    }
844  }
845
846  @Test(expected = TableNotFoundException.class)
847  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
848    Configuration conf = util.getConfiguration();
849    conf.set(BulkLoadHFiles.CREATE_TABLE_CONF_KEY, "no");
850    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
851    String[] args = { "directory", "nonExistingTable" };
852    loader.run(args);
853  }
854
855  @Test
856  public void testTableWithCFNameStartWithUnderScore() throws Exception {
857    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
858    FileSystem fs = util.getTestFileSystem();
859    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
860    String family = "_cf";
861    Path familyDir = new Path(dir, family);
862
863    byte[] from = Bytes.toBytes("begin");
864    byte[] to = Bytes.toBytes("end");
865    Configuration conf = util.getConfiguration();
866    String tableName = tn.getMethodName();
867    try (Table table = util.createTable(TableName.valueOf(tableName), family)) {
868      HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
869        QUALIFIER, from, to, 1000);
870      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), dir);
871      assertEquals(1000, countRows(table));
872    }
873  }
874
875  @Test
876  public void testBulkLoadByFamily() throws Exception {
877    Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily");
878    FileSystem fs = util.getTestFileSystem();
879    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
880    String tableName = tn.getMethodName();
881    String[] families = { "cf1", "cf2", "cf3" };
882    for (int i = 0; i < families.length; i++) {
883      byte[] from = Bytes.toBytes(i + "begin");
884      byte[] to = Bytes.toBytes(i + "end");
885      Path familyDir = new Path(dir, families[i]);
886      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"),
887        Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000);
888    }
889    Table table = util.createTable(TableName.valueOf(tableName), families);
890    final AtomicInteger attmptedCalls = new AtomicInteger();
891    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
892    BulkLoadHFiles loader = new BulkLoadHFilesTool(util.getConfiguration()) {
893      @Override
894      protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
895        final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
896        final byte[] first, Collection<LoadQueueItem> lqis) {
897        attmptedCalls.incrementAndGet();
898        return super.tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis);
899      }
900    };
901    try {
902      loader.bulkLoad(table.getName(), dir);
903      assertEquals(families.length, attmptedCalls.get());
904      assertEquals(1000 * families.length, HBaseTestingUtil.countRows(table));
905    } finally {
906      if (null != table) {
907        table.close();
908      }
909      util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
910    }
911  }
912
913  @Test
914  public void testFailIfNeedSplitHFile() throws IOException {
915    TableName tableName = TableName.valueOf(tn.getMethodName());
916    Table table = util.createTable(tableName, FAMILY);
917
918    util.loadTable(table, FAMILY);
919
920    FileSystem fs = util.getTestFileSystem();
921    Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file"));
922    HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER,
923      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
924
925    util.getAdmin().split(tableName);
926    util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1);
927
928    Configuration config = new Configuration(util.getConfiguration());
929    config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true);
930    BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config);
931
932    String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() };
933    assertThrows(IOException.class, () -> tool.run(args));
934    util.getHBaseCluster().getRegions(tableName)
935      .forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size()));
936  }
937}