001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertThrows; 023import static org.junit.Assert.assertTrue; 024import static org.junit.Assert.fail; 025 026import java.io.IOException; 027import java.nio.ByteBuffer; 028import java.util.ArrayList; 029import java.util.Collection; 030import java.util.List; 031import java.util.Locale; 032import java.util.Map; 033import java.util.TreeMap; 034import java.util.concurrent.atomic.AtomicInteger; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.fs.FSDataOutputStream; 037import org.apache.hadoop.fs.FileStatus; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.Path; 040import org.apache.hadoop.hbase.HBaseClassTestRule; 041import org.apache.hadoop.hbase.HBaseTestingUtility; 042import org.apache.hadoop.hbase.HConstants; 043import org.apache.hadoop.hbase.NamespaceDescriptor; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.TableNotFoundException; 046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 048import org.apache.hadoop.hbase.client.Connection; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; 053import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 054import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 055import org.apache.hadoop.hbase.io.hfile.CacheConfig; 056import org.apache.hadoop.hbase.io.hfile.HFile; 057import org.apache.hadoop.hbase.io.hfile.HFileScanner; 058import org.apache.hadoop.hbase.regionserver.BloomType; 059import org.apache.hadoop.hbase.testclassification.LargeTests; 060import org.apache.hadoop.hbase.testclassification.MiscTests; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.CommonFSUtils; 063import org.apache.hadoop.hbase.util.HFileTestUtil; 064import org.junit.AfterClass; 065import org.junit.BeforeClass; 066import org.junit.ClassRule; 067import org.junit.Rule; 068import org.junit.Test; 069import org.junit.experimental.categories.Category; 070import org.junit.rules.TestName; 071 072import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 073 074/** 075 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run 076 * faster than the full MR cluster tests in TestHFileOutputFormat 077 */ 078@Category({ MiscTests.class, LargeTests.class }) 079public class TestLoadIncrementalHFiles { 080 081 @ClassRule 082 public static final HBaseClassTestRule CLASS_RULE = 083 HBaseClassTestRule.forClass(TestLoadIncrementalHFiles.class); 084 085 @Rule 086 public TestName tn = new TestName(); 087 088 private static final byte[] QUALIFIER = Bytes.toBytes("myqual"); 089 private static final byte[] FAMILY = Bytes.toBytes("myfam"); 090 private static final String NAMESPACE = "bulkNS"; 091 092 static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found"; 093 static final int MAX_FILES_PER_REGION_PER_FAMILY = 4; 094 095 private static final byte[][] SPLIT_KEYS = 096 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") }; 097 098 static HBaseTestingUtility util = new HBaseTestingUtility(); 099 100 @BeforeClass 101 public static void setUpBeforeClass() throws Exception { 102 util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 103 util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 104 MAX_FILES_PER_REGION_PER_FAMILY); 105 // change default behavior so that tag values are returned with normal rpcs 106 util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, 107 KeyValueCodecWithTags.class.getCanonicalName()); 108 util.startMiniCluster(); 109 110 setupNamespace(); 111 } 112 113 protected static void setupNamespace() throws Exception { 114 util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build()); 115 } 116 117 @AfterClass 118 public static void tearDownAfterClass() throws Exception { 119 util.shutdownMiniCluster(); 120 } 121 122 @Test 123 public void testSimpleLoadWithMap() throws Exception { 124 runTest("testSimpleLoadWithMap", BloomType.NONE, 125 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 126 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, 127 true); 128 } 129 130 /** 131 * Test case that creates some regions and loads HFiles that fit snugly inside those regions 132 */ 133 @Test 134 public void testSimpleLoad() throws Exception { 135 runTest("testSimpleLoad", BloomType.NONE, 136 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 137 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }); 138 } 139 140 @Test 141 public void testSimpleLoadWithFileCopy() throws Exception { 142 String testName = tn.getMethodName(); 143 final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); 144 runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null, 145 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 146 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, 147 false, true, 2); 148 } 149 150 /** 151 * Test case that creates some regions and loads HFiles that cross the boundaries of those regions 152 */ 153 @Test 154 public void testRegionCrossingLoad() throws Exception { 155 runTest("testRegionCrossingLoad", BloomType.NONE, 156 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 157 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 158 } 159 160 /** 161 * Test loading into a column family that has a ROW bloom filter. 162 */ 163 @Test 164 public void testRegionCrossingRowBloom() throws Exception { 165 runTest("testRegionCrossingLoadRowBloom", BloomType.ROW, 166 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 167 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 168 } 169 170 /** 171 * Test loading into a column family that has a ROWCOL bloom filter. 172 */ 173 @Test 174 public void testRegionCrossingRowColBloom() throws Exception { 175 runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL, 176 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 177 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 178 } 179 180 /** 181 * Test case that creates some regions and loads HFiles that have different region boundaries than 182 * the table pre-split. 183 */ 184 @Test 185 public void testSimpleHFileSplit() throws Exception { 186 runTest("testHFileSplit", BloomType.NONE, 187 new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), 188 Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }, 189 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") }, 190 new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, }); 191 } 192 193 /** 194 * Test case that creates some regions and loads HFiles that cross the boundaries and have 195 * different region boundaries than the table pre-split. 196 */ 197 @Test 198 public void testRegionCrossingHFileSplit() throws Exception { 199 testRegionCrossingHFileSplit(BloomType.NONE); 200 } 201 202 /** 203 * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom 204 * filter and a different region boundaries than the table pre-split. 205 */ 206 @Test 207 public void testRegionCrossingHFileSplitRowBloom() throws Exception { 208 testRegionCrossingHFileSplit(BloomType.ROW); 209 } 210 211 /** 212 * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL 213 * bloom filter and a different region boundaries than the table pre-split. 214 */ 215 @Test 216 public void testRegionCrossingHFileSplitRowColBloom() throws Exception { 217 testRegionCrossingHFileSplit(BloomType.ROWCOL); 218 } 219 220 @Test 221 public void testSplitALot() throws Exception { 222 runTest("testSplitALot", BloomType.NONE, 223 new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), 224 Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"), 225 Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), 226 Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), 227 Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), 228 Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), }, 229 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, }); 230 } 231 232 private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception { 233 runTest("testHFileSplit" + bloomType + "Bloom", bloomType, 234 new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), 235 Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }, 236 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 237 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 238 } 239 240 private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) { 241 return TableDescriptorBuilder.newBuilder(tableName) 242 .setColumnFamily( 243 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build()) 244 .build(); 245 } 246 247 private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges) 248 throws Exception { 249 runTest(testName, bloomType, null, hfileRanges); 250 } 251 252 private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap) 253 throws Exception { 254 runTest(testName, bloomType, null, hfileRanges, useMap); 255 } 256 257 private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, 258 byte[][][] hfileRanges) throws Exception { 259 runTest(testName, bloomType, tableSplitKeys, hfileRanges, false); 260 } 261 262 private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, 263 byte[][][] hfileRanges, boolean useMap) throws Exception { 264 final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); 265 final boolean preCreateTable = tableSplitKeys != null; 266 267 // Run the test bulkloading the table to the default namespace 268 final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME); 269 runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, 270 useMap, 2); 271 272 /* 273 * Run the test bulkloading the table from a depth of 3 directory structure is now baseDirectory 274 * -- regionDir -- familyDir -- storeFileDir 275 */ 276 if (preCreateTable) { 277 runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false, 278 3); 279 } 280 281 // Run the test bulkloading the table to the specified namespace 282 final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME); 283 runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, 284 2); 285 } 286 287 private void runTest(String testName, TableName tableName, BloomType bloomType, 288 boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, 289 int depth) throws Exception { 290 TableDescriptor htd = buildHTD(tableName, bloomType); 291 runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth); 292 } 293 294 public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util, 295 byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, 296 byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount, 297 int factor) throws Exception { 298 return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges, 299 useMap, deleteFile, copyFiles, initRowCount, factor, 2); 300 } 301 302 public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util, 303 byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, 304 byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount, 305 int factor, int depth) throws Exception { 306 Path baseDirectory = util.getDataTestDirOnTestFS(testName); 307 FileSystem fs = util.getTestFileSystem(); 308 baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 309 Path parentDir = baseDirectory; 310 if (depth == 3) { 311 assert !useMap; 312 parentDir = new Path(baseDirectory, "someRegion"); 313 } 314 Path familyDir = new Path(parentDir, Bytes.toString(fam)); 315 316 int hfileIdx = 0; 317 Map<byte[], List<Path>> map = null; 318 List<Path> list = null; 319 if (useMap || copyFiles) { 320 list = new ArrayList<>(); 321 } 322 if (useMap) { 323 map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 324 map.put(fam, list); 325 } 326 Path last = null; 327 for (byte[][] range : hfileRanges) { 328 byte[] from = range[0]; 329 byte[] to = range[1]; 330 Path path = new Path(familyDir, "hfile_" + hfileIdx++); 331 HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor); 332 if (useMap) { 333 last = path; 334 list.add(path); 335 } 336 } 337 int expectedRows = hfileIdx * factor; 338 339 TableName tableName = htd.getTableName(); 340 if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) { 341 util.getAdmin().createTable(htd, tableSplitKeys); 342 } 343 344 Configuration conf = util.getConfiguration(); 345 if (copyFiles) { 346 conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); 347 } 348 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf); 349 List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString()); 350 if (depth == 3) { 351 args.add("-loadTable"); 352 } 353 354 if (useMap) { 355 if (deleteFile) { 356 fs.delete(last, true); 357 } 358 Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map); 359 if (deleteFile) { 360 expectedRows -= 1000; 361 for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) { 362 if (item.getFilePath().getName().equals(last.getName())) { 363 fail(last + " should be missing"); 364 } 365 } 366 } 367 } else { 368 loader.run(args.toArray(new String[] {})); 369 } 370 371 if (copyFiles) { 372 for (Path p : list) { 373 assertTrue(p + " should exist", fs.exists(p)); 374 } 375 } 376 377 Table table = util.getConnection().getTable(tableName); 378 try { 379 assertEquals(initRowCount + expectedRows, util.countRows(table)); 380 } finally { 381 table.close(); 382 } 383 384 return expectedRows; 385 } 386 387 private void runTest(String testName, TableDescriptor htd, boolean preCreateTable, 388 byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth) 389 throws Exception { 390 loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges, 391 useMap, true, copyFiles, 0, 1000, depth); 392 393 final TableName tableName = htd.getTableName(); 394 // verify staging folder has been cleaned up 395 Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()), 396 HConstants.BULKLOAD_STAGING_DIR_NAME); 397 FileSystem fs = util.getTestFileSystem(); 398 if (fs.exists(stagingBasePath)) { 399 FileStatus[] files = fs.listStatus(stagingBasePath); 400 for (FileStatus file : files) { 401 assertTrue("Folder=" + file.getPath() + " is not cleaned up.", 402 file.getPath().getName() != "DONOTERASE"); 403 } 404 } 405 406 util.deleteTable(tableName); 407 } 408 409 /** 410 * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the 411 * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the 412 * responses. 413 */ 414 @Test 415 public void testTagsSurviveBulkLoadSplit() throws Exception { 416 Path dir = util.getDataTestDirOnTestFS(tn.getMethodName()); 417 FileSystem fs = util.getTestFileSystem(); 418 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 419 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 420 // table has these split points 421 byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), 422 Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }; 423 424 // creating an hfile that has values that span the split points. 425 byte[] from = Bytes.toBytes("ddd"); 426 byte[] to = Bytes.toBytes("ooo"); 427 HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs, 428 new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000); 429 int expectedRows = 1000; 430 431 TableName tableName = TableName.valueOf(tn.getMethodName()); 432 TableDescriptor htd = buildHTD(tableName, BloomType.NONE); 433 util.getAdmin().createTable(htd, tableSplitKeys); 434 435 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); 436 String[] args = { dir.toString(), tableName.toString() }; 437 loader.run(args); 438 439 Table table = util.getConnection().getTable(tableName); 440 try { 441 assertEquals(expectedRows, util.countRows(table)); 442 HFileTestUtil.verifyTags(table); 443 } finally { 444 table.close(); 445 } 446 447 util.deleteTable(tableName); 448 } 449 450 /** 451 * Test loading into a column family that does not exist. 452 */ 453 @Test 454 public void testNonexistentColumnFamilyLoad() throws Exception { 455 String testName = tn.getMethodName(); 456 byte[][][] hFileRanges = 457 new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") }, 458 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }; 459 460 byte[] TABLE = Bytes.toBytes("mytable_" + testName); 461 // set real family name to upper case in purpose to simulate the case that 462 // family name in HFiles is invalid 463 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE)) 464 .setColumnFamily(ColumnFamilyDescriptorBuilder 465 .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT)))) 466 .build(); 467 468 try { 469 runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2); 470 assertTrue("Loading into table with non-existent family should have failed", false); 471 } catch (Exception e) { 472 assertTrue("IOException expected", e instanceof IOException); 473 // further check whether the exception message is correct 474 String errMsg = e.getMessage(); 475 assertTrue( 476 "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY 477 + "], current message: [" + errMsg + "]", 478 errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY)); 479 } 480 } 481 482 @Test 483 public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception { 484 testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true); 485 } 486 487 @Test 488 public void testNonHfileFolder() throws Exception { 489 testNonHfileFolder("testNonHfileFolder", false); 490 } 491 492 /** 493 * Write a random data file and a non-file in a dir with a valid family name but not part of the 494 * table families. we should we able to bulkload without getting the unmatched family exception. 495 * HBASE-13037/HBASE-13227 496 */ 497 private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception { 498 Path dir = util.getDataTestDirOnTestFS(tableName); 499 FileSystem fs = util.getTestFileSystem(); 500 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 501 502 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 503 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY, 504 QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500); 505 createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024); 506 507 final String NON_FAMILY_FOLDER = "_logs"; 508 Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER); 509 fs.mkdirs(nonFamilyDir); 510 fs.mkdirs(new Path(nonFamilyDir, "non-file")); 511 createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024); 512 513 Table table = null; 514 try { 515 if (preCreateTable) { 516 table = util.createTable(TableName.valueOf(tableName), FAMILY); 517 } else { 518 table = util.getConnection().getTable(TableName.valueOf(tableName)); 519 } 520 521 final String[] args = { dir.toString(), tableName }; 522 new LoadIncrementalHFiles(util.getConfiguration()).run(args); 523 assertEquals(500, util.countRows(table)); 524 } finally { 525 if (table != null) { 526 table.close(); 527 } 528 fs.delete(dir, true); 529 } 530 } 531 532 private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException { 533 FSDataOutputStream stream = fs.create(path); 534 try { 535 byte[] data = new byte[1024]; 536 for (int i = 0; i < data.length; ++i) { 537 data[i] = (byte) (i & 0xff); 538 } 539 while (size >= data.length) { 540 stream.write(data, 0, data.length); 541 size -= data.length; 542 } 543 if (size > 0) { 544 stream.write(data, 0, size); 545 } 546 } finally { 547 stream.close(); 548 } 549 } 550 551 @Test 552 public void testSplitStoreFile() throws IOException { 553 Path dir = util.getDataTestDirOnTestFS("testSplitHFile"); 554 FileSystem fs = util.getTestFileSystem(); 555 Path testIn = new Path(dir, "testhfile"); 556 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); 557 HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, 558 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 559 560 Path bottomOut = new Path(dir, "bottom.out"); 561 Path topOut = new Path(dir, "top.out"); 562 563 LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc, 564 Bytes.toBytes("ggg"), bottomOut, topOut); 565 566 int rowCount = verifyHFile(bottomOut); 567 rowCount += verifyHFile(topOut); 568 assertEquals(1000, rowCount); 569 } 570 571 @Test 572 public void testSplitStoreFileWithNoneToNone() throws IOException { 573 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE); 574 } 575 576 @Test 577 public void testSplitStoreFileWithEncodedToEncoded() throws IOException { 578 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF); 579 } 580 581 @Test 582 public void testSplitStoreFileWithEncodedToNone() throws IOException { 583 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE); 584 } 585 586 @Test 587 public void testSplitStoreFileWithNoneToEncoded() throws IOException { 588 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF); 589 } 590 591 private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding, 592 DataBlockEncoding cfEncoding) throws IOException { 593 Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding"); 594 FileSystem fs = util.getTestFileSystem(); 595 Path testIn = new Path(dir, "testhfile"); 596 ColumnFamilyDescriptor familyDesc = 597 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build(); 598 HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn, 599 bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 600 601 Path bottomOut = new Path(dir, "bottom.out"); 602 Path topOut = new Path(dir, "top.out"); 603 604 LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc, 605 Bytes.toBytes("ggg"), bottomOut, topOut); 606 607 int rowCount = verifyHFile(bottomOut); 608 rowCount += verifyHFile(topOut); 609 assertEquals(1000, rowCount); 610 } 611 612 private int verifyHFile(Path p) throws IOException { 613 Configuration conf = util.getConfiguration(); 614 HFile.Reader reader = 615 HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf); 616 HFileScanner scanner = reader.getScanner(conf, false, false); 617 scanner.seekTo(); 618 int count = 0; 619 do { 620 count++; 621 } while (scanner.next()); 622 assertTrue(count > 0); 623 reader.close(); 624 return count; 625 } 626 627 private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) { 628 Integer value = map.containsKey(first) ? map.get(first) : 0; 629 map.put(first, value + 1); 630 631 value = map.containsKey(last) ? map.get(last) : 0; 632 map.put(last, value - 1); 633 } 634 635 @Test 636 public void testInferBoundaries() { 637 TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 638 639 /* 640 * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s 641 * u----w Should be inferred as: a-----------------k m-------------q r--------------t 642 * u---------x The output should be (m,r,u) 643 */ 644 645 String first; 646 String last; 647 648 first = "a"; 649 last = "e"; 650 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 651 652 first = "r"; 653 last = "s"; 654 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 655 656 first = "o"; 657 last = "p"; 658 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 659 660 first = "g"; 661 last = "k"; 662 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 663 664 first = "v"; 665 last = "x"; 666 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 667 668 first = "c"; 669 last = "i"; 670 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 671 672 first = "m"; 673 last = "q"; 674 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 675 676 first = "s"; 677 last = "t"; 678 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 679 680 first = "u"; 681 last = "w"; 682 addStartEndKeysForTest(map, first.getBytes(), last.getBytes()); 683 684 byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map); 685 byte[][] compare = new byte[3][]; 686 compare[0] = "m".getBytes(); 687 compare[1] = "r".getBytes(); 688 compare[2] = "u".getBytes(); 689 690 assertEquals(3, keysArray.length); 691 692 for (int row = 0; row < keysArray.length; row++) { 693 assertArrayEquals(keysArray[row], compare[row]); 694 } 695 } 696 697 @Test 698 public void testLoadTooMayHFiles() throws Exception { 699 Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles"); 700 FileSystem fs = util.getTestFileSystem(); 701 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 702 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 703 704 byte[] from = Bytes.toBytes("begin"); 705 byte[] to = Bytes.toBytes("end"); 706 for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) { 707 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i), 708 FAMILY, QUALIFIER, from, to, 1000); 709 } 710 711 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); 712 String[] args = { dir.toString(), "mytable_testLoadTooMayHFiles" }; 713 try { 714 loader.run(args); 715 fail("Bulk loading too many files should fail"); 716 } catch (IOException ie) { 717 assertTrue(ie.getMessage() 718 .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles")); 719 } 720 } 721 722 @Test(expected = TableNotFoundException.class) 723 public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { 724 Configuration conf = util.getConfiguration(); 725 conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no"); 726 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 727 String[] args = { "directory", "nonExistingTable" }; 728 loader.run(args); 729 } 730 731 @Test 732 public void testTableWithCFNameStartWithUnderScore() throws Exception { 733 Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore"); 734 FileSystem fs = util.getTestFileSystem(); 735 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 736 String family = "_cf"; 737 Path familyDir = new Path(dir, family); 738 739 byte[] from = Bytes.toBytes("begin"); 740 byte[] to = Bytes.toBytes("end"); 741 Configuration conf = util.getConfiguration(); 742 String tableName = tn.getMethodName(); 743 Table table = util.createTable(TableName.valueOf(tableName), family); 744 HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family), 745 QUALIFIER, from, to, 1000); 746 747 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); 748 String[] args = { dir.toString(), tableName }; 749 try { 750 loader.run(args); 751 assertEquals(1000, util.countRows(table)); 752 } finally { 753 if (null != table) { 754 table.close(); 755 } 756 } 757 } 758 759 @Test 760 public void testBulkLoadByFamily() throws Exception { 761 Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily"); 762 FileSystem fs = util.getTestFileSystem(); 763 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 764 String tableName = tn.getMethodName(); 765 String[] families = { "cf1", "cf2", "cf3" }; 766 for (int i = 0; i < families.length; i++) { 767 byte[] from = Bytes.toBytes(i + "begin"); 768 byte[] to = Bytes.toBytes(i + "end"); 769 Path familyDir = new Path(dir, families[i]); 770 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"), 771 Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000); 772 } 773 Table table = util.createTable(TableName.valueOf(tableName), families); 774 final AtomicInteger attmptedCalls = new AtomicInteger(); 775 util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, true); 776 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { 777 @Override 778 protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection, TableName tableName, 779 final byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) throws IOException { 780 attmptedCalls.incrementAndGet(); 781 return super.tryAtomicRegionLoad(connection, tableName, first, lqis, copyFile); 782 } 783 }; 784 785 String[] args = { dir.toString(), tableName }; 786 try { 787 loader.run(args); 788 assertEquals(families.length, attmptedCalls.get()); 789 assertEquals(1000 * families.length, util.countRows(table)); 790 } finally { 791 if (null != table) { 792 table.close(); 793 } 794 util.getConfiguration().setBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false); 795 } 796 } 797 798 @Test 799 public void testFailIfNeedSplitHFile() throws IOException { 800 TableName tableName = TableName.valueOf(tn.getMethodName()); 801 Table table = util.createTable(tableName, FAMILY); 802 803 util.loadTable(table, FAMILY); 804 805 FileSystem fs = util.getTestFileSystem(); 806 Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file")); 807 HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER, 808 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 809 810 util.getAdmin().split(tableName); 811 util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1); 812 813 Configuration config = new Configuration(util.getConfiguration()); 814 config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true); 815 BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config); 816 817 String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() }; 818 assertThrows(IOException.class, () -> tool.run(args)); 819 util.getHBaseCluster().getRegions(tableName) 820 .forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size())); 821 } 822}