/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
 * faster than the full MR cluster tests in TestHFileOutputFormat.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestLoadIncrementalHFiles {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestLoadIncrementalHFiles.class);

  @Rule
  public TestName tn = new TestName();

  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  private static final byte[] FAMILY = Bytes.toBytes("myfam");
  private static final String NAMESPACE = "bulkNS";

  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;

  private static final byte[][] SPLIT_KEYS =
    new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };

  static HBaseTestingUtility util = new HBaseTestingUtility();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
      MAX_FILES_PER_REGION_PER_FAMILY);
    // change the default behavior so that tag values are returned with normal RPCs
    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
      KeyValueCodecWithTags.class.getCanonicalName());
    util.startMiniCluster();

    setupNamespace();
  }

  protected static void setupNamespace() throws Exception {
    util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  @Test
  public void testSimpleLoadWithMap() throws Exception {
    runTest("testSimpleLoadWithMap", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
      true);
  }

  /**
   * Test case that creates some regions and loads HFiles that fit snugly inside those regions.
   */
  @Test
  public void testSimpleLoad() throws Exception {
    runTest("testSimpleLoad", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
  }

  @Test
  public void testSimpleLoadWithFileCopy() throws Exception {
    String testName = tn.getMethodName();
    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
      false, true, 2);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries of those
   * regions.
   */
  @Test
  public void testRegionCrossingLoad() throws Exception {
    runTest("testRegionCrossingLoad", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test loading into a column family that has a ROW bloom filter.
   */
  @Test
  public void testRegionCrossingRowBloom() throws Exception {
    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test loading into a column family that has a ROWCOL bloom filter.
   */
  @Test
  public void testRegionCrossingRowColBloom() throws Exception {
    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test case that creates some regions and loads HFiles that have different region boundaries
   * than the table pre-split.
   */
  @Test
  public void testSimpleHFileSplit() throws Exception {
    runTest("testHFileSplit", BloomType.NONE,
      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
        new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries and have
   * different region boundaries than the table pre-split.
   */
  @Test
  public void testRegionCrossingHFileSplit() throws Exception {
    testRegionCrossingHFileSplit(BloomType.NONE);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross region boundaries, have a ROW
   * bloom filter, and have different boundaries than the table pre-split.
   */
  @Test
  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROW);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross region boundaries, have a
   * ROWCOL bloom filter, and have different boundaries than the table pre-split.
   */
  @Test
  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROWCOL);
  }

  @Test
  public void testSplitALot() throws Exception {
    runTest("testSplitALot", BloomType.NONE,
      new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
        Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
        Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
        Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
        Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
        Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
  }

  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

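  /**
   * Builds a descriptor for a single-family table whose only family is {@link #FAMILY}, with the
   * given bloom filter type.
   */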
  private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
    return TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(
        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
      .build();
  }

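  // The runTest overloads below all funnel into the most specific variant, defaulting the table
  // split keys (null, i.e. no pre-split), map-based loading (false) and directory depth (2).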
  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
    throws Exception {
    runTest(testName, bloomType, null, hfileRanges);
  }

  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
    throws Exception {
    runTest(testName, bloomType, null, hfileRanges, useMap);
  }

  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
    byte[][][] hfileRanges) throws Exception {
    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
  }

  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
    byte[][][] hfileRanges, boolean useMap) throws Exception {
    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
    final boolean preCreateTable = tableSplitKeys != null;

    // Run the test bulkloading the table to the default namespace
    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, 2);

    /*
     * Run the test bulkloading the table from a depth of 3; the directory structure is now
     * baseDirectory -- regionDir -- familyDir -- storeFileDir
     */
    if (preCreateTable) {
      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false,
        3);
    }

    // Run the test bulkloading the table to the specified namespace
    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap,
      2);
  }

  private void runTest(String testName, TableName tableName, BloomType bloomType,
    boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
    int depth) throws Exception {
    TableDescriptor htd = buildHTD(tableName, bloomType);
    runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
  }

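  /**
   * Creates one HFile per range in {@code hfileRanges} (each holding {@code factor} rows between
   * the range's start and end keys) under a per-test directory, then bulk loads them into
   * {@code htd}'s table, either through the command-line entry point or, if {@code useMap} is set,
   * through the family-to-files map API. Returns the number of rows expected to have been loaded.
   */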
  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
    byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
    byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount,
    int factor) throws Exception {
    return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, deleteFile, copyFiles, initRowCount, factor, 2);
  }

  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
    byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
    byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount,
    int factor, int depth) throws Exception {
    Path baseDirectory = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = util.getTestFileSystem();
    baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path parentDir = baseDirectory;
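    // depth == 3 emulates loading from a table-level directory (an extra region dir above the
    // family dirs); the -loadTable flag added below tells the tool to descend that extra level.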
    if (depth == 3) {
      assert !useMap;
      parentDir = new Path(baseDirectory, "someRegion");
    }
    Path familyDir = new Path(parentDir, Bytes.toString(fam));

    int hfileIdx = 0;
    Map<byte[], List<Path>> map = null;
    List<Path> list = null;
    if (useMap || copyFiles) {
      list = new ArrayList<>();
    }
    if (useMap) {
      map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      map.put(fam, list);
    }
    Path last = null;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
      if (useMap) {
        last = path;
        list.add(path);
      }
    }
    int expectedRows = hfileIdx * factor;

    TableName tableName = htd.getTableName();
    if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
      util.getAdmin().createTable(htd, tableSplitKeys);
    }

    Configuration conf = util.getConfiguration();
    if (copyFiles) {
      conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
    }
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
    List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
    if (depth == 3) {
      args.add("-loadTable");
    }

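    // Map-based loading optionally deletes the last HFile before the load to exercise the
    // missing-file path. Callers in this class always use factor == 1000, which is what the
    // "expectedRows -= 1000" below relies on.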
    if (useMap) {
      if (deleteFile) {
        fs.delete(last, true);
      }
      Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map);
      if (deleteFile) {
        expectedRows -= 1000;
        for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) {
          if (item.getFilePath().getName().equals(last.getName())) {
            fail(last + " should be missing");
          }
        }
      }
    } else {
      loader.run(args.toArray(new String[] {}));
    }

    if (copyFiles) {
      for (Path p : list) {
        assertTrue(p + " should exist", fs.exists(p));
      }
    }

    try (Table table = util.getConnection().getTable(tableName)) {
      assertEquals(initRowCount + expectedRows, util.countRows(table));
    }

    return expectedRows;
  }

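  // Loads the given ranges with factor 1000, then verifies that the bulk load staging directory
  // was cleaned up and finally drops the table.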
  private void runTest(String testName, TableDescriptor htd, boolean preCreateTable,
    byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth)
    throws Exception {
    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, true, copyFiles, 0, 1000, depth);

    final TableName tableName = htd.getTableName();
    // verify staging folder has been cleaned up
    Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()),
      HConstants.BULKLOAD_STAGING_DIR_NAME);
    FileSystem fs = util.getTestFileSystem();
    if (fs.exists(stagingBasePath)) {
      FileStatus[] files = fs.listStatus(stagingBasePath);
      for (FileStatus file : files) {
        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
          !file.getPath().getName().equals("DONOTERASE"));
      }
    }

    util.deleteTable(tableName);
  }

  /**
   * Test that tags survive through a bulk load that needs to split hfiles. This test depends on
   * "hbase.client.rpc.codec" being set to KeyValueCodecWithTags so that the client can get tags in
   * the responses.
   */
  @Test
  public void testTagsSurviveBulkLoadSplit() throws Exception {
    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
    // the table has these split points
    byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
      Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };

    // create an hfile whose values span the split points
    byte[] from = Bytes.toBytes("ddd");
    byte[] to = Bytes.toBytes("ooo");
    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
      new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
    int expectedRows = 1000;

    TableName tableName = TableName.valueOf(tn.getMethodName());
    TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
    util.getAdmin().createTable(htd, tableSplitKeys);

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String[] args = { dir.toString(), tableName.toString() };
    loader.run(args);

    try (Table table = util.getConnection().getTable(tableName)) {
      assertEquals(expectedRows, util.countRows(table));
      HFileTestUtil.verifyTags(table);
    }

    util.deleteTable(tableName);
  }

  /**
   * Test loading into a column family that does not exist.
   */
  @Test
  public void testNonexistentColumnFamilyLoad() throws Exception {
    String testName = tn.getMethodName();
    byte[][][] hFileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };

    byte[] TABLE = Bytes.toBytes("mytable_" + testName);
    // deliberately upper-case the real family name to simulate HFiles whose family name does not
    // match any family of the target table
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
      .setColumnFamily(ColumnFamilyDescriptorBuilder
        .of(Bytes.toBytes(Bytes.toString(FAMILY).toUpperCase(Locale.ROOT))))
      .build();

    IOException e =
      assertThrows("Loading into table with non-existent family should have failed",
        IOException.class,
        () -> runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2));
    // further check whether the exception message is correct
    String errMsg = e.getMessage();
    assertTrue(
      "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY
        + "], current message: [" + errMsg + "]",
      errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
  }

  @Test
  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
  }

  @Test
  public void testNonHfileFolder() throws Exception {
    testNonHfileFolder("testNonHfileFolder", false);
  }

  /**
   * Write a random data file and a non-file in a dir with a valid family name but not part of the
   * table families. We should be able to bulk load without getting the unmatched family exception.
   * HBASE-13037/HBASE-13227
   */
  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(tableName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());

    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
      QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);

    final String NON_FAMILY_FOLDER = "_logs";
    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
    fs.mkdirs(nonFamilyDir);
    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);

    Table table = null;
    try {
      if (preCreateTable) {
        table = util.createTable(TableName.valueOf(tableName), FAMILY);
      } else {
        table = util.getConnection().getTable(TableName.valueOf(tableName));
      }

      final String[] args = { dir.toString(), tableName };
      new LoadIncrementalHFiles(util.getConfiguration()).run(args);
      assertEquals(500, util.countRows(table));
    } finally {
      if (table != null) {
        table.close();
      }
      fs.delete(dir, true);
    }
  }

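  // Writes 'size' bytes of a repeating 0..255 pattern. Used above as a stand-in for an arbitrary
  // non-HFile file that the bulk load should tolerate rather than trip over.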
  private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
    try (FSDataOutputStream stream = fs.create(path)) {
      byte[] data = new byte[1024];
      for (int i = 0; i < data.length; ++i) {
        data[i] = (byte) (i & 0xff);
      }
      while (size >= data.length) {
        stream.write(data, 0, data.length);
        size -= data.length;
      }
      if (size > 0) {
        stream.write(data, 0, size);
      }
    }
  }

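  /**
   * Splits a 1000-row HFile at key "ggg" into bottom and top halves and verifies that no rows are
   * lost across the two output files.
   */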
  @Test
  public void testSplitStoreFile() throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
      Bytes.toBytes("ggg"), bottomOut, topOut);

    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }

  @Test
  public void testSplitStoreFileWithNoneToNone() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
  }

  @Test
  public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
  }

  @Test
  public void testSplitStoreFileWithEncodedToNone() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
  }

  @Test
  public void testSplitStoreFileWithNoneToEncoded() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
  }

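  // Writes the HFile with bulkloadEncoding, then splits it against a family configured with
  // cfEncoding; the two halves must together retain all 1000 rows for every combination above.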
  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
    DataBlockEncoding cfEncoding) throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    ColumnFamilyDescriptor familyDesc =
      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
    HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
      bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
      Bytes.toBytes("ggg"), bottomOut, topOut);

    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }

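  // Opens the HFile, counts its cells, and asserts the file is non-empty.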
  private int verifyHFile(Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader =
      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
    HFileScanner scanner = reader.getScanner(conf, false, false);
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    assertTrue(count > 0);
    reader.close();
    return count;
  }

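  // Registers one key range for testInferBoundaries: +1 at the range's start key, -1 at its end
  // key. inferBoundaries then sweeps the sorted keys with a running sum, so each key where
  // coverage resumes from zero starts a merged range; the starts of all merged ranges except the
  // first are returned as split points (hence (m, r, u) in the toy example below).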
  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
    map.put(first, map.getOrDefault(first, 0) + 1);
    map.put(last, map.getOrDefault(last, 0) - 1);
  }

  @Test
  public void testInferBoundaries() {
    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);

    /*
     * Toy example:
     *    c---------i     o------p        s---------t   v------x
     * a------e  g-----k    m-------------q   r----s       u----w
     * Should be inferred as:
     * a-----------------k   m-------------q   r--------------t  u---------x
     * The output should be (m, r, u).
     */
    String[][] ranges = { { "a", "e" }, { "r", "s" }, { "o", "p" }, { "g", "k" }, { "v", "x" },
      { "c", "i" }, { "m", "q" }, { "s", "t" }, { "u", "w" } };
    for (String[] range : ranges) {
      addStartEndKeysForTest(map, Bytes.toBytes(range[0]), Bytes.toBytes(range[1]));
    }

    byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
    byte[][] compare = { Bytes.toBytes("m"), Bytes.toBytes("r"), Bytes.toBytes("u") };

    assertEquals(3, keysArray.length);

    for (int row = 0; row < keysArray.length; row++) {
      assertArrayEquals(compare[row], keysArray[row]);
    }
  }

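  // setUpBeforeClass lowers LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY to 4, so writing
  // five HFiles for a single region of one family must make the load fail.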
  @Test
  public void testLoadTooManyHFiles() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("testLoadTooManyHFiles");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i),
        FAMILY, QUALIFIER, from, to, 1000);
    }

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String[] args = { dir.toString(), "mytable_testLoadTooManyHFiles" };
    IOException ie = assertThrows("Bulk loading too many files should fail", IOException.class,
      () -> loader.run(args));
    assertTrue(ie.getMessage()
      .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
  }

  @Test(expected = TableNotFoundException.class)
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String[] args = { "directory", "nonExistingTable" };
    loader.run(args);
  }

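  /**
   * Column family names may legally start with an underscore (here "_cf"); make sure the loader
   * still picks such a directory up rather than treating it like an internal folder such as
   * "_logs".
   */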
  @Test
  public void testTableWithCFNameStartWithUnderScore() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    String family = "_cf";
    Path familyDir = new Path(dir, family);

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    Configuration conf = util.getConfiguration();
    String tableName = tn.getMethodName();
    Table table = util.createTable(TableName.valueOf(tableName), family);
    HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
      QUALIFIER, from, to, 1000);

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String[] args = { dir.toString(), tableName };
    try {
      loader.run(args);
      assertEquals(1000, util.countRows(table));
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

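  /**
   * With BULK_LOAD_HFILES_BY_FAMILY enabled, the loader should group queue items by column family
   * and issue one atomic region load per family, so the overridden tryAtomicRegionLoad below must
   * be called exactly once per family.
   */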
  @Test
  public void testBulkLoadByFamily() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    String tableName = tn.getMethodName();
    String[] families = { "cf1", "cf2", "cf3" };
    for (int i = 0; i < families.length; i++) {
      byte[] from = Bytes.toBytes(i + "begin");
      byte[] to = Bytes.toBytes(i + "end");
      Path familyDir = new Path(dir, families[i]);
      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"),
        Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000);
    }
    Table table = util.createTable(TableName.valueOf(tableName), families);
    final AtomicInteger attemptedCalls = new AtomicInteger();
    util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, true);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
      @Override
      protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection, TableName tableName,
        final byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) throws IOException {
        attemptedCalls.incrementAndGet();
        return super.tryAtomicRegionLoad(connection, tableName, first, lqis, copyFile);
      }
    };

    String[] args = { dir.toString(), tableName };
    try {
      loader.run(args);
      assertEquals(families.length, attemptedCalls.get());
      assertEquals(1000 * families.length, util.countRows(table));
    } finally {
      if (table != null) {
        table.close();
      }
      util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false);
    }
  }

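  /**
   * With FAIL_IF_NEED_SPLIT_HFILE set, bulk loading an HFile that straddles more than one region
   * must fail instead of silently splitting the file; afterwards each region should still have
   * only its single pre-existing store file.
   */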
  @Test
  public void testFailIfNeedSplitHFile() throws IOException {
    TableName tableName = TableName.valueOf(tn.getMethodName());
    Table table = util.createTable(tableName, FAMILY);

    util.loadTable(table, FAMILY);

    FileSystem fs = util.getTestFileSystem();
    Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file"));
    HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER,
      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    util.getAdmin().split(tableName);
    util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1);

    Configuration config = new Configuration(util.getConfiguration());
    config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true);
    BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config);

    String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() };
    assertThrows(IOException.class, () -> tool.run(args));
    util.getHBaseCluster().getRegions(tableName)
      .forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size()));
  }
}