/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
 * faster than the full MR cluster tests in TestHFileOutputFormat.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestLoadIncrementalHFiles {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestLoadIncrementalHFiles.class);

  @Rule
  public TestName tn = new TestName();

  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  private static final byte[] FAMILY = Bytes.toBytes("myfam");
  private static final String NAMESPACE = "bulkNS";

  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;

  private static final byte[][] SPLIT_KEYS =
      new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };

  static HBaseTestingUtility util = new HBaseTestingUtility();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
      MAX_FILES_PER_REGION_PER_FAMILY);
    // change default behavior so that tag values are returned with normal rpcs
    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
      KeyValueCodecWithTags.class.getCanonicalName());
    util.startMiniCluster();

    setupNamespace();
  }

  protected static void setupNamespace() throws Exception {
    util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  @Test
  public void testSimpleLoadWithMap() throws Exception {
    runTest("testSimpleLoadWithMap", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
      true);
  }

  /**
   * Test case that creates some regions and loads HFiles that fit snugly inside those regions
   */
  @Test
  public void testSimpleLoad() throws Exception {
    runTest("testSimpleLoad", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
  }

  @Test
  public void testSimpleLoadWithFileCopy() throws Exception {
    String testName = tn.getMethodName();
    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE),
        false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
      false, true, 2);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries of those regions
   */
  @Test
  public void testRegionCrossingLoad() throws Exception {
    runTest("testRegionCrossingLoad", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test loading into a column family that has a ROW bloom filter.
   */
  @Test
  public void testRegionCrossingRowBloom() throws Exception {
    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test loading into a column family that has a ROWCOL bloom filter.
   */
  @Test
  public void testRegionCrossingRowColBloom() throws Exception {
    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test case that creates some regions and loads HFiles that have different region boundaries
   * than the table's pre-split regions.
   */
  @Test
  public void testSimpleHFileSplit() throws Exception {
    runTest("testHFileSplit", BloomType.NONE,
      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
          Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
          new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the region boundaries and
   * have different boundaries than the table's pre-split regions.
   */
  @Test
  public void testRegionCrossingHFileSplit() throws Exception {
    testRegionCrossingHFileSplit(BloomType.NONE);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the region boundaries, have a
   * ROW bloom filter, and have different boundaries than the table's pre-split regions.
   */
  @Test
  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROW);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the region boundaries, have a
   * ROWCOL bloom filter, and have different boundaries than the table's pre-split regions.
   */
  @Test
  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROWCOL);
  }

  @Test
  public void testSplitALot() throws Exception {
    runTest("testSplitALot", BloomType.NONE,
      new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
          Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
          Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
          Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
          Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
          Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
  }

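  // Shared body for the region-crossing HFile split tests above; only the bloom type varies.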
  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
          Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

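  // Builds a descriptor for a table with the single column family FAMILY, using the given bloom
  // filter type.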
  private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
    return TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(
          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
        .build();
  }

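  // The runTest overloads below all funnel into the most specific variant, filling in defaults
  // for the split keys, the map-based load path, and the directory depth along the way.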
  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
      throws Exception {
    runTest(testName, bloomType, null, hfileRanges);
  }

  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
      throws Exception {
    runTest(testName, bloomType, null, hfileRanges, useMap);
  }

  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
      byte[][][] hfileRanges) throws Exception {
    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
  }

  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
      byte[][][] hfileRanges, boolean useMap) throws Exception {
    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
    final boolean preCreateTable = tableSplitKeys != null;

    // Run the test bulk loading the table into the default namespace
    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, 2);

    /*
     * Run the test bulk loading the table from a directory of depth 3; the directory structure
     * is now
     *   baseDirectory
     *     -- regionDir
     *       -- familyDir
     *         -- storeFileDir
     */
    if (preCreateTable) {
      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges,
          false, 3);
    }

    // Run the test bulk loading the table into the specified namespace
    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, 2);
  }

  private void runTest(String testName, TableName tableName, BloomType bloomType,
      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges,
      boolean useMap, int depth) throws Exception {
    TableDescriptor htd = buildHTD(tableName, bloomType);
    runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
  }

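  /**
   * Creates one HFile per range in {@code hfileRanges}, bulk loads them into the table described
   * by {@code htd} (creating the table first if needed), and verifies the resulting row count.
   * The load goes through the map-based API when {@code useMap} is set, otherwise through the
   * command-line entry point.
   * @return the number of rows the load was expected to add
   */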
  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
      byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
      byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
      int initRowCount, int factor) throws Exception {
    return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges,
        useMap, deleteFile, copyFiles, initRowCount, factor, 2);
  }

  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
      byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
      byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
      int initRowCount, int factor, int depth) throws Exception {
    Path baseDirectory = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = util.getTestFileSystem();
    baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path parentDir = baseDirectory;
    if (depth == 3) {
      assert !useMap;
      parentDir = new Path(baseDirectory, "someRegion");
    }
    Path familyDir = new Path(parentDir, Bytes.toString(fam));

    int hfileIdx = 0;
    Map<byte[], List<Path>> map = null;
    List<Path> list = null;
    if (useMap || copyFiles) {
      list = new ArrayList<>();
    }
    if (useMap) {
      map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      map.put(fam, list);
    }
    Path last = null;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
      if (useMap) {
        last = path;
        list.add(path);
      }
    }
    int expectedRows = hfileIdx * factor;

    TableName tableName = htd.getTableName();
    if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
      util.getAdmin().createTable(htd, tableSplitKeys);
    }

    Configuration conf = util.getConfiguration();
    if (copyFiles) {
      conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
    }
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
    if (depth == 3) {
      args.add("-loadTable");
    }

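    // Load through the map-based API, optionally deleting the last HFile first to verify that a
    // missing file is not reported as loaded; otherwise load through the command-line entry point.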
    if (useMap) {
      if (deleteFile) {
        fs.delete(last, true);
      }
      Map<LoadQueueItem, ByteBuffer> loaded = loader.run(map, tableName);
      if (deleteFile) {
        expectedRows -= 1000;
        for (LoadQueueItem item : loaded.keySet()) {
          if (item.getFilePath().getName().equals(last.getName())) {
            fail(last + " should be missing");
          }
        }
      }
    } else {
      loader.run(args.toArray(new String[]{}));
    }

    if (copyFiles) {
      for (Path p : list) {
        assertTrue(p + " should exist", fs.exists(p));
      }
    }

    Table table = util.getConnection().getTable(tableName);
    try {
      assertEquals(initRowCount + expectedRows, util.countRows(table));
    } finally {
      table.close();
    }

    return expectedRows;
  }

  private void runTest(String testName, TableDescriptor htd,
      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
      boolean copyFiles, int depth) throws Exception {
    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, true, copyFiles, 0, 1000, depth);

    final TableName tableName = htd.getTableName();
    // verify that the staging folder has been cleaned up
    Path stagingBasePath =
        new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
    FileSystem fs = util.getTestFileSystem();
    if (fs.exists(stagingBasePath)) {
      FileStatus[] files = fs.listStatus(stagingBasePath);
      for (FileStatus file : files) {
        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
          !file.getPath().getName().equals("DONOTERASE"));
      }
    }

    util.deleteTable(tableName);
  }

  /**
   * Test that tags survive through a bulk load that needs to split hfiles. This test depends on
   * "hbase.client.rpc.codec" being set to KeyValueCodecWithTags so that the client can get tags
   * in the responses.
   */
  @Test
  public void testTagsSurviveBulkLoadSplit() throws Exception {
    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
    // the table has these split points
    byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
        Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };

    // create an hfile that has values spanning the split points
    byte[] from = Bytes.toBytes("ddd");
    byte[] to = Bytes.toBytes("ooo");
    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
      new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
    int expectedRows = 1000;

    TableName tableName = TableName.valueOf(tn.getMethodName());
    TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
    util.getAdmin().createTable(htd, tableSplitKeys);

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String[] args = { dir.toString(), tableName.toString() };
    loader.run(args);

    Table table = util.getConnection().getTable(tableName);
    try {
      assertEquals(expectedRows, util.countRows(table));
      HFileTestUtil.verifyTags(table);
    } finally {
      table.close();
    }

    util.deleteTable(tableName);
  }

  /**
   * Test loading into a column family that does not exist.
   */
  @Test
  public void testNonexistentColumnFamilyLoad() throws Exception {
    String testName = tn.getMethodName();
    byte[][][] hFileRanges =
        new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
            new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };

    byte[] TABLE = Bytes.toBytes("mytable_" + testName);
    // upper-case the real family name on purpose, to simulate the case where the family name in
    // the HFiles is invalid
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
        .setColumnFamily(ColumnFamilyDescriptorBuilder
            .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
        .build();

    try {
      runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2);
      fail("Loading into table with non-existent family should have failed");
    } catch (Exception e) {
      assertTrue("IOException expected", e instanceof IOException);
      // further check whether the exception message is correct
      String errMsg = e.getMessage();
      assertTrue(
        "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY +
            "], current message: [" + errMsg + "]",
        errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
    }
  }

  @Test
  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
  }

  @Test
  public void testNonHfileFolder() throws Exception {
    testNonHfileFolder("testNonHfileFolder", false);
  }

  /**
   * Write a random data file and a non-file in a dir with a valid family name but not part of the
   * table families. We should be able to bulk load without getting the unmatched family exception.
   * HBASE-13037/HBASE-13227
   */
  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(tableName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());

    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
      QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);

    final String NON_FAMILY_FOLDER = "_logs";
    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
    fs.mkdirs(nonFamilyDir);
    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);

    Table table = null;
    try {
      if (preCreateTable) {
        table = util.createTable(TableName.valueOf(tableName), FAMILY);
      } else {
        table = util.getConnection().getTable(TableName.valueOf(tableName));
      }

      final String[] args = { dir.toString(), tableName };
      new LoadIncrementalHFiles(util.getConfiguration()).run(args);
      assertEquals(500, util.countRows(table));
    } finally {
      if (table != null) {
        table.close();
      }
      fs.delete(dir, true);
    }
  }

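  // Despite the name, the file content is a deterministic repeating 0x00..0xFF byte pattern,
  // written in 1 KB chunks until the requested size is reached.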
  private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
    FSDataOutputStream stream = fs.create(path);
    try {
      byte[] data = new byte[1024];
      for (int i = 0; i < data.length; ++i) {
        data[i] = (byte) (i & 0xff);
      }
      while (size >= data.length) {
        stream.write(data, 0, data.length);
        size -= data.length;
      }
      if (size > 0) {
        stream.write(data, 0, size);
      }
    } finally {
      stream.close();
    }
  }

  @Test
  public void testSplitStoreFile() throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
      Bytes.toBytes("ggg"), bottomOut, topOut);

    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }

  @Test
  public void testSplitStoreFileWithNoneToNone() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
  }

  @Test
  public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
  }

  @Test
  public void testSplitStoreFileWithEncodedToNone() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
  }

  @Test
  public void testSplitStoreFileWithNoneToEncoded() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
  }

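  // Writes an HFile with the given bulkload encoding, splits it against a column family
  // configured with cfEncoding, and verifies that no rows are lost across the two halves.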
  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
      DataBlockEncoding cfEncoding) throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    ColumnFamilyDescriptor familyDesc =
        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
    HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
      bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
      Bytes.toBytes("ggg"), bottomOut, topOut);

    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }

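  // Opens the HFile at the given path, counts its cells, and asserts that it is non-empty.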
  private int verifyHFile(Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader =
        HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
    reader.loadFileInfo();
    HFileScanner scanner = reader.getScanner(false, false);
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    assertTrue(count > 0);
    reader.close();
    return count;
  }

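  // Records range endpoints as deltas (+1 where a range starts, -1 where it ends) so that
  // inferBoundaries can derive split points from the overlap counts.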
  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
    map.put(first, map.getOrDefault(first, 0) + 1);
    map.put(last, map.getOrDefault(last, 0) - 1);
  }

  @Test
  public void testInferBoundaries() {
    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);

    /*
     * Toy example:
     *     c---------i            o------p          s---------t  v------x
     *   a------e  g-----k   m-------------q   r----s    u----w
     * Should be inferred as:
     *   a-----------------k   m-------------q   r--------------t  u---------x
     * The output should be (m, r, u).
     */

    String first;
    String last;

    first = "a";
    last = "e";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "r";
    last = "s";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "o";
    last = "p";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "g";
    last = "k";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "v";
    last = "x";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "c";
    last = "i";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "m";
    last = "q";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "s";
    last = "t";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "u";
    last = "w";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
    byte[][] compare = new byte[3][];
    compare[0] = Bytes.toBytes("m");
    compare[1] = Bytes.toBytes("r");
    compare[2] = Bytes.toBytes("u");

    assertEquals(3, keysArray.length);

    for (int row = 0; row < keysArray.length; row++) {
      assertArrayEquals(compare[row], keysArray[row]);
    }
  }

  @Test
  public void testLoadTooManyHFiles() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("testLoadTooManyHFiles");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i),
        FAMILY, QUALIFIER, from, to, 1000);
    }

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
    String[] args = { dir.toString(), "mytable_testLoadTooManyHFiles" };
    try {
      loader.run(args);
      fail("Bulk loading too many files should fail");
    } catch (IOException ie) {
      assertTrue(ie.getMessage()
          .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
    }
  }

  @Test(expected = TableNotFoundException.class)
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String[] args = { "directory", "nonExistingTable" };
    loader.run(args);
  }

  @Test
  public void testTableWithCFNameStartWithUnderScore() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    String family = "_cf";
    Path familyDir = new Path(dir, family);

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    Configuration conf = util.getConfiguration();
    String tableName = tn.getMethodName();
    Table table = util.createTable(TableName.valueOf(tableName), family);
    HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
      QUALIFIER, from, to, 1000);

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    String[] args = { dir.toString(), tableName };
    try {
      loader.run(args);
      assertEquals(1000, util.countRows(table));
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
}