001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertTrue; 023import static org.junit.Assert.fail; 024import java.io.IOException; 025import java.nio.ByteBuffer; 026import java.util.ArrayList; 027import java.util.Collection; 028import java.util.List; 029import java.util.Locale; 030import java.util.Map; 031import java.util.TreeMap; 032import java.util.concurrent.atomic.AtomicInteger; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FSDataOutputStream; 035import org.apache.hadoop.fs.FileStatus; 036import org.apache.hadoop.fs.FileSystem; 037import org.apache.hadoop.fs.Path; 038import org.apache.hadoop.hbase.HBaseClassTestRule; 039import org.apache.hadoop.hbase.HBaseTestingUtility; 040import org.apache.hadoop.hbase.HConstants; 041import org.apache.hadoop.hbase.NamespaceDescriptor; 042import org.apache.hadoop.hbase.TableName; 043import org.apache.hadoop.hbase.TableNotFoundException; 044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; 051import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 052import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 053import org.apache.hadoop.hbase.io.hfile.CacheConfig; 054import org.apache.hadoop.hbase.io.hfile.HFile; 055import org.apache.hadoop.hbase.io.hfile.HFileScanner; 056import org.apache.hadoop.hbase.regionserver.BloomType; 057import org.apache.hadoop.hbase.testclassification.LargeTests; 058import org.apache.hadoop.hbase.testclassification.MiscTests; 059import org.apache.hadoop.hbase.util.Bytes; 060import org.apache.hadoop.hbase.util.CommonFSUtils; 061import org.apache.hadoop.hbase.util.HFileTestUtil; 062import org.junit.AfterClass; 063import org.junit.BeforeClass; 064import org.junit.ClassRule; 065import org.junit.Rule; 066import org.junit.Test; 067import org.junit.experimental.categories.Category; 068import org.junit.rules.TestName; 069import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 070 071/** 072 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run 073 * faster than the full MR cluster tests in TestHFileOutputFormat 074 */ 075@Category({ MiscTests.class, LargeTests.class }) 076public class TestLoadIncrementalHFiles { 077 078 @ClassRule 079 public static final HBaseClassTestRule CLASS_RULE = 080 HBaseClassTestRule.forClass(TestLoadIncrementalHFiles.class); 081 082 @Rule 083 public TestName tn = new TestName(); 084 085 private static final byte[] QUALIFIER = Bytes.toBytes("myqual"); 086 private static final byte[] FAMILY = Bytes.toBytes("myfam"); 087 private static final String NAMESPACE = "bulkNS"; 088 089 static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found"; 090 static final int MAX_FILES_PER_REGION_PER_FAMILY = 4; 091 092 private static final byte[][] SPLIT_KEYS = 093 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") }; 094 095 static HBaseTestingUtility util = new HBaseTestingUtility(); 096 097 @BeforeClass 098 public static void setUpBeforeClass() throws Exception { 099 util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 100 util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 101 MAX_FILES_PER_REGION_PER_FAMILY); 102 // change default behavior so that tag values are returned with normal rpcs 103 util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, 104 KeyValueCodecWithTags.class.getCanonicalName()); 105 util.startMiniCluster(); 106 107 setupNamespace(); 108 } 109 110 protected static void setupNamespace() throws Exception { 111 util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build()); 112 } 113 114 @AfterClass 115 public static void tearDownAfterClass() throws Exception { 116 util.shutdownMiniCluster(); 117 } 118 119 @Test 120 public void testSimpleLoadWithMap() throws Exception { 121 runTest("testSimpleLoadWithMap", BloomType.NONE, 122 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 123 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, 124 true); 125 } 126 127 /** 128 * Test case that creates some regions and loads HFiles that fit snugly inside those regions 129 */ 130 @Test 131 public void testSimpleLoad() throws Exception { 132 runTest("testSimpleLoad", BloomType.NONE, 133 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 134 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }); 135 } 136 137 @Test 138 public void testSimpleLoadWithFileCopy() throws Exception { 139 String testName = tn.getMethodName(); 140 final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); 141 runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), 142 false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 143 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, 144 false, true, 2); 145 } 146 147 /** 148 * Test case that creates some regions and loads HFiles that cross the boundaries of those regions 149 */ 150 @Test 151 public void testRegionCrossingLoad() throws Exception { 152 runTest("testRegionCrossingLoad", BloomType.NONE, 153 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 154 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 155 } 156 157 /** 158 * Test loading into a column family that has a ROW bloom filter. 159 */ 160 @Test 161 public void testRegionCrossingRowBloom() throws Exception { 162 runTest("testRegionCrossingLoadRowBloom", BloomType.ROW, 163 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 164 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 165 } 166 167 /** 168 * Test loading into a column family that has a ROWCOL bloom filter. 169 */ 170 @Test 171 public void testRegionCrossingRowColBloom() throws Exception { 172 runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL, 173 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 174 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 175 } 176 177 /** 178 * Test case that creates some regions and loads HFiles that have different region boundaries than 179 * the table pre-split. 180 */ 181 @Test 182 public void testSimpleHFileSplit() throws Exception { 183 runTest("testHFileSplit", BloomType.NONE, 184 new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), 185 Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }, 186 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") }, 187 new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, }); 188 } 189 190 /** 191 * Test case that creates some regions and loads HFiles that cross the boundaries and have 192 * different region boundaries than the table pre-split. 193 */ 194 @Test 195 public void testRegionCrossingHFileSplit() throws Exception { 196 testRegionCrossingHFileSplit(BloomType.NONE); 197 } 198 199 /** 200 * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom 201 * filter and a different region boundaries than the table pre-split. 202 */ 203 @Test 204 public void testRegionCrossingHFileSplitRowBloom() throws Exception { 205 testRegionCrossingHFileSplit(BloomType.ROW); 206 } 207 208 /** 209 * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL 210 * bloom filter and a different region boundaries than the table pre-split. 211 */ 212 @Test 213 public void testRegionCrossingHFileSplitRowColBloom() throws Exception { 214 testRegionCrossingHFileSplit(BloomType.ROWCOL); 215 } 216 217 @Test 218 public void testSplitALot() throws Exception { 219 runTest("testSplitALot", BloomType.NONE, 220 new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), 221 Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"), 222 Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), 223 Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), 224 Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), 225 Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), }, 226 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, }); 227 } 228 229 private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception { 230 runTest("testHFileSplit" + bloomType + "Bloom", bloomType, 231 new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), 232 Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }, 233 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 234 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 235 } 236 237 private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) { 238 return TableDescriptorBuilder.newBuilder(tableName) 239 .setColumnFamily( 240 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build()) 241 .build(); 242 } 243 244 private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges) 245 throws Exception { 246 runTest(testName, bloomType, null, hfileRanges); 247 } 248 249 private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap) 250 throws Exception { 251 runTest(testName, bloomType, null, hfileRanges, useMap); 252 } 253 254 private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, 255 byte[][][] hfileRanges) throws Exception { 256 runTest(testName, bloomType, tableSplitKeys, hfileRanges, false); 257 } 258 259 private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, 260 byte[][][] hfileRanges, boolean useMap) throws Exception { 261 final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); 262 final boolean preCreateTable = tableSplitKeys != null; 263 264 // Run the test bulkloading the table to the default namespace 265 final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME); 266 runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, 267 useMap, 2); 268 269 270 /* Run the test bulkloading the table from a depth of 3 271 directory structure is now 272 baseDirectory 273 -- regionDir 274 -- familyDir 275 -- storeFileDir 276 */ 277 if (preCreateTable) { 278 runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, 279 false, 3); 280 } 281 282 // Run the test bulkloading the table to the specified namespace 283 final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME); 284 runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, 285 useMap, 2); 286 } 287 288 private void runTest(String testName, TableName tableName, BloomType bloomType, 289 boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, 290 boolean useMap, int depth) throws Exception { 291 TableDescriptor htd = buildHTD(tableName, bloomType); 292 runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth); 293 } 294 295 public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util, 296 byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, 297 byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, 298 int initRowCount, int factor) throws Exception { 299 return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges, 300 useMap, deleteFile, copyFiles, initRowCount, factor, 2); 301 } 302 303 public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util, 304 byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, 305 byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, 306 int initRowCount, int factor, int depth) throws Exception { 307 Path baseDirectory = util.getDataTestDirOnTestFS(testName); 308 FileSystem fs = util.getTestFileSystem(); 309 baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 310 Path parentDir = baseDirectory; 311 if (depth == 3) { 312 assert !useMap; 313 parentDir = new Path(baseDirectory, "someRegion"); 314 } 315 Path familyDir = new Path(parentDir, Bytes.toString(fam)); 316 317 int hfileIdx = 0; 318 Map<byte[], List<Path>> map = null; 319 List<Path> list = null; 320 if (useMap || copyFiles) { 321 list = new ArrayList<>(); 322 } 323 if (useMap) { 324 map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 325 map.put(fam, list); 326 } 327 Path last = null; 328 for (byte[][] range : hfileRanges) { 329 byte[] from = range[0]; 330 byte[] to = range[1]; 331 Path path = new Path(familyDir, "hfile_" + hfileIdx++); 332 HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor); 333 if (useMap) { 334 last = path; 335 list.add(path); 336 } 337 } 338 int expectedRows = hfileIdx * factor; 339 340 TableName tableName = htd.getTableName(); 341 if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) { 342 util.getAdmin().createTable(htd, tableSplitKeys); 343 } 344 345 Configuration conf = util.getConfiguration(); 346 if (copyFiles) { 347 conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); 348 } 349 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf); 350 List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString()); 351 if (depth == 3) { 352 args.add("-loadTable"); 353 } 354 355 if (useMap) { 356 if (deleteFile) { 357 fs.delete(last, true); 358 } 359 Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map); 360 if (deleteFile) { 361 expectedRows -= 1000; 362 for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) { 363 if (item.getFilePath().getName().equals(last.getName())) { 364 fail(last + " should be missing"); 365 } 366 } 367 } 368 } else { 369 loader.run(args.toArray(new String[] {})); 370 } 371 372 if (copyFiles) { 373 for (Path p : list) { 374 assertTrue(p + " should exist", fs.exists(p)); 375 } 376 } 377 378 Table table = util.getConnection().getTable(tableName); 379 try { 380 assertEquals(initRowCount + expectedRows, util.countRows(table)); 381 } finally { 382 table.close(); 383 } 384 385 return expectedRows; 386 } 387 388 private void runTest(String testName, TableDescriptor htd, 389 boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, 390 boolean copyFiles, int depth) throws Exception { 391 loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges, 392 useMap, true, copyFiles, 0, 1000, depth); 393 394 final TableName tableName = htd.getTableName(); 395 // verify staging folder has been cleaned up 396 Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()), 397 HConstants.BULKLOAD_STAGING_DIR_NAME); 398 FileSystem fs = util.getTestFileSystem(); 399 if (fs.exists(stagingBasePath)) { 400 FileStatus[] files = fs.listStatus(stagingBasePath); 401 for (FileStatus file : files) { 402 assertTrue("Folder=" + file.getPath() + " is not cleaned up.", 403 file.getPath().getName() != "DONOTERASE"); 404 } 405 } 406 407 util.deleteTable(tableName); 408 } 409 410 /** 411 * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the 412 * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the 413 * responses. 414 */ 415 @Test 416 public void testTagsSurviveBulkLoadSplit() throws Exception { 417 Path dir = util.getDataTestDirOnTestFS(tn.getMethodName()); 418 FileSystem fs = util.getTestFileSystem(); 419 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 420 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 421 // table has these split points 422 byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), 423 Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }; 424 425 // creating an hfile that has values that span the split points. 426 byte[] from = Bytes.toBytes("ddd"); 427 byte[] to = Bytes.toBytes("ooo"); 428 HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs, 429 new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000); 430 int expectedRows = 1000; 431 432 TableName tableName = TableName.valueOf(tn.getMethodName()); 433 TableDescriptor htd = buildHTD(tableName, BloomType.NONE); 434 util.getAdmin().createTable(htd, tableSplitKeys); 435 436 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); 437 String[] args = { dir.toString(), tableName.toString() }; 438 loader.run(args); 439 440 Table table = util.getConnection().getTable(tableName); 441 try { 442 assertEquals(expectedRows, util.countRows(table)); 443 HFileTestUtil.verifyTags(table); 444 } finally { 445 table.close(); 446 } 447 448 util.deleteTable(tableName); 449 } 450 451 /** 452 * Test loading into a column family that does not exist. 453 */ 454 @Test 455 public void testNonexistentColumnFamilyLoad() throws Exception { 456 String testName = tn.getMethodName(); 457 byte[][][] hFileRanges = 458 new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") }, 459 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }; 460 461 byte[] TABLE = Bytes.toBytes("mytable_" + testName); 462 // set real family name to upper case in purpose to simulate the case that 463 // family name in HFiles is invalid 464 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE)) 465 .setColumnFamily(ColumnFamilyDescriptorBuilder 466 .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT)))) 467 .build(); 468 469 try { 470 runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2); 471 assertTrue("Loading into table with non-existent family should have failed", false); 472 } catch (Exception e) { 473 assertTrue("IOException expected", e instanceof IOException); 474 // further check whether the exception message is correct 475 String errMsg = e.getMessage(); 476 assertTrue( 477 "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + 478 "], current message: [" + errMsg + "]", 479 errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY)); 480 } 481 } 482 483 @Test 484 public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception { 485 testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true); 486 } 487 488 @Test 489 public void testNonHfileFolder() throws Exception { 490 testNonHfileFolder("testNonHfileFolder", false); 491 } 492 493 /** 494 * Write a random data file and a non-file in a dir with a valid family name but not part of the 495 * table families. we should we able to bulkload without getting the unmatched family exception. 496 * HBASE-13037/HBASE-13227 497 */ 498 private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception { 499 Path dir = util.getDataTestDirOnTestFS(tableName); 500 FileSystem fs = util.getTestFileSystem(); 501 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 502 503 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 504 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY, 505 QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500); 506 createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024); 507 508 final String NON_FAMILY_FOLDER = "_logs"; 509 Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER); 510 fs.mkdirs(nonFamilyDir); 511 fs.mkdirs(new Path(nonFamilyDir, "non-file")); 512 createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024); 513 514 Table table = null; 515 try { 516 if (preCreateTable) { 517 table = util.createTable(TableName.valueOf(tableName), FAMILY); 518 } else { 519 table = util.getConnection().getTable(TableName.valueOf(tableName)); 520 } 521 522 final String[] args = { dir.toString(), tableName }; 523 new LoadIncrementalHFiles(util.getConfiguration()).run(args); 524 assertEquals(500, util.countRows(table)); 525 } finally { 526 if (table != null) { 527 table.close(); 528 } 529 fs.delete(dir, true); 530 } 531 } 532 533 private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException { 534 FSDataOutputStream stream = fs.create(path); 535 try { 536 byte[] data = new byte[1024]; 537 for (int i = 0; i < data.length; ++i) { 538 data[i] = (byte) (i & 0xff); 539 } 540 while (size >= data.length) { 541 stream.write(data, 0, data.length); 542 size -= data.length; 543 } 544 if (size > 0) { 545 stream.write(data, 0, size); 546 } 547 } finally { 548 stream.close(); 549 } 550 } 551 552 @Test 553 public void testSplitStoreFile() throws IOException { 554 Path dir = util.getDataTestDirOnTestFS("testSplitHFile"); 555 FileSystem fs = util.getTestFileSystem(); 556 Path testIn = new Path(dir, "testhfile"); 557 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); 558 HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, 559 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 560 561 Path bottomOut = new Path(dir, "bottom.out"); 562 Path topOut = new Path(dir, "top.out"); 563 564 LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc, 565 Bytes.toBytes("ggg"), bottomOut, topOut); 566 567 int rowCount = verifyHFile(bottomOut); 568 rowCount += verifyHFile(topOut); 569 assertEquals(1000, rowCount); 570 } 571 572 @Test 573 public void testSplitStoreFileWithNoneToNone() throws IOException { 574 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE); 575 } 576 577 @Test 578 public void testSplitStoreFileWithEncodedToEncoded() throws IOException { 579 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF); 580 } 581 582 @Test 583 public void testSplitStoreFileWithEncodedToNone() throws IOException { 584 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE); 585 } 586 587 @Test 588 public void testSplitStoreFileWithNoneToEncoded() throws IOException { 589 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF); 590 } 591 592 private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding, 593 DataBlockEncoding cfEncoding) throws IOException { 594 Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding"); 595 FileSystem fs = util.getTestFileSystem(); 596 Path testIn = new Path(dir, "testhfile"); 597 ColumnFamilyDescriptor familyDesc = 598 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build(); 599 HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn, 600 bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 601 602 Path bottomOut = new Path(dir, "bottom.out"); 603 Path topOut = new Path(dir, "top.out"); 604 605 LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc, 606 Bytes.toBytes("ggg"), bottomOut, topOut); 607 608 int rowCount = verifyHFile(bottomOut); 609 rowCount += verifyHFile(topOut); 610 assertEquals(1000, rowCount); 611 } 612 613 private int verifyHFile(Path p) throws IOException { 614 Configuration conf = util.getConfiguration(); 615 HFile.Reader reader = 616 HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf); 617 HFileScanner scanner = reader.getScanner(false, false); 618 scanner.seekTo(); 619 int count = 0; 620 do { 621 count++; 622 } while (scanner.next()); 623 assertTrue(count > 0); 624 reader.close(); 625 return count; 626 } 627 628 private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) { 629 Integer value = map.containsKey(first) ? map.get(first) : 0; 630 map.put(first, value + 1); 631 632 value = map.containsKey(last) ? map.get(last) : 0; 633 map.put(last, value - 1); 634 } 635 636 @Test 637 public void testInferBoundaries() { 638 TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 639 640 /* 641 * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s 642 * u----w Should be inferred as: a-----------------k m-------------q r--------------t 643 * u---------x The output should be (m,r,u) 644 */ 645 646 String first; 647 String last; 648 649 first = "a"; 650 last = "e"; 651 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 652 653 first = "r"; 654 last = "s"; 655 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 656 657 first = "o"; 658 last = "p"; 659 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 660 661 first = "g"; 662 last = "k"; 663 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 664 665 first = "v"; 666 last = "x"; 667 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 668 669 first = "c"; 670 last = "i"; 671 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 672 673 first = "m"; 674 last = "q"; 675 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 676 677 first = "s"; 678 last = "t"; 679 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 680 681 first = "u"; 682 last = "w"; 683 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 684 685 byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map); 686 byte[][] compare = new byte[3][]; 687 compare[0] = "m".getBytes(); 688 compare[1] = "r".getBytes(); 689 compare[2] = "u".getBytes(); 690 691 assertEquals(3, keysArray.length); 692 693 for (int row = 0; row < keysArray.length; row++) { 694 assertArrayEquals(keysArray[row], compare[row]); 695 } 696 } 697 698 @Test 699 public void testLoadTooMayHFiles() throws Exception { 700 Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles"); 701 FileSystem fs = util.getTestFileSystem(); 702 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 703 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 704 705 byte[] from = Bytes.toBytes("begin"); 706 byte[] to = Bytes.toBytes("end"); 707 for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) { 708 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i), 709 FAMILY, QUALIFIER, from, to, 1000); 710 } 711 712 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); 713 String[] args = { dir.toString(), "mytable_testLoadTooMayHFiles" }; 714 try { 715 loader.run(args); 716 fail("Bulk loading too many files should fail"); 717 } catch (IOException ie) { 718 assertTrue(ie.getMessage() 719 .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles")); 720 } 721 } 722 723 @Test(expected = TableNotFoundException.class) 724 public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { 725 Configuration conf = util.getConfiguration(); 726 conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no"); 727 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 728 String[] args = { "directory", "nonExistingTable" }; 729 loader.run(args); 730 } 731 732 @Test 733 public void testTableWithCFNameStartWithUnderScore() throws Exception { 734 Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore"); 735 FileSystem fs = util.getTestFileSystem(); 736 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 737 String family = "_cf"; 738 Path familyDir = new Path(dir, family); 739 740 byte[] from = Bytes.toBytes("begin"); 741 byte[] to = Bytes.toBytes("end"); 742 Configuration conf = util.getConfiguration(); 743 String tableName = tn.getMethodName(); 744 Table table = util.createTable(TableName.valueOf(tableName), family); 745 HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family), 746 QUALIFIER, from, to, 1000); 747 748 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 749 String[] args = { dir.toString(), tableName }; 750 try { 751 loader.run(args); 752 assertEquals(1000, util.countRows(table)); 753 } finally { 754 if (null != table) { 755 table.close(); 756 } 757 } 758 } 759 760 @Test 761 public void testBulkLoadByFamily() throws Exception { 762 Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily"); 763 FileSystem fs = util.getTestFileSystem(); 764 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 765 String tableName = tn.getMethodName(); 766 String[] families = { "cf1", "cf2", "cf3" }; 767 for (int i = 0; i < families.length; i++) { 768 byte[] from = Bytes.toBytes(i + "begin"); 769 byte[] to = Bytes.toBytes(i + "end"); 770 Path familyDir = new Path(dir, families[i]); 771 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"), 772 Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000); 773 } 774 Table table = util.createTable(TableName.valueOf(tableName), families); 775 final AtomicInteger attmptedCalls = new AtomicInteger(); 776 util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, true); 777 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { 778 @Override 779 protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection, TableName tableName, 780 final byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) throws IOException { 781 attmptedCalls.incrementAndGet(); 782 return super.tryAtomicRegionLoad(connection, tableName, first, lqis, copyFile); 783 } 784 }; 785 786 String[] args = { dir.toString(), tableName }; 787 try { 788 loader.run(args); 789 assertEquals(families.length, attmptedCalls.get()); 790 assertEquals(1000 * families.length, util.countRows(table)); 791 } finally { 792 if (null != table) { 793 table.close(); 794 } 795 util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false); 796 } 797 } 798}