/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
 * faster than the full MR cluster tests in TestHFileOutputFormat.
 */
@Category({ MiscTests.class, LargeTests.class })
public class TestBulkLoadHFiles {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBulkLoadHFiles.class);

  @Rule
  public TestName tn = new TestName();

  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  private static final byte[] FAMILY = Bytes.toBytes("myfam");
  private static final String NAMESPACE = "bulkNS";

  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;

  private static final byte[][] SPLIT_KEYS =
    new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };

  static HBaseTestingUtil util = new HBaseTestingUtil();

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
      MAX_FILES_PER_REGION_PER_FAMILY);
    // change default behavior so that tag values are returned with normal rpcs
    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
      KeyValueCodecWithTags.class.getCanonicalName());
    util.startMiniCluster();

    setupNamespace();
  }

  protected static void setupNamespace() throws Exception {
    util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    util.shutdownMiniCluster();
  }

  @Test
  public void testSimpleLoadWithMap() throws Exception {
    runTest("testSimpleLoadWithMap", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
      true);
  }

  /**
   * Test case that creates some regions and loads HFiles that fit snugly inside those regions
   */
  @Test
  public void testSimpleLoad() throws Exception {
    runTest("testSimpleLoad", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
  }
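
  /**
   * Same as testSimpleLoad, but with {@link BulkLoadHFiles#ALWAYS_COPY_FILES} enabled, so the
   * source HFiles are copied into place rather than moved and must still exist after the load.
   */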
  @Test
  public void testSimpleLoadWithFileCopy() throws Exception {
    String testName = tn.getMethodName();
    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
      false, true, 2);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries of those
   * regions
   */
  @Test
  public void testRegionCrossingLoad() throws Exception {
    runTest("testRegionCrossingLoad", BloomType.NONE,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test loading into a column family that has a ROW bloom filter.
   */
  @Test
  public void testRegionCrossingRowBloom() throws Exception {
    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test loading into a column family that has a ROWCOL bloom filter.
   */
  @Test
  public void testRegionCrossingRowColBloom() throws Exception {
    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test case that creates some regions and loads HFiles that have different region boundaries
   * than the table pre-split.
   */
  @Test
  public void testSimpleHFileSplit() throws Exception {
    runTest("testHFileSplit", BloomType.NONE,
      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
        new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the boundaries and have
   * different region boundaries than the table pre-split.
   */
  @Test
  public void testRegionCrossingHFileSplit() throws Exception {
    testRegionCrossingHFileSplit(BloomType.NONE);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the region boundaries, have a
   * ROW bloom filter, and have different boundaries than the table pre-split.
   */
  @Test
  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROW);
  }

  /**
   * Test case that creates some regions and loads HFiles that cross the region boundaries, have a
   * ROWCOL bloom filter, and have different boundaries than the table pre-split.
   */
  @Test
  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
    testRegionCrossingHFileSplit(BloomType.ROWCOL);
  }
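
  /**
   * Test case that pre-splits the table into many small regions and loads a single HFile spanning
   * all of them, so the file has to be split many times.
   */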
  @Test
  public void testSplitALot() throws Exception {
    runTest("testSplitALot", BloomType.NONE,
      new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
        Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
        Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
        Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
        Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
        Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
  }

  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
        Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
        new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
  }

  private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
    return TableDescriptorBuilder.newBuilder(tableName)
      .setColumnFamily(
        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
      .build();
  }

  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
    throws Exception {
    runTest(testName, bloomType, null, hfileRanges);
  }

  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
    throws Exception {
    runTest(testName, bloomType, null, hfileRanges, useMap);
  }

  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
    byte[][][] hfileRanges) throws Exception {
    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
  }

  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
    byte[][][] hfileRanges, boolean useMap) throws Exception {
    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
    final boolean preCreateTable = tableSplitKeys != null;

    // Run the test bulk loading the table into the default namespace
    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, 2);

    /*
     * Run the test bulk loading the table from a directory structure of depth 3; the layout is now
     * baseDirectory -- regionDir -- familyDir -- storeFileDir
     */
    if (preCreateTable) {
      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false,
        3);
    }

    // Run the test bulk loading the table into the specified namespace
    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap,
      2);
  }

  private void runTest(String testName, TableName tableName, BloomType bloomType,
    boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
    int depth) throws Exception {
    TableDescriptor htd = buildHTD(tableName, bloomType);
    runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
  }
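
  /**
   * Creates one HFile per range in {@code hfileRanges} (each holding {@code factor} rows),
   * optionally pre-creates the table, and bulk loads the files either from the directory layout on
   * disk or, when {@code useMap} is true, from a family-to-files map. The table row count is then
   * verified against {@code initRowCount} plus the rows that were expected to load.
   * @return the number of rows expected to have been loaded
   */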
  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtil util,
    byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
    byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount,
    int factor, int depth) throws Exception {
    Path baseDirectory = util.getDataTestDirOnTestFS(testName);
    FileSystem fs = util.getTestFileSystem();
    baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path parentDir = baseDirectory;
    if (depth == 3) {
      assert !useMap;
      parentDir = new Path(baseDirectory, "someRegion");
    }
    Path familyDir = new Path(parentDir, Bytes.toString(fam));

    int hfileIdx = 0;
    Map<byte[], List<Path>> map = null;
    List<Path> list = null;
    if (useMap || copyFiles) {
      list = new ArrayList<>();
    }
    if (useMap) {
      map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      map.put(fam, list);
    }
    Path last = null;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
      // track the created files so the copyFiles assertion below has something to check
      if (useMap || copyFiles) {
        list.add(path);
      }
      if (useMap) {
        last = path;
      }
    }
    int expectedRows = hfileIdx * factor;

    TableName tableName = htd.getTableName();
    if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
      if (tableSplitKeys != null) {
        util.getAdmin().createTable(htd, tableSplitKeys);
      } else {
        util.getAdmin().createTable(htd);
      }
    }

    Configuration conf = util.getConfiguration();
    if (copyFiles) {
      conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);
    }
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
    List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
    if (depth == 3) {
      args.add("-loadTable");
    }

    if (useMap) {
      if (deleteFile) {
        fs.delete(last, true);
      }
      Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map);
      if (deleteFile) {
        expectedRows -= factor;
        for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) {
          if (item.getFilePath().getName().equals(last.getName())) {
            fail(last + " should be missing");
          }
        }
      }
    } else {
      loader.run(args.toArray(new String[] {}));
    }

    if (copyFiles) {
      // with ALWAYS_COPY_FILES the source files are copied, not moved, so they must still exist
      for (Path p : list) {
        assertTrue(p + " should exist", fs.exists(p));
      }
    }

    try (Table table = util.getConnection().getTable(tableName)) {
      assertEquals(initRowCount + expectedRows, countRows(table));
    }

    return expectedRows;
  }
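
  /**
   * Loads the given ranges via {@link #loadHFiles}, verifies that the bulk load staging directory
   * was cleaned up afterwards, and finally drops the table.
   */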
  private void runTest(String testName, TableDescriptor htd, boolean preCreateTable,
    byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth)
    throws Exception {
    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
      useMap, true, copyFiles, 0, 1000, depth);

    final TableName tableName = htd.getTableName();
    // verify staging folder has been cleaned up
    Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()),
      HConstants.BULKLOAD_STAGING_DIR_NAME);
    FileSystem fs = util.getTestFileSystem();
    if (fs.exists(stagingBasePath)) {
      FileStatus[] files = fs.listStatus(stagingBasePath);
      for (FileStatus file : files) {
        // anything left behind other than the permanent "DONOTERASE" marker means the staging
        // directory was not cleaned up after the load
        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
          file.getPath().getName().equals("DONOTERASE"));
      }
    }

    util.deleteTable(tableName);
  }

  /**
   * Test that tags survive through a bulk load that needs to split hfiles. This test depends on
   * "hbase.client.rpc.codec" being set to KeyValueCodecWithTags so that the client can get tags in
   * the responses.
   */
  @Test
  public void testTagsSurviveBulkLoadSplit() throws Exception {
    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
    // table has these split points
    byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
      Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };

    // creating an hfile that has values that span the split points.
    byte[] from = Bytes.toBytes("ddd");
    byte[] to = Bytes.toBytes("ooo");
    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
      new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
    int expectedRows = 1000;

    TableName tableName = TableName.valueOf(tn.getMethodName());
    TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
    util.getAdmin().createTable(htd, tableSplitKeys);

    BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir);

    Table table = util.getConnection().getTable(tableName);
    try {
      assertEquals(expectedRows, countRows(table));
      HFileTestUtil.verifyTags(table);
    } finally {
      table.close();
    }

    util.deleteTable(tableName);
  }

  /**
   * Test loading into a column family that does not exist.
   */
  @Test
  public void testNonexistentColumnFamilyLoad() throws Exception {
    String testName = tn.getMethodName();
    byte[][][] hFileRanges =
      new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
        new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };

    byte[] TABLE = Bytes.toBytes("mytable_" + testName);
    // set the real family name to upper case on purpose to simulate the case where the
    // family name in the HFiles is invalid
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
      .setColumnFamily(ColumnFamilyDescriptorBuilder
        .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
      .build();

    try {
      runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2);
      fail("Loading into table with non-existent family should have failed");
    } catch (Exception e) {
      assertTrue("IOException expected", e instanceof IOException);
      // further check whether the exception message is correct
      String errMsg = e.getMessage();
      assertTrue(
        "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY
          + "], current message: [" + errMsg + "]",
        errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
    }
  }

  @Test
  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
  }

  @Test
  public void testNonHfileFolder() throws Exception {
    testNonHfileFolder("testNonHfileFolder", false);
  }

  /**
   * Write a random data file and a non-file in a dir with a valid family name but not part of the
   * table families. We should be able to bulk load without getting the unmatched family exception.
   * HBASE-13037/HBASE-13227
   */
  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
    Path dir = util.getDataTestDirOnTestFS(tableName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());

    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
      QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);

    final String NON_FAMILY_FOLDER = "_logs";
    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
    fs.mkdirs(nonFamilyDir);
    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);

    Table table = null;
    try {
      if (preCreateTable) {
        table = util.createTable(TableName.valueOf(tableName), FAMILY);
      } else {
        table = util.getConnection().getTable(TableName.valueOf(tableName));
      }
      BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(TableName.valueOf(tableName), dir);
      assertEquals(500, countRows(table));
    } finally {
      if (table != null) {
        table.close();
      }
      fs.delete(dir, true);
    }
  }

  private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
    FSDataOutputStream stream = fs.create(path);
    try {
      byte[] data = new byte[1024];
      for (int i = 0; i < data.length; ++i) {
        data[i] = (byte) (i & 0xff);
      }
      while (size >= data.length) {
        stream.write(data, 0, data.length);
        size -= data.length;
      }
      if (size > 0) {
        stream.write(data, 0, size);
      }
    } finally {
      stream.close();
    }
  }
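
  /**
   * Test that splitStoreFile() splits a single HFile into a bottom half and a top half around the
   * given split key without losing any rows.
   */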
  @Test
  public void testSplitStoreFile() throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
    String tableName = tn.getMethodName();
    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    BulkLoadHFilesTool.splitStoreFile(
      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);

    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }

  /**
   * Test hfile splits with the favored nodes
   */
  @Test
  public void testSplitStoreFileWithFavoriteNodes() throws IOException {
    Path dir = new Path(util.getDefaultRootDirPath(), "testhfile");
    FileSystem fs = util.getDFSCluster().getFileSystem();

    Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes");
    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
    String tableName = tn.getMethodName();
    Table table = util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    final AsyncTableRegionLocator regionLocator =
      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName));
    BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc,
      Bytes.toBytes("ggg"), bottomOut, topOut);
    verifyHFileFavoriteNode(topOut, regionLocator, fs);
    verifyHFileFavoriteNode(bottomOut, regionLocator, fs);
    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }

  @Test
  public void testSplitStoreFileWithCreateTimeTS() throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
    String tableName = tn.getMethodName();
    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    BulkLoadHFilesTool.splitStoreFile(
      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);

    verifyHFileCreateTimeTS(bottomOut);
    verifyHFileCreateTimeTS(topOut);
  }

  @Test
  public void testSplitStoreFileWithNoneToNone() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
  }

  @Test
  public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
  }

  @Test
  public void testSplitStoreFileWithEncodedToNone() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
  }

  @Test
  public void testSplitStoreFileWithNoneToEncoded() throws IOException {
    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
  }

  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
    DataBlockEncoding cfEncoding) throws IOException {
    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    ColumnFamilyDescriptor familyDesc =
      ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
    String tableName = tn.getMethodName();
    util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString());
    HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
      bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    BulkLoadHFilesTool.splitStoreFile(
      util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)),
      util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut);

    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }
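
  /**
   * Opens the given HFile, scans it from the first key and returns the number of entries found,
   * asserting that the file is not empty.
   */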
  private int verifyHFile(Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader =
      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
    HFileScanner scanner = reader.getScanner(conf, false, false);
    scanner.seekTo();
    int count = 0;
    do {
      count++;
    } while (scanner.next());
    assertTrue(count > 0);
    reader.close();
    return count;
  }

  private void verifyHFileCreateTimeTS(Path p) throws IOException {
    Configuration conf = util.getConfiguration();

    try (HFile.Reader reader =
      HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf)) {
      long fileCreateTime = reader.getHFileInfo().getHFileContext().getFileCreateTime();
      MatcherAssert.assertThat(fileCreateTime, greaterThan(0L));
    }
  }

  /**
   * Verify that every HDFS block of the split store file has a replica on the favored node, i.e.
   * the host serving the region the file was split for.
   */
  private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs)
    throws IOException {
    Configuration conf = util.getConfiguration();

    try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf)) {

      // the favored node is the host of the region covering the file's first row key
      final byte[] firstRowkey = reader.getFirstRowKey().get();
      final HRegionLocation hRegionLocation =
        FutureUtils.get(regionLocator.getRegionLocation(firstRowkey));

      final String targetHostName = hRegionLocation.getHostname();

      if (fs instanceof DistributedFileSystem) {
        String pathStr = p.toUri().getPath();
        LocatedBlocks blocks =
          ((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L);

        boolean isFavoriteNode = false;
        List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks();
        int index = 0;
        do {
          // assert the block checked in the previous iteration had a replica on the favored node
          if (index > 0) {
            assertTrue("failed use favored nodes", isFavoriteNode);
          }
          isFavoriteNode = false;
          final LocatedBlock block = locatedBlocks.get(index);

          final DatanodeInfo[] locations = getLocatedBlockLocations(block);
          for (DatanodeInfo location : locations) {

            final String hostName = location.getHostName();
            // tolerate "127.0.0.1" vs. local host name mismatches in the mini cluster
            if (
              targetHostName.equals(hostName.equals("127.0.0.1")
                ? InetAddress.getLocalHost().getHostName()
                : "127.0.0.1") || targetHostName.equals(hostName)
            ) {
              isFavoriteNode = true;
              break;
            }
          }

          index++;
        } while (index < locatedBlocks.size());
        // assert the last block as well
        if (index > 0) {
          assertTrue("failed use favored nodes", isFavoriteNode);
        }
      }
    }
  }
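
  /**
   * Helper for {@link #testInferBoundaries()}: records an HFile key range by incrementing the
   * counter at the start key and decrementing the counter at the end key.
   */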
  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
    Integer value = map.containsKey(first) ? map.get(first) : 0;
    map.put(first, value + 1);

    value = map.containsKey(last) ? map.get(last) : 0;
    map.put(last, value - 1);
  }

  @Test
  public void testInferBoundaries() {
    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);

    /*
     * Toy example:
     *   c---------i   o------p   s---------t   v------x
     *   a------e   g-----k   m-------------q   r----s   u----w
     * Should be inferred as:
     *   a-----------------k   m-------------q   r--------------t   u---------x
     * The output should be (m, r, u)
     */

    String first;
    String last;

    first = "a";
    last = "e";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "r";
    last = "s";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "o";
    last = "p";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "g";
    last = "k";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "v";
    last = "x";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "c";
    last = "i";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "m";
    last = "q";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "s";
    last = "t";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    first = "u";
    last = "w";
    addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));

    byte[][] keysArray = BulkLoadHFilesTool.inferBoundaries(map);
    byte[][] compare = new byte[3][];
    compare[0] = Bytes.toBytes("m");
    compare[1] = Bytes.toBytes("r");
    compare[2] = Bytes.toBytes("u");

    assertEquals(3, keysArray.length);

    for (int row = 0; row < keysArray.length; row++) {
      assertArrayEquals(keysArray[row], compare[row]);
    }
  }

  @Test
  public void testLoadTooMayHFiles() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i),
        FAMILY, QUALIFIER, from, to, 1000);
    }

    try {
      BulkLoadHFiles.create(util.getConfiguration())
        .bulkLoad(TableName.valueOf("mytable_testLoadTooMayHFiles"), dir);
      fail("Bulk loading too many files should fail");
    } catch (IOException ie) {
      assertTrue(ie.getMessage()
        .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
    }
  }

  @Test(expected = TableNotFoundException.class)
  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.set(BulkLoadHFiles.CREATE_TABLE_CONF_KEY, "no");
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
    String[] args = { "directory", "nonExistingTable" };
    loader.run(args);
  }
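
  /**
   * Test that a column family whose name starts with an underscore (e.g. "_cf") can still be bulk
   * loaded, even though underscore-prefixed directories such as "_logs" are normally ignored as
   * non-family directories.
   */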
  @Test
  public void testTableWithCFNameStartWithUnderScore() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    String family = "_cf";
    Path familyDir = new Path(dir, family);

    byte[] from = Bytes.toBytes("begin");
    byte[] to = Bytes.toBytes("end");
    Configuration conf = util.getConfiguration();
    String tableName = tn.getMethodName();
    try (Table table = util.createTable(TableName.valueOf(tableName), family)) {
      HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
        QUALIFIER, from, to, 1000);
      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), dir);
      assertEquals(1000, countRows(table));
    }
  }

  @Test
  public void testBulkLoadByFamily() throws Exception {
    Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily");
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
    String tableName = tn.getMethodName();
    String[] families = { "cf1", "cf2", "cf3" };
    for (int i = 0; i < families.length; i++) {
      byte[] from = Bytes.toBytes(i + "begin");
      byte[] to = Bytes.toBytes(i + "end");
      Path familyDir = new Path(dir, families[i]);
      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"),
        Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000);
    }
    Table table = util.createTable(TableName.valueOf(tableName), families);
    final AtomicInteger attemptedCalls = new AtomicInteger();
    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
    BulkLoadHFiles loader = new BulkLoadHFilesTool(util.getConfiguration()) {
      @Override
      protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
        final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
        final byte[] first, Collection<LoadQueueItem> lqis) {
        attemptedCalls.incrementAndGet();
        return super.tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis);
      }
    };
    try {
      loader.bulkLoad(table.getName(), dir);
      // with bulk load by family enabled there should be exactly one load call per family
      assertEquals(families.length, attemptedCalls.get());
      assertEquals(1000 * families.length, HBaseTestingUtil.countRows(table));
    } finally {
      if (null != table) {
        table.close();
      }
      util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
    }
  }

  @Test
  public void testFailIfNeedSplitHFile() throws IOException {
    TableName tableName = TableName.valueOf(tn.getMethodName());
    Table table = util.createTable(tableName, FAMILY);

    util.loadTable(table, FAMILY);

    FileSystem fs = util.getTestFileSystem();
    Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file"));
    HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER,
      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    util.getAdmin().split(tableName);
    util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1);

    Configuration config = new Configuration(util.getConfiguration());
    config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true);
    BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config);

    String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() };
    // the HFile spans both daughter regions, so with FAIL_IF_NEED_SPLIT_HFILE set the load must
    // fail instead of splitting the file
    assertThrows(IOException.class, () -> tool.run(args));
    util.getHBaseCluster().getRegions(tableName)
      .forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size()));
  }
}