001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.nio.ByteBuffer; 029import java.util.Collection; 030import java.util.Deque; 031import java.util.List; 032import java.util.Map; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.stream.IntStream; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HRegionLocation; 043import org.apache.hadoop.hbase.MetaTableAccessor; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableExistsException; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.Admin; 048import org.apache.hadoop.hbase.client.ClientServiceCallable; 049import org.apache.hadoop.hbase.client.ClusterConnection; 050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 051import org.apache.hadoop.hbase.client.Connection; 052import org.apache.hadoop.hbase.client.ConnectionFactory; 053import org.apache.hadoop.hbase.client.RegionInfo; 054import org.apache.hadoop.hbase.client.RegionInfoBuilder; 055import org.apache.hadoop.hbase.client.RegionLocator; 056import org.apache.hadoop.hbase.client.Result; 057import org.apache.hadoop.hbase.client.ResultScanner; 058import org.apache.hadoop.hbase.client.Scan; 059import org.apache.hadoop.hbase.client.Table; 060import org.apache.hadoop.hbase.client.TableDescriptor; 061import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 062import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 063import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 064import org.apache.hadoop.hbase.log.HBaseMarkers; 065import org.apache.hadoop.hbase.regionserver.HRegionServer; 066import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; 067import org.apache.hadoop.hbase.testclassification.LargeTests; 068import org.apache.hadoop.hbase.testclassification.MiscTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.CommonFSUtils; 071import org.apache.hadoop.hbase.util.Pair; 072import org.junit.AfterClass; 073import org.junit.BeforeClass; 074import org.junit.ClassRule; 075import org.junit.Rule; 076import org.junit.Test; 077import org.junit.experimental.categories.Category; 078import org.junit.rules.TestName; 079import org.mockito.Mockito; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082 083import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 084import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 085import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 086 087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 090 091/** 092 * Test cases for the atomic load error handling of the bulk load functionality. 093 */ 094@Category({ MiscTests.class, LargeTests.class }) 095public class TestLoadIncrementalHFilesSplitRecovery { 096 097 @ClassRule 098 public static final HBaseClassTestRule CLASS_RULE = 099 HBaseClassTestRule.forClass(TestLoadIncrementalHFilesSplitRecovery.class); 100 101 private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class); 102 103 static HBaseTestingUtility util; 104 // used by secure subclass 105 static boolean useSecure = false; 106 107 final static int NUM_CFS = 10; 108 final static byte[] QUAL = Bytes.toBytes("qual"); 109 final static int ROWCOUNT = 100; 110 111 private final static byte[][] families = new byte[NUM_CFS][]; 112 113 @Rule 114 public TestName name = new TestName(); 115 116 static { 117 for (int i = 0; i < NUM_CFS; i++) { 118 families[i] = Bytes.toBytes(family(i)); 119 } 120 } 121 122 static byte[] rowkey(int i) { 123 return Bytes.toBytes(String.format("row_%08d", i)); 124 } 125 126 static String family(int i) { 127 return String.format("family_%04d", i); 128 } 129 130 static byte[] value(int i) { 131 return Bytes.toBytes(String.format("%010d", i)); 132 } 133 134 public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException { 135 byte[] val = value(value); 136 for (int i = 0; i < NUM_CFS; i++) { 137 Path testIn = new Path(dir, family(i)); 138 139 TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), 140 Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); 141 } 142 } 143 144 private TableDescriptor createTableDesc(TableName name, int cfs) { 145 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 146 IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i))) 147 .forEachOrdered(builder::setColumnFamily); 148 return builder.build(); 149 } 150 151 /** 152 * Creates a table with given table name and specified number of column families if the table does 153 * not already exist. 154 */ 155 private void setupTable(final Connection connection, TableName table, int cfs) 156 throws IOException { 157 try { 158 LOG.info("Creating table " + table); 159 try (Admin admin = connection.getAdmin()) { 160 admin.createTable(createTableDesc(table, cfs)); 161 } 162 } catch (TableExistsException tee) { 163 LOG.info("Table " + table + " already exists"); 164 } 165 } 166 167 /** 168 * Creates a table with given table name,specified number of column families<br> 169 * and splitkeys if the table does not already exist. 170 * @param table 171 * @param cfs 172 * @param SPLIT_KEYS 173 */ 174 private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS) 175 throws IOException { 176 try { 177 LOG.info("Creating table " + table); 178 util.createTable(createTableDesc(table, cfs), SPLIT_KEYS); 179 } catch (TableExistsException tee) { 180 LOG.info("Table " + table + " already exists"); 181 } 182 } 183 184 private Path buildBulkFiles(TableName table, int value) throws Exception { 185 Path dir = util.getDataTestDirOnTestFS(table.getNameAsString()); 186 Path bulk1 = new Path(dir, table.getNameAsString() + value); 187 FileSystem fs = util.getTestFileSystem(); 188 buildHFiles(fs, bulk1, value); 189 return bulk1; 190 } 191 192 /** 193 * Populate table with known values. 194 */ 195 private void populateTable(final Connection connection, TableName table, int value) 196 throws Exception { 197 // create HFiles for different column families 198 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); 199 Path bulk1 = buildBulkFiles(table, value); 200 try (Table t = connection.getTable(table); 201 RegionLocator locator = connection.getRegionLocator(table); 202 Admin admin = connection.getAdmin()) { 203 lih.doBulkLoad(bulk1, admin, t, locator); 204 } 205 } 206 207 /** 208 * Split the known table in half. (this is hard coded for this test suite) 209 */ 210 private void forceSplit(TableName table) { 211 try { 212 // need to call regions server to by synchronous but isn't visible. 213 HRegionServer hrs = util.getRSForFirstRegionInTable(table); 214 215 for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { 216 if (hri.getTable().equals(table)) { 217 util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2)); 218 // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); 219 } 220 } 221 222 // verify that split completed. 223 int regions; 224 do { 225 regions = 0; 226 for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { 227 if (hri.getTable().equals(table)) { 228 regions++; 229 } 230 } 231 if (regions != 2) { 232 LOG.info("Taking some time to complete split..."); 233 Thread.sleep(250); 234 } 235 } while (regions != 2); 236 } catch (IOException e) { 237 e.printStackTrace(); 238 } catch (InterruptedException e) { 239 e.printStackTrace(); 240 } 241 } 242 243 @BeforeClass 244 public static void setupCluster() throws Exception { 245 util = new HBaseTestingUtility(); 246 util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 247 util.startMiniCluster(1); 248 } 249 250 @AfterClass 251 public static void teardownCluster() throws Exception { 252 util.shutdownMiniCluster(); 253 } 254 255 /** 256 * Checks that all columns have the expected value and that there is the expected number of rows. 257 * @throws IOException 258 */ 259 void assertExpectedTable(TableName table, int count, int value) throws IOException { 260 TableDescriptor htd = util.getAdmin().getDescriptor(table); 261 assertNotNull(htd); 262 try (Table t = util.getConnection().getTable(table); 263 ResultScanner sr = t.getScanner(new Scan())) { 264 int i = 0; 265 for (Result r; (r = sr.next()) != null;) { 266 r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) 267 .forEach(v -> assertArrayEquals(value(value), v)); 268 i++; 269 } 270 assertEquals(count, i); 271 } catch (IOException e) { 272 fail("Failed due to exception"); 273 } 274 } 275 276 /** 277 * Test that shows that exception thrown from the RS side will result in an exception on the 278 * LIHFile client. 279 */ 280 @Test(expected = IOException.class) 281 public void testBulkLoadPhaseFailure() throws Exception { 282 final TableName table = TableName.valueOf(name.getMethodName()); 283 final AtomicInteger attmptedCalls = new AtomicInteger(); 284 final AtomicInteger failedCalls = new AtomicInteger(); 285 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 286 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 287 setupTable(connection, table, 10); 288 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 289 @Override 290 protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection, 291 TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis, 292 boolean copyFile) throws IOException { 293 int i = attmptedCalls.incrementAndGet(); 294 if (i == 1) { 295 Connection errConn; 296 try { 297 errConn = getMockedConnection(util.getConfiguration()); 298 } catch (Exception e) { 299 LOG.error(HBaseMarkers.FATAL, "mocking cruft, should never happen", e); 300 throw new RuntimeException("mocking cruft, should never happen"); 301 } 302 failedCalls.incrementAndGet(); 303 return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, true); 304 } 305 306 return super.tryAtomicRegionLoad(connection, tableName, first, lqis, true); 307 } 308 }; 309 try { 310 // create HFiles for different column families 311 Path dir = buildBulkFiles(table, 1); 312 try (Table t = connection.getTable(table); 313 RegionLocator locator = connection.getRegionLocator(table); 314 Admin admin = connection.getAdmin()) { 315 lih.doBulkLoad(dir, admin, t, locator); 316 } 317 } finally { 318 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 319 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 320 } 321 fail("doBulkLoad should have thrown an exception"); 322 } 323 } 324 325 /** 326 * Test that shows that exception thrown from the RS side will result in the expected number of 327 * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when 328 * ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set 329 */ 330 @Test 331 public void testRetryOnIOException() throws Exception { 332 final TableName table = TableName.valueOf(name.getMethodName()); 333 final AtomicInteger calls = new AtomicInteger(0); 334 final Connection conn = ConnectionFactory.createConnection(util.getConfiguration()); 335 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 336 util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true); 337 final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 338 @Override 339 protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn, 340 TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) { 341 if (calls.get() < util.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 342 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { 343 calls.getAndIncrement(); 344 return new ClientServiceCallable<byte[]>(conn, tableName, first, 345 new RpcControllerFactory(util.getConfiguration()).newController(), 346 HConstants.PRIORITY_UNSET) { 347 @Override 348 public byte[] rpcCall() throws Exception { 349 throw new IOException("Error calling something on RegionServer"); 350 } 351 }; 352 } else { 353 return super.buildClientServiceCallable(conn, tableName, first, lqis, true); 354 } 355 } 356 }; 357 setupTable(conn, table, 10); 358 Path dir = buildBulkFiles(table, 1); 359 lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table)); 360 assertEquals(calls.get(), 2); 361 util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false); 362 } 363 364 private ClusterConnection getMockedConnection(final Configuration conf) 365 throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException { 366 ClusterConnection c = Mockito.mock(ClusterConnection.class); 367 Mockito.when(c.getConfiguration()).thenReturn(conf); 368 Mockito.doNothing().when(c).close(); 369 // Make it so we return a particular location when asked. 370 final HRegionLocation loc = new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, 371 ServerName.valueOf("example.org", 1234, 0)); 372 Mockito.when( 373 c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean())) 374 .thenReturn(loc); 375 Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc); 376 ClientProtos.ClientService.BlockingInterface hri = 377 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); 378 Mockito 379 .when( 380 hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any())) 381 .thenThrow(new ServiceException(new IOException("injecting bulk load error"))); 382 Mockito.when(c.getClient(Mockito.any())).thenReturn(hri); 383 return c; 384 } 385 386 /** 387 * This test exercises the path where there is a split after initial validation but before the 388 * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a 389 * split just before the atomic region load. 390 */ 391 @Test 392 public void testSplitWhileBulkLoadPhase() throws Exception { 393 final TableName table = TableName.valueOf(name.getMethodName()); 394 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 395 setupTable(connection, table, 10); 396 populateTable(connection, table, 1); 397 assertExpectedTable(table, ROWCOUNT, 1); 398 399 // Now let's cause trouble. This will occur after checks and cause bulk 400 // files to fail when attempt to atomically import. This is recoverable. 401 final AtomicInteger attemptedCalls = new AtomicInteger(); 402 LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) { 403 @Override 404 protected void bulkLoadPhase(final Table htable, final Connection conn, 405 ExecutorService pool, Deque<LoadQueueItem> queue, 406 final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile, 407 Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 408 int i = attemptedCalls.incrementAndGet(); 409 if (i == 1) { 410 // On first attempt force a split. 411 forceSplit(table); 412 } 413 super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap); 414 } 415 }; 416 417 // create HFiles for different column families 418 try (Table t = connection.getTable(table); 419 RegionLocator locator = connection.getRegionLocator(table); 420 Admin admin = connection.getAdmin()) { 421 Path bulk = buildBulkFiles(table, 2); 422 lih2.doBulkLoad(bulk, admin, t, locator); 423 } 424 425 // check that data was loaded 426 // The three expected attempts are 1) failure because need to split, 2) 427 // load of split top 3) load of split bottom 428 assertEquals(3, attemptedCalls.get()); 429 assertExpectedTable(table, ROWCOUNT, 2); 430 } 431 } 432 433 /** 434 * This test splits a table and attempts to bulk load. The bulk import files should be split 435 * before atomically importing. 436 */ 437 @Test 438 public void testGroupOrSplitPresplit() throws Exception { 439 final TableName table = TableName.valueOf(name.getMethodName()); 440 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 441 setupTable(connection, table, 10); 442 populateTable(connection, table, 1); 443 assertExpectedTable(connection, table, ROWCOUNT, 1); 444 forceSplit(table); 445 446 final AtomicInteger countedLqis = new AtomicInteger(); 447 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 448 @Override 449 protected Pair<List<LoadQueueItem>, String> groupOrSplit( 450 Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, 451 final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 452 Pair<List<LoadQueueItem>, String> lqis = 453 super.groupOrSplit(regionGroups, item, htable, startEndKeys); 454 if (lqis != null && lqis.getFirst() != null) { 455 countedLqis.addAndGet(lqis.getFirst().size()); 456 } 457 return lqis; 458 } 459 }; 460 461 // create HFiles for different column families 462 Path bulk = buildBulkFiles(table, 2); 463 try (Table t = connection.getTable(table); 464 RegionLocator locator = connection.getRegionLocator(table); 465 Admin admin = connection.getAdmin()) { 466 lih.doBulkLoad(bulk, admin, t, locator); 467 } 468 assertExpectedTable(connection, table, ROWCOUNT, 2); 469 assertEquals(20, countedLqis.get()); 470 } 471 } 472 473 @Test 474 public void testCorrectSplitPoint() throws Exception { 475 final TableName table = TableName.valueOf(name.getMethodName()); 476 byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), 477 Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"), 478 Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"), 479 Bytes.toBytes("row_00000070") }; 480 setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS); 481 482 final AtomicInteger bulkloadRpcTimes = new AtomicInteger(); 483 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { 484 485 @Override 486 protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool, 487 Deque<LoadIncrementalHFiles.LoadQueueItem> queue, 488 Multimap<ByteBuffer, LoadIncrementalHFiles.LoadQueueItem> regionGroups, boolean copyFile, 489 Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 490 bulkloadRpcTimes.addAndGet(1); 491 super.bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, item2RegionMap); 492 } 493 }; 494 495 Path dir = buildBulkFiles(table, 1); 496 loader.bulkLoad(table, dir); 497 // before HBASE-25281 we need invoke bulkload rpc 8 times 498 assertEquals(4, bulkloadRpcTimes.get()); 499 } 500 501 /** 502 * This test creates a table with many small regions. The bulk load files would be splitted 503 * multiple times before all of them can be loaded successfully. 504 */ 505 @Test 506 public void testSplitTmpFileCleanUp() throws Exception { 507 final TableName table = TableName.valueOf(name.getMethodName()); 508 byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), 509 Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"), 510 Bytes.toBytes("row_00000050") }; 511 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 512 setupTableWithSplitkeys(table, 10, SPLIT_KEYS); 513 514 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); 515 516 // create HFiles 517 Path bulk = buildBulkFiles(table, 2); 518 try (Table t = connection.getTable(table); 519 RegionLocator locator = connection.getRegionLocator(table); 520 Admin admin = connection.getAdmin()) { 521 lih.doBulkLoad(bulk, admin, t, locator); 522 } 523 // family path 524 Path tmpPath = new Path(bulk, family(0)); 525 // TMP_DIR under family path 526 tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR); 527 FileSystem fs = bulk.getFileSystem(util.getConfiguration()); 528 // HFiles have been splitted, there is TMP_DIR 529 assertTrue(fs.exists(tmpPath)); 530 // TMP_DIR should have been cleaned-up 531 assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.", 532 CommonFSUtils.listStatus(fs, tmpPath)); 533 assertExpectedTable(connection, table, ROWCOUNT, 2); 534 } 535 } 536 537 /** 538 * This simulates an remote exception which should cause LIHF to exit with an exception. 539 */ 540 @Test(expected = IOException.class) 541 public void testGroupOrSplitFailure() throws Exception { 542 final TableName tableName = TableName.valueOf(name.getMethodName()); 543 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 544 setupTable(connection, tableName, 10); 545 546 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 547 int i = 0; 548 549 @Override 550 protected Pair<List<LoadQueueItem>, String> groupOrSplit( 551 Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, 552 final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 553 i++; 554 555 if (i == 5) { 556 throw new IOException("failure"); 557 } 558 return super.groupOrSplit(regionGroups, item, table, startEndKeys); 559 } 560 }; 561 562 // create HFiles for different column families 563 Path dir = buildBulkFiles(tableName, 1); 564 try (Table t = connection.getTable(tableName); 565 RegionLocator locator = connection.getRegionLocator(tableName); 566 Admin admin = connection.getAdmin()) { 567 lih.doBulkLoad(dir, admin, t, locator); 568 } 569 } 570 571 fail("doBulkLoad should have thrown an exception"); 572 } 573 574 @Test 575 public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception { 576 final TableName tableName = TableName.valueOf(name.getMethodName()); 577 byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") }; 578 // Share connection. We were failing to find the table with our new reverse scan because it 579 // looks for first region, not any region -- that is how it works now. The below removes first 580 // region in test. Was reliant on the Connection caching having first region. 581 Connection connection = ConnectionFactory.createConnection(util.getConfiguration()); 582 Table table = connection.getTable(tableName); 583 584 setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS); 585 Path dir = buildBulkFiles(tableName, 2); 586 587 final AtomicInteger countedLqis = new AtomicInteger(); 588 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { 589 590 @Override 591 protected Pair<List<LoadQueueItem>, String> groupOrSplit( 592 Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, 593 final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 594 Pair<List<LoadQueueItem>, String> lqis = 595 super.groupOrSplit(regionGroups, item, htable, startEndKeys); 596 if (lqis != null && lqis.getFirst() != null) { 597 countedLqis.addAndGet(lqis.getFirst().size()); 598 } 599 return lqis; 600 } 601 }; 602 603 // do bulkload when there is no region hole in hbase:meta. 604 try (Table t = connection.getTable(tableName); 605 RegionLocator locator = connection.getRegionLocator(tableName); 606 Admin admin = connection.getAdmin()) { 607 loader.doBulkLoad(dir, admin, t, locator); 608 } catch (Exception e) { 609 LOG.error("exeception=", e); 610 } 611 // check if all the data are loaded into the table. 612 this.assertExpectedTable(tableName, ROWCOUNT, 2); 613 614 dir = buildBulkFiles(tableName, 3); 615 616 // Mess it up by leaving a hole in the hbase:meta 617 List<RegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); 618 for (RegionInfo regionInfo : regionInfos) { 619 if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 620 MetaTableAccessor.deleteRegionInfo(connection, regionInfo); 621 break; 622 } 623 } 624 625 try (Table t = connection.getTable(tableName); 626 RegionLocator locator = connection.getRegionLocator(tableName); 627 Admin admin = connection.getAdmin()) { 628 loader.doBulkLoad(dir, admin, t, locator); 629 } catch (Exception e) { 630 LOG.error("exception=", e); 631 assertTrue("IOException expected", e instanceof IOException); 632 } 633 634 table.close(); 635 636 // Make sure at least the one region that still exists can be found. 637 regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); 638 assertTrue(regionInfos.size() >= 1); 639 640 this.assertExpectedTable(connection, tableName, ROWCOUNT, 2); 641 connection.close(); 642 } 643 644 /** 645 * Checks that all columns have the expected value and that there is the expected number of rows. 646 * @throws IOException 647 */ 648 void assertExpectedTable(final Connection connection, TableName table, int count, int value) 649 throws IOException { 650 TableDescriptor htd = util.getAdmin().getDescriptor(table); 651 assertNotNull(htd); 652 try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) { 653 int i = 0; 654 for (Result r; (r = sr.next()) != null;) { 655 r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) 656 .forEach(v -> assertArrayEquals(value(value), v)); 657 i++; 658 } 659 assertEquals(count, i); 660 } catch (IOException e) { 661 fail("Failed due to exception"); 662 } 663 } 664}