001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows; 021import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations; 022import static org.hamcrest.Matchers.greaterThan; 023import static org.junit.jupiter.api.Assertions.assertArrayEquals; 024import static org.junit.jupiter.api.Assertions.assertEquals; 025import static org.junit.jupiter.api.Assertions.assertThrows; 026import static org.junit.jupiter.api.Assertions.assertTrue; 027import static org.junit.jupiter.api.Assertions.fail; 028 029import java.io.IOException; 030import java.net.InetAddress; 031import java.nio.ByteBuffer; 032import java.util.ArrayList; 033import java.util.Collection; 034import java.util.List; 035import java.util.Locale; 036import java.util.Map; 037import java.util.TreeMap; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.atomic.AtomicInteger; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FSDataOutputStream; 042import org.apache.hadoop.fs.FileStatus; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.hbase.HBaseTestingUtil; 046import org.apache.hadoop.hbase.HConstants; 047import org.apache.hadoop.hbase.HRegionLocation; 048import org.apache.hadoop.hbase.NamespaceDescriptor; 049import org.apache.hadoop.hbase.TableName; 050import org.apache.hadoop.hbase.TableNotFoundException; 051import org.apache.hadoop.hbase.client.AsyncClusterConnection; 052import org.apache.hadoop.hbase.client.AsyncTableRegionLocator; 053import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 054import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 055import org.apache.hadoop.hbase.client.Table; 056import org.apache.hadoop.hbase.client.TableDescriptor; 057import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 058import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 059import org.apache.hadoop.hbase.io.hfile.CacheConfig; 060import org.apache.hadoop.hbase.io.hfile.HFile; 061import org.apache.hadoop.hbase.io.hfile.HFileScanner; 062import org.apache.hadoop.hbase.regionserver.BloomType; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.apache.hadoop.hbase.util.CommonFSUtils; 065import org.apache.hadoop.hbase.util.FutureUtils; 066import org.apache.hadoop.hbase.util.HFileTestUtil; 067import org.apache.hadoop.hdfs.DistributedFileSystem; 068import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 069import org.apache.hadoop.hdfs.protocol.LocatedBlock; 070import org.apache.hadoop.hdfs.protocol.LocatedBlocks; 071import org.hamcrest.MatcherAssert; 072import org.junit.jupiter.api.AfterAll; 073import org.junit.jupiter.api.Test; 074import org.junit.jupiter.api.TestInfo; 075 076import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 077 078/** 079 * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run 080 * faster than the full MR cluster tests in TestHFileOutputFormat 081 */ 082public class BulkLoadHFilesTestBase { 083 084 private static final byte[] QUALIFIER = Bytes.toBytes("myqual"); 085 private static final byte[] FAMILY = Bytes.toBytes("myfam"); 086 private static final String NAMESPACE = "bulkNS"; 087 088 static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found"; 089 static final int MAX_FILES_PER_REGION_PER_FAMILY = 4; 090 091 private static final byte[][] SPLIT_KEYS = 092 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") }; 093 094 static HBaseTestingUtil util = new HBaseTestingUtil(); 095 096 protected static void setupNamespace() throws Exception { 097 util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build()); 098 } 099 100 @AfterAll 101 public static void tearDownAfterClass() throws Exception { 102 util.shutdownMiniCluster(); 103 } 104 105 @Test 106 public void testSimpleLoadWithMap() throws Exception { 107 runTest("testSimpleLoadWithMap", BloomType.NONE, 108 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 109 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, 110 true); 111 } 112 113 /** 114 * Test case that creates some regions and loads HFiles that fit snugly inside those regions 115 */ 116 @Test 117 public void testSimpleLoad() throws Exception { 118 runTest("testSimpleLoad", BloomType.NONE, 119 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 120 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }); 121 } 122 123 @Test 124 public void testSimpleLoadWithFileCopy(TestInfo testInfo) throws Exception { 125 String testName = testInfo.getTestMethod().get().getName(); 126 final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); 127 runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null, 128 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, 129 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }, 130 false, true, 2); 131 } 132 133 /** 134 * Test case that creates some regions and loads HFiles that cross the boundaries of those regions 135 */ 136 @Test 137 public void testRegionCrossingLoad() throws Exception { 138 runTest("testRegionCrossingLoad", BloomType.NONE, 139 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 140 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 141 } 142 143 /** 144 * Test loading into a column family that has a ROW bloom filter. 145 */ 146 @Test 147 public void testRegionCrossingRowBloom() throws Exception { 148 runTest("testRegionCrossingLoadRowBloom", BloomType.ROW, 149 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 150 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 151 } 152 153 /** 154 * Test loading into a column family that has a ROWCOL bloom filter. 155 */ 156 @Test 157 public void testRegionCrossingRowColBloom() throws Exception { 158 runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL, 159 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 160 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 161 } 162 163 /** 164 * Test case that creates some regions and loads HFiles that have different region boundaries than 165 * the table pre-split. 166 */ 167 @Test 168 public void testSimpleHFileSplit() throws Exception { 169 runTest("testHFileSplit", BloomType.NONE, 170 new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), 171 Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }, 172 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") }, 173 new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, }); 174 } 175 176 /** 177 * Test case that creates some regions and loads HFiles that cross the boundaries and have 178 * different region boundaries than the table pre-split. 179 */ 180 @Test 181 public void testRegionCrossingHFileSplit() throws Exception { 182 testRegionCrossingHFileSplit(BloomType.NONE); 183 } 184 185 /** 186 * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom 187 * filter and a different region boundaries than the table pre-split. 188 */ 189 @Test 190 public void testRegionCrossingHFileSplitRowBloom() throws Exception { 191 testRegionCrossingHFileSplit(BloomType.ROW); 192 } 193 194 /** 195 * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL 196 * bloom filter and a different region boundaries than the table pre-split. 197 */ 198 @Test 199 public void testRegionCrossingHFileSplitRowColBloom() throws Exception { 200 testRegionCrossingHFileSplit(BloomType.ROWCOL); 201 } 202 203 @Test 204 public void testSplitALot() throws Exception { 205 runTest("testSplitALot", BloomType.NONE, 206 new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), 207 Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"), 208 Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"), 209 Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), 210 Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), 211 Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), }, 212 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, }); 213 } 214 215 private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception { 216 runTest("testHFileSplit" + bloomType + "Bloom", bloomType, 217 new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"), 218 Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }, 219 new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") }, 220 new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, }); 221 } 222 223 private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) { 224 return TableDescriptorBuilder.newBuilder(tableName) 225 .setColumnFamily( 226 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build()) 227 .build(); 228 } 229 230 private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges) 231 throws Exception { 232 runTest(testName, bloomType, null, hfileRanges); 233 } 234 235 private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap) 236 throws Exception { 237 runTest(testName, bloomType, null, hfileRanges, useMap); 238 } 239 240 private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, 241 byte[][][] hfileRanges) throws Exception { 242 runTest(testName, bloomType, tableSplitKeys, hfileRanges, false); 243 } 244 245 private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, 246 byte[][][] hfileRanges, boolean useMap) throws Exception { 247 final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); 248 final boolean preCreateTable = tableSplitKeys != null; 249 250 // Run the test bulkloading the table to the default namespace 251 final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME); 252 runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, 253 useMap, 2); 254 255 /* 256 * Run the test bulkloading the table from a depth of 3 directory structure is now baseDirectory 257 * -- regionDir -- familyDir -- storeFileDir 258 */ 259 if (preCreateTable) { 260 runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false, 261 3); 262 } 263 264 // Run the test bulkloading the table to the specified namespace 265 final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME); 266 runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, 267 2); 268 } 269 270 private void runTest(String testName, TableName tableName, BloomType bloomType, 271 boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, 272 int depth) throws Exception { 273 TableDescriptor htd = buildHTD(tableName, bloomType); 274 runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth); 275 } 276 277 public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtil util, 278 byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys, 279 byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles, int initRowCount, 280 int factor, int depth) throws Exception { 281 Path baseDirectory = util.getDataTestDirOnTestFS(testName); 282 FileSystem fs = util.getTestFileSystem(); 283 baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 284 Path parentDir = baseDirectory; 285 if (depth == 3) { 286 assert !useMap; 287 parentDir = new Path(baseDirectory, "someRegion"); 288 } 289 Path familyDir = new Path(parentDir, Bytes.toString(fam)); 290 291 int hfileIdx = 0; 292 Map<byte[], List<Path>> map = null; 293 List<Path> list = null; 294 if (useMap || copyFiles) { 295 list = new ArrayList<>(); 296 } 297 if (useMap) { 298 map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 299 map.put(fam, list); 300 } 301 Path last = null; 302 for (byte[][] range : hfileRanges) { 303 byte[] from = range[0]; 304 byte[] to = range[1]; 305 Path path = new Path(familyDir, "hfile_" + hfileIdx++); 306 HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor); 307 if (useMap) { 308 last = path; 309 list.add(path); 310 } 311 } 312 int expectedRows = hfileIdx * factor; 313 314 TableName tableName = htd.getTableName(); 315 if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) { 316 if (tableSplitKeys != null) { 317 util.getAdmin().createTable(htd, tableSplitKeys); 318 } else { 319 util.getAdmin().createTable(htd); 320 } 321 } 322 323 Configuration conf = util.getConfiguration(); 324 if (copyFiles) { 325 conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true); 326 } 327 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf); 328 List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString()); 329 if (depth == 3) { 330 args.add("-loadTable"); 331 } 332 333 if (useMap) { 334 if (deleteFile) { 335 fs.delete(last, true); 336 } 337 Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map); 338 if (deleteFile) { 339 expectedRows -= 1000; 340 for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) { 341 if (item.getFilePath().getName().equals(last.getName())) { 342 fail(last + " should be missing"); 343 } 344 } 345 } 346 } else { 347 loader.run(args.toArray(new String[] {})); 348 } 349 350 if (copyFiles) { 351 for (Path p : list) { 352 assertTrue(fs.exists(p), p + " should exist"); 353 } 354 } 355 356 try (Table table = util.getConnection().getTable(tableName)) { 357 assertEquals(initRowCount + expectedRows, countRows(table)); 358 } 359 360 return expectedRows; 361 } 362 363 private void runTest(String testName, TableDescriptor htd, boolean preCreateTable, 364 byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth) 365 throws Exception { 366 loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges, 367 useMap, true, copyFiles, 0, 1000, depth); 368 369 final TableName tableName = htd.getTableName(); 370 // verify staging folder has been cleaned up 371 Path stagingBasePath = new Path(CommonFSUtils.getRootDir(util.getConfiguration()), 372 HConstants.BULKLOAD_STAGING_DIR_NAME); 373 FileSystem fs = util.getTestFileSystem(); 374 if (fs.exists(stagingBasePath)) { 375 FileStatus[] files = fs.listStatus(stagingBasePath); 376 for (FileStatus file : files) { 377 assertTrue(file.getPath().getName() != "DONOTERASE", 378 "Folder=" + file.getPath() + " is not cleaned up."); 379 } 380 } 381 382 util.deleteTable(tableName); 383 } 384 385 /** 386 * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the 387 * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the 388 * responses. 389 */ 390 @Test 391 public void testTagsSurviveBulkLoadSplit(TestInfo testInfo) throws Exception { 392 String name = testInfo.getTestMethod().get().getName(); 393 Path dir = util.getDataTestDirOnTestFS(name); 394 FileSystem fs = util.getTestFileSystem(); 395 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 396 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 397 // table has these split points 398 byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), 399 Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), }; 400 401 // creating an hfile that has values that span the split points. 402 byte[] from = Bytes.toBytes("ddd"); 403 byte[] to = Bytes.toBytes("ooo"); 404 HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs, 405 new Path(familyDir, name + "_hfile"), FAMILY, QUALIFIER, from, to, 1000); 406 int expectedRows = 1000; 407 408 TableName tableName = TableName.valueOf(name); 409 TableDescriptor htd = buildHTD(tableName, BloomType.NONE); 410 util.getAdmin().createTable(htd, tableSplitKeys); 411 412 BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir); 413 414 Table table = util.getConnection().getTable(tableName); 415 try { 416 assertEquals(expectedRows, countRows(table)); 417 HFileTestUtil.verifyTags(table); 418 } finally { 419 table.close(); 420 } 421 422 util.deleteTable(tableName); 423 } 424 425 /** 426 * Test loading into a column family that does not exist. 427 */ 428 @Test 429 public void testNonexistentColumnFamilyLoad(TestInfo testInfo) throws Exception { 430 String testName = testInfo.getTestMethod().get().getName(); 431 byte[][][] hFileRanges = 432 new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") }, 433 new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, }; 434 435 byte[] TABLE = Bytes.toBytes("mytable_" + testName); 436 // set real family name to upper case in purpose to simulate the case that 437 // family name in HFiles is invalid 438 TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE)) 439 .setColumnFamily(ColumnFamilyDescriptorBuilder 440 .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT)))) 441 .build(); 442 443 try { 444 runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2); 445 assertTrue(false, "Loading into table with non-existent family should have failed"); 446 } catch (Exception e) { 447 assertTrue(e instanceof IOException, "IOException expected"); 448 // further check whether exception message is correct 449 String errMsg = e.getMessage(); 450 assertTrue(errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY), 451 "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY 452 + "], current message: [" + errMsg + "]"); 453 } 454 } 455 456 @Test 457 public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception { 458 testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true); 459 } 460 461 @Test 462 public void testNonHfileFolder() throws Exception { 463 testNonHfileFolder("testNonHfileFolder", false); 464 } 465 466 /** 467 * Write a random data file and a non-file in a dir with a valid family name but not part of the 468 * table families. we should we able to bulkload without getting the unmatched family exception. 469 * HBASE-13037/HBASE-13227 470 */ 471 private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception { 472 Path dir = util.getDataTestDirOnTestFS(tableName); 473 FileSystem fs = util.getTestFileSystem(); 474 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 475 476 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 477 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY, 478 QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500); 479 createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024); 480 481 final String NON_FAMILY_FOLDER = "_logs"; 482 Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER); 483 fs.mkdirs(nonFamilyDir); 484 fs.mkdirs(new Path(nonFamilyDir, "non-file")); 485 createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024); 486 487 Table table = null; 488 try { 489 if (preCreateTable) { 490 table = util.createTable(TableName.valueOf(tableName), FAMILY); 491 } else { 492 table = util.getConnection().getTable(TableName.valueOf(tableName)); 493 } 494 BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(TableName.valueOf(tableName), dir); 495 assertEquals(500, countRows(table)); 496 } finally { 497 if (table != null) { 498 table.close(); 499 } 500 fs.delete(dir, true); 501 } 502 } 503 504 private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException { 505 FSDataOutputStream stream = fs.create(path); 506 try { 507 byte[] data = new byte[1024]; 508 for (int i = 0; i < data.length; ++i) { 509 data[i] = (byte) (i & 0xff); 510 } 511 while (size >= data.length) { 512 stream.write(data, 0, data.length); 513 size -= data.length; 514 } 515 if (size > 0) { 516 stream.write(data, 0, size); 517 } 518 } finally { 519 stream.close(); 520 } 521 } 522 523 @Test 524 public void testSplitStoreFile(TestInfo testInfo) throws IOException { 525 Path dir = util.getDataTestDirOnTestFS("testSplitHFile"); 526 FileSystem fs = util.getTestFileSystem(); 527 Path testIn = new Path(dir, "testhfile"); 528 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); 529 String tableName = testInfo.getTestMethod().get().getName(); 530 util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); 531 HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, 532 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 533 534 Path bottomOut = new Path(dir, "bottom.out"); 535 Path topOut = new Path(dir, "top.out"); 536 537 BulkLoadHFilesTool.splitStoreFile( 538 util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)), 539 util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut); 540 541 int rowCount = verifyHFile(bottomOut); 542 rowCount += verifyHFile(topOut); 543 assertEquals(1000, rowCount); 544 } 545 546 /** 547 * Test hfile splits with the favored nodes 548 */ 549 @Test 550 public void testSplitStoreFileWithFavoriteNodes(TestInfo testInfo) throws IOException { 551 552 Path dir = new Path(util.getDefaultRootDirPath(), "testhfile"); 553 FileSystem fs = util.getDFSCluster().getFileSystem(); 554 555 Path testIn = new Path(dir, "testSplitStoreFileWithFavoriteNodes"); 556 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); 557 String tableName = testInfo.getTestMethod().get().getName(); 558 util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); 559 HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, 560 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 561 562 Path bottomOut = new Path(dir, "bottom.out"); 563 Path topOut = new Path(dir, "top.out"); 564 565 final AsyncTableRegionLocator regionLocator = 566 util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)); 567 BulkLoadHFilesTool.splitStoreFile(regionLocator, util.getConfiguration(), testIn, familyDesc, 568 Bytes.toBytes("ggg"), bottomOut, topOut); 569 verifyHFileFavoriteNode(topOut, regionLocator, fs); 570 verifyHFileFavoriteNode(bottomOut, regionLocator, fs); 571 int rowCount = verifyHFile(bottomOut); 572 rowCount += verifyHFile(topOut); 573 assertEquals(1000, rowCount); 574 } 575 576 @Test 577 public void testSplitStoreFileWithCreateTimeTS(TestInfo testInfo) throws IOException { 578 Path dir = util.getDataTestDirOnTestFS("testSplitStoreFileWithCreateTimeTS"); 579 FileSystem fs = util.getTestFileSystem(); 580 Path testIn = new Path(dir, "testhfile"); 581 ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY); 582 String tableName = testInfo.getTestMethod().get().getName(); 583 util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); 584 HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER, 585 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 586 587 Path bottomOut = new Path(dir, "bottom.out"); 588 Path topOut = new Path(dir, "top.out"); 589 590 BulkLoadHFilesTool.splitStoreFile( 591 util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)), 592 util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut); 593 594 verifyHFileCreateTimeTS(bottomOut); 595 verifyHFileCreateTimeTS(topOut); 596 } 597 598 @Test 599 public void testSplitStoreFileWithNoneToNone(TestInfo testInfo) throws IOException { 600 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE, 601 testInfo); 602 } 603 604 @Test 605 public void testSplitStoreFileWithEncodedToEncoded(TestInfo testInfo) throws IOException { 606 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF, 607 testInfo); 608 } 609 610 @Test 611 public void testSplitStoreFileWithEncodedToNone(TestInfo testInfo) throws IOException { 612 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE, 613 testInfo); 614 } 615 616 @Test 617 public void testSplitStoreFileWithNoneToEncoded(TestInfo testInfo) throws IOException { 618 testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF, 619 testInfo); 620 } 621 622 private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding, 623 DataBlockEncoding cfEncoding, TestInfo testInfo) throws IOException { 624 Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding"); 625 FileSystem fs = util.getTestFileSystem(); 626 Path testIn = new Path(dir, "testhfile"); 627 ColumnFamilyDescriptor familyDesc = 628 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build(); 629 String tableName = testInfo.getTestMethod().get().getName(); 630 util.createTable(TableName.valueOf(tableName), familyDesc.getNameAsString()); 631 HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn, 632 bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 633 634 Path bottomOut = new Path(dir, "bottom.out"); 635 Path topOut = new Path(dir, "top.out"); 636 637 BulkLoadHFilesTool.splitStoreFile( 638 util.getAsyncConnection().getRegionLocator(TableName.valueOf(tableName)), 639 util.getConfiguration(), testIn, familyDesc, Bytes.toBytes("ggg"), bottomOut, topOut); 640 641 int rowCount = verifyHFile(bottomOut); 642 rowCount += verifyHFile(topOut); 643 assertEquals(1000, rowCount); 644 } 645 646 private int verifyHFile(Path p) throws IOException { 647 Configuration conf = util.getConfiguration(); 648 HFile.Reader reader = 649 HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf); 650 HFileScanner scanner = reader.getScanner(conf, false, false); 651 scanner.seekTo(); 652 int count = 0; 653 do { 654 count++; 655 } while (scanner.next()); 656 assertTrue(count > 0); 657 reader.close(); 658 return count; 659 } 660 661 private void verifyHFileCreateTimeTS(Path p) throws IOException { 662 Configuration conf = util.getConfiguration(); 663 664 try (HFile.Reader reader = 665 HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf)) { 666 long fileCreateTime = reader.getHFileInfo().getHFileContext().getFileCreateTime(); 667 MatcherAssert.assertThat(fileCreateTime, greaterThan(0L)); 668 } 669 } 670 671 /** 672 * test split storefile with favorite node information 673 */ 674 private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocator, FileSystem fs) 675 throws IOException { 676 Configuration conf = util.getConfiguration(); 677 678 try (HFile.Reader reader = HFile.createReader(fs, p, new CacheConfig(conf), true, conf);) { 679 680 final byte[] firstRowkey = reader.getFirstRowKey().get(); 681 final HRegionLocation hRegionLocation = 682 FutureUtils.get(regionLocator.getRegionLocation(firstRowkey)); 683 684 final String targetHostName = hRegionLocation.getHostname(); 685 686 if (fs instanceof DistributedFileSystem) { 687 String pathStr = p.toUri().getPath(); 688 LocatedBlocks blocks = 689 ((DistributedFileSystem) fs).getClient().getLocatedBlocks(pathStr, 0L); 690 691 boolean isFavoriteNode = false; 692 List<LocatedBlock> locatedBlocks = blocks.getLocatedBlocks(); 693 int index = 0; 694 do { 695 if (index > 0) { 696 assertTrue(isFavoriteNode, "failed use favored nodes"); 697 } 698 isFavoriteNode = false; 699 final LocatedBlock block = locatedBlocks.get(index); 700 701 final DatanodeInfo[] locations = getLocatedBlockLocations(block); 702 for (DatanodeInfo location : locations) { 703 704 final String hostName = location.getHostName(); 705 if ( 706 targetHostName.equals(hostName.equals("127.0.0.1") 707 ? InetAddress.getLocalHost().getHostName() 708 : "127.0.0.1") || targetHostName.equals(hostName) 709 ) { 710 isFavoriteNode = true; 711 break; 712 } 713 } 714 715 index++; 716 } while (index < locatedBlocks.size()); 717 if (index > 0) { 718 assertTrue(isFavoriteNode, "failed use favored nodes"); 719 } 720 721 } 722 723 } 724 } 725 726 private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) { 727 Integer value = map.containsKey(first) ? map.get(first) : 0; 728 map.put(first, value + 1); 729 730 value = map.containsKey(last) ? map.get(last) : 0; 731 map.put(last, value - 1); 732 } 733 734 @Test 735 public void testInferBoundaries() { 736 TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 737 738 /* 739 * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s 740 * u----w Should be inferred as: a-----------------k m-------------q r--------------t 741 * u---------x The output should be (m,r,u) 742 */ 743 744 String first; 745 String last; 746 747 first = "a"; 748 last = "e"; 749 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 750 751 first = "r"; 752 last = "s"; 753 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 754 755 first = "o"; 756 last = "p"; 757 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 758 759 first = "g"; 760 last = "k"; 761 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 762 763 first = "v"; 764 last = "x"; 765 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 766 767 first = "c"; 768 last = "i"; 769 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 770 771 first = "m"; 772 last = "q"; 773 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 774 775 first = "s"; 776 last = "t"; 777 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 778 779 first = "u"; 780 last = "w"; 781 addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last)); 782 783 byte[][] keysArray = BulkLoadHFilesTool.inferBoundaries(map); 784 byte[][] compare = new byte[3][]; 785 compare[0] = Bytes.toBytes("m"); 786 compare[1] = Bytes.toBytes("r"); 787 compare[2] = Bytes.toBytes("u"); 788 789 assertEquals(3, keysArray.length); 790 791 for (int row = 0; row < keysArray.length; row++) { 792 assertArrayEquals(keysArray[row], compare[row]); 793 } 794 } 795 796 @Test 797 public void testLoadTooMayHFiles() throws Exception { 798 Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles"); 799 FileSystem fs = util.getTestFileSystem(); 800 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 801 Path familyDir = new Path(dir, Bytes.toString(FAMILY)); 802 803 byte[] from = Bytes.toBytes("begin"); 804 byte[] to = Bytes.toBytes("end"); 805 for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) { 806 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i), 807 FAMILY, QUALIFIER, from, to, 1000); 808 } 809 810 try { 811 BulkLoadHFiles.create(util.getConfiguration()) 812 .bulkLoad(TableName.valueOf("mytable_testLoadTooMayHFiles"), dir); 813 fail("Bulk loading too many files should fail"); 814 } catch (IOException ie) { 815 assertTrue(ie.getMessage() 816 .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles")); 817 } 818 } 819 820 @Test 821 public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception { 822 Configuration conf = util.getConfiguration(); 823 conf.set(BulkLoadHFiles.CREATE_TABLE_CONF_KEY, "no"); 824 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf); 825 String[] args = { "directory", "nonExistingTable" }; 826 assertThrows(TableNotFoundException.class, () -> loader.run(args)); 827 } 828 829 @Test 830 public void testTableWithCFNameStartWithUnderScore(TestInfo testInfo) throws Exception { 831 Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore"); 832 FileSystem fs = util.getTestFileSystem(); 833 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 834 String family = "_cf"; 835 Path familyDir = new Path(dir, family); 836 837 byte[] from = Bytes.toBytes("begin"); 838 byte[] to = Bytes.toBytes("end"); 839 Configuration conf = util.getConfiguration(); 840 String tableName = testInfo.getTestMethod().get().getName(); 841 try (Table table = util.createTable(TableName.valueOf(tableName), family)) { 842 HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family), 843 QUALIFIER, from, to, 1000); 844 BulkLoadHFiles.create(conf).bulkLoad(table.getName(), dir); 845 assertEquals(1000, countRows(table)); 846 } 847 } 848 849 @Test 850 public void testBulkLoadByFamily(TestInfo testInfo) throws Exception { 851 Path dir = util.getDataTestDirOnTestFS("testBulkLoadByFamily"); 852 FileSystem fs = util.getTestFileSystem(); 853 dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory()); 854 String tableName = testInfo.getTestMethod().get().getName(); 855 String[] families = { "cf1", "cf2", "cf3" }; 856 for (int i = 0; i < families.length; i++) { 857 byte[] from = Bytes.toBytes(i + "begin"); 858 byte[] to = Bytes.toBytes(i + "end"); 859 Path familyDir = new Path(dir, families[i]); 860 HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile"), 861 Bytes.toBytes(families[i]), QUALIFIER, from, to, 1000); 862 } 863 Table table = util.createTable(TableName.valueOf(tableName), families); 864 final AtomicInteger attemptedCalls = new AtomicInteger(); 865 util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true); 866 BulkLoadHFiles loader = new BulkLoadHFilesTool(util.getConfiguration()) { 867 @Override 868 protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad( 869 final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles, 870 final byte[] first, Collection<LoadQueueItem> lqis) { 871 attemptedCalls.incrementAndGet(); 872 return super.tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis); 873 } 874 }; 875 try { 876 loader.bulkLoad(table.getName(), dir); 877 assertEquals(families.length, attemptedCalls.get()); 878 assertEquals(1000 * families.length, HBaseTestingUtil.countRows(table)); 879 } finally { 880 if (null != table) { 881 table.close(); 882 } 883 util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false); 884 } 885 } 886 887 @Test 888 public void testFailIfNeedSplitHFile(TestInfo testInfo) throws IOException { 889 TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName()); 890 Table table = util.createTable(tableName, FAMILY); 891 892 util.loadTable(table, FAMILY); 893 894 FileSystem fs = util.getTestFileSystem(); 895 Path sfPath = new Path(fs.getWorkingDirectory(), new Path(Bytes.toString(FAMILY), "file")); 896 HFileTestUtil.createHFile(util.getConfiguration(), fs, sfPath, FAMILY, QUALIFIER, 897 Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000); 898 899 util.getAdmin().split(tableName); 900 util.waitFor(10000, 1000, () -> util.getAdmin().getRegions(tableName).size() > 1); 901 902 Configuration config = new Configuration(util.getConfiguration()); 903 config.setBoolean(BulkLoadHFilesTool.FAIL_IF_NEED_SPLIT_HFILE, true); 904 BulkLoadHFilesTool tool = new BulkLoadHFilesTool(config); 905 906 String[] args = new String[] { fs.getWorkingDirectory().toString(), tableName.toString() }; 907 assertThrows(IOException.class, () -> tool.run(args)); 908 util.getHBaseCluster().getRegions(tableName) 909 .forEach(r -> assertEquals(1, r.getStore(FAMILY).getStorefiles().size())); 910 } 911}