001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.nio.ByteBuffer; 029import java.util.Collection; 030import java.util.Deque; 031import java.util.List; 032import java.util.Map; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.stream.IntStream; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HRegionLocation; 043import org.apache.hadoop.hbase.MetaTableAccessor; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableExistsException; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.Admin; 048import org.apache.hadoop.hbase.client.ClientServiceCallable; 049import org.apache.hadoop.hbase.client.ClusterConnection; 050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 051import org.apache.hadoop.hbase.client.Connection; 052import org.apache.hadoop.hbase.client.ConnectionFactory; 053import org.apache.hadoop.hbase.client.RegionInfo; 054import org.apache.hadoop.hbase.client.RegionInfoBuilder; 055import org.apache.hadoop.hbase.client.RegionLocator; 056import org.apache.hadoop.hbase.client.Result; 057import org.apache.hadoop.hbase.client.ResultScanner; 058import org.apache.hadoop.hbase.client.Scan; 059import org.apache.hadoop.hbase.client.Table; 060import org.apache.hadoop.hbase.client.TableDescriptor; 061import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 062import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 063import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 064import org.apache.hadoop.hbase.log.HBaseMarkers; 065import org.apache.hadoop.hbase.regionserver.HRegionServer; 066import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; 067import org.apache.hadoop.hbase.testclassification.LargeTests; 068import org.apache.hadoop.hbase.testclassification.MiscTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.FSUtils; 071import org.apache.hadoop.hbase.util.Pair; 072import org.junit.AfterClass; 073import org.junit.BeforeClass; 074import org.junit.ClassRule; 075import org.junit.Rule; 076import org.junit.Test; 077import org.junit.experimental.categories.Category; 078import org.junit.rules.TestName; 079import org.mockito.Mockito; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082 083import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 084import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 085import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 086 087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 090 091/** 092 * Test cases for the atomic load error handling of the bulk load functionality. 093 */ 094@Category({ MiscTests.class, LargeTests.class }) 095public class TestLoadIncrementalHFilesSplitRecovery { 096 097 @ClassRule 098 public static final HBaseClassTestRule CLASS_RULE = 099 HBaseClassTestRule.forClass(TestLoadIncrementalHFilesSplitRecovery.class); 100 101 private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class); 102 103 static HBaseTestingUtility util; 104 // used by secure subclass 105 static boolean useSecure = false; 106 107 final static int NUM_CFS = 10; 108 final static byte[] QUAL = Bytes.toBytes("qual"); 109 final static int ROWCOUNT = 100; 110 111 private final static byte[][] families = new byte[NUM_CFS][]; 112 113 @Rule 114 public TestName name = new TestName(); 115 116 static { 117 for (int i = 0; i < NUM_CFS; i++) { 118 families[i] = Bytes.toBytes(family(i)); 119 } 120 } 121 122 static byte[] rowkey(int i) { 123 return Bytes.toBytes(String.format("row_%08d", i)); 124 } 125 126 static String family(int i) { 127 return String.format("family_%04d", i); 128 } 129 130 static byte[] value(int i) { 131 return Bytes.toBytes(String.format("%010d", i)); 132 } 133 134 public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException { 135 byte[] val = value(value); 136 for (int i = 0; i < NUM_CFS; i++) { 137 Path testIn = new Path(dir, family(i)); 138 139 TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), 140 Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); 141 } 142 } 143 144 private TableDescriptor createTableDesc(TableName name, int cfs) { 145 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 146 IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i))) 147 .forEachOrdered(builder::setColumnFamily); 148 return builder.build(); 149 } 150 151 /** 152 * Creates a table with given table name and specified number of column families if the table does 153 * not already exist. 154 */ 155 private void setupTable(final Connection connection, TableName table, int cfs) 156 throws IOException { 157 try { 158 LOG.info("Creating table " + table); 159 try (Admin admin = connection.getAdmin()) { 160 admin.createTable(createTableDesc(table, cfs)); 161 } 162 } catch (TableExistsException tee) { 163 LOG.info("Table " + table + " already exists"); 164 } 165 } 166 167 /** 168 * Creates a table with given table name,specified number of column families<br> 169 * and splitkeys if the table does not already exist. 170 * @param table 171 * @param cfs 172 * @param SPLIT_KEYS 173 */ 174 private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS) 175 throws IOException { 176 try { 177 LOG.info("Creating table " + table); 178 util.createTable(createTableDesc(table, cfs), SPLIT_KEYS); 179 } catch (TableExistsException tee) { 180 LOG.info("Table " + table + " already exists"); 181 } 182 } 183 184 private Path buildBulkFiles(TableName table, int value) throws Exception { 185 Path dir = util.getDataTestDirOnTestFS(table.getNameAsString()); 186 Path bulk1 = new Path(dir, table.getNameAsString() + value); 187 FileSystem fs = util.getTestFileSystem(); 188 buildHFiles(fs, bulk1, value); 189 return bulk1; 190 } 191 192 /** 193 * Populate table with known values. 194 */ 195 private void populateTable(final Connection connection, TableName table, int value) 196 throws Exception { 197 // create HFiles for different column families 198 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); 199 Path bulk1 = buildBulkFiles(table, value); 200 try (Table t = connection.getTable(table); 201 RegionLocator locator = connection.getRegionLocator(table); 202 Admin admin = connection.getAdmin()) { 203 lih.doBulkLoad(bulk1, admin, t, locator); 204 } 205 } 206 207 /** 208 * Split the known table in half. (this is hard coded for this test suite) 209 */ 210 private void forceSplit(TableName table) { 211 try { 212 // need to call regions server to by synchronous but isn't visible. 213 HRegionServer hrs = util.getRSForFirstRegionInTable(table); 214 215 for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { 216 if (hri.getTable().equals(table)) { 217 util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2)); 218 // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); 219 } 220 } 221 222 // verify that split completed. 223 int regions; 224 do { 225 regions = 0; 226 for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { 227 if (hri.getTable().equals(table)) { 228 regions++; 229 } 230 } 231 if (regions != 2) { 232 LOG.info("Taking some time to complete split..."); 233 Thread.sleep(250); 234 } 235 } while (regions != 2); 236 } catch (IOException e) { 237 e.printStackTrace(); 238 } catch (InterruptedException e) { 239 e.printStackTrace(); 240 } 241 } 242 243 @BeforeClass 244 public static void setupCluster() throws Exception { 245 util = new HBaseTestingUtility(); 246 util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 247 util.startMiniCluster(1); 248 } 249 250 @AfterClass 251 public static void teardownCluster() throws Exception { 252 util.shutdownMiniCluster(); 253 } 254 255 /** 256 * Checks that all columns have the expected value and that there is the expected number of rows. 257 * @throws IOException 258 */ 259 void assertExpectedTable(TableName table, int count, int value) throws IOException { 260 TableDescriptor htd = util.getAdmin().getDescriptor(table); 261 assertNotNull(htd); 262 try (Table t = util.getConnection().getTable(table); 263 ResultScanner sr = t.getScanner(new Scan())) { 264 int i = 0; 265 for (Result r; (r = sr.next()) != null;) { 266 r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) 267 .forEach(v -> assertArrayEquals(value(value), v)); 268 i++; 269 } 270 assertEquals(count, i); 271 } catch (IOException e) { 272 fail("Failed due to exception"); 273 } 274 } 275 276 /** 277 * Test that shows that exception thrown from the RS side will result in an exception on the 278 * LIHFile client. 279 */ 280 @Test(expected = IOException.class) 281 public void testBulkLoadPhaseFailure() throws Exception { 282 final TableName table = TableName.valueOf(name.getMethodName()); 283 final AtomicInteger attmptedCalls = new AtomicInteger(); 284 final AtomicInteger failedCalls = new AtomicInteger(); 285 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 286 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 287 setupTable(connection, table, 10); 288 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 289 @Override 290 protected List<LoadQueueItem> tryAtomicRegionLoad( 291 ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first, 292 Collection<LoadQueueItem> lqis) throws IOException { 293 int i = attmptedCalls.incrementAndGet(); 294 if (i == 1) { 295 Connection errConn; 296 try { 297 errConn = getMockedConnection(util.getConfiguration()); 298 serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true); 299 } catch (Exception e) { 300 LOG.error(HBaseMarkers.FATAL, "mocking cruft, should never happen", e); 301 throw new RuntimeException("mocking cruft, should never happen"); 302 } 303 failedCalls.incrementAndGet(); 304 return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis); 305 } 306 307 return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis); 308 } 309 }; 310 try { 311 // create HFiles for different column families 312 Path dir = buildBulkFiles(table, 1); 313 try (Table t = connection.getTable(table); 314 RegionLocator locator = connection.getRegionLocator(table); 315 Admin admin = connection.getAdmin()) { 316 lih.doBulkLoad(dir, admin, t, locator); 317 } 318 } finally { 319 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 320 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 321 } 322 fail("doBulkLoad should have thrown an exception"); 323 } 324 } 325 326 /** 327 * Test that shows that exception thrown from the RS side will result in the expected number of 328 * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when 329 * ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set 330 */ 331 @Test 332 public void testRetryOnIOException() throws Exception { 333 final TableName table = TableName.valueOf(name.getMethodName()); 334 final AtomicInteger calls = new AtomicInteger(0); 335 final Connection conn = ConnectionFactory.createConnection(util.getConfiguration()); 336 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 337 util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true); 338 final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 339 @Override 340 protected List<LoadQueueItem> tryAtomicRegionLoad( 341 ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first, 342 Collection<LoadQueueItem> lqis) throws IOException { 343 if (calls.get() < util.getConfiguration().getInt( 344 HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { 345 ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn, 346 tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(), 347 HConstants.PRIORITY_UNSET) { 348 @Override 349 public byte[] rpcCall() throws Exception { 350 throw new IOException("Error calling something on RegionServer"); 351 } 352 }; 353 calls.getAndIncrement(); 354 return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis); 355 } else { 356 return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis); 357 } 358 } 359 }; 360 setupTable(conn, table, 10); 361 Path dir = buildBulkFiles(table, 1); 362 lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table)); 363 assertEquals(calls.get(), 2); 364 util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false); 365 } 366 367 private ClusterConnection getMockedConnection(final Configuration conf) 368 throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException { 369 ClusterConnection c = Mockito.mock(ClusterConnection.class); 370 Mockito.when(c.getConfiguration()).thenReturn(conf); 371 Mockito.doNothing().when(c).close(); 372 // Make it so we return a particular location when asked. 373 final HRegionLocation loc = new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, 374 ServerName.valueOf("example.org", 1234, 0)); 375 Mockito.when( 376 c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean())) 377 .thenReturn(loc); 378 Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc); 379 ClientProtos.ClientService.BlockingInterface hri = 380 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); 381 Mockito 382 .when( 383 hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any())) 384 .thenThrow(new ServiceException(new IOException("injecting bulk load error"))); 385 Mockito.when(c.getClient(Mockito.any())).thenReturn(hri); 386 return c; 387 } 388 389 /** 390 * This test exercises the path where there is a split after initial validation but before the 391 * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a 392 * split just before the atomic region load. 393 */ 394 @Test 395 public void testSplitWhileBulkLoadPhase() throws Exception { 396 final TableName table = TableName.valueOf(name.getMethodName()); 397 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 398 setupTable(connection, table, 10); 399 populateTable(connection, table, 1); 400 assertExpectedTable(table, ROWCOUNT, 1); 401 402 // Now let's cause trouble. This will occur after checks and cause bulk 403 // files to fail when attempt to atomically import. This is recoverable. 404 final AtomicInteger attemptedCalls = new AtomicInteger(); 405 LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) { 406 @Override 407 protected void bulkLoadPhase(final Table htable, final Connection conn, 408 ExecutorService pool, Deque<LoadQueueItem> queue, 409 final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile, 410 Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 411 int i = attemptedCalls.incrementAndGet(); 412 if (i == 1) { 413 // On first attempt force a split. 414 forceSplit(table); 415 } 416 super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap); 417 } 418 }; 419 420 // create HFiles for different column families 421 try (Table t = connection.getTable(table); 422 RegionLocator locator = connection.getRegionLocator(table); 423 Admin admin = connection.getAdmin()) { 424 Path bulk = buildBulkFiles(table, 2); 425 lih2.doBulkLoad(bulk, admin, t, locator); 426 } 427 428 // check that data was loaded 429 // The three expected attempts are 1) failure because need to split, 2) 430 // load of split top 3) load of split bottom 431 assertEquals(3, attemptedCalls.get()); 432 assertExpectedTable(table, ROWCOUNT, 2); 433 } 434 } 435 436 /** 437 * This test splits a table and attempts to bulk load. The bulk import files should be split 438 * before atomically importing. 439 */ 440 @Test 441 public void testGroupOrSplitPresplit() throws Exception { 442 final TableName table = TableName.valueOf(name.getMethodName()); 443 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 444 setupTable(connection, table, 10); 445 populateTable(connection, table, 1); 446 assertExpectedTable(connection, table, ROWCOUNT, 1); 447 forceSplit(table); 448 449 final AtomicInteger countedLqis = new AtomicInteger(); 450 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 451 @Override 452 protected Pair<List<LoadQueueItem>, String> groupOrSplit( 453 Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, 454 final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 455 Pair<List<LoadQueueItem>, String> lqis = 456 super.groupOrSplit(regionGroups, item, htable, startEndKeys); 457 if (lqis != null && lqis.getFirst() != null) { 458 countedLqis.addAndGet(lqis.getFirst().size()); 459 } 460 return lqis; 461 } 462 }; 463 464 // create HFiles for different column families 465 Path bulk = buildBulkFiles(table, 2); 466 try (Table t = connection.getTable(table); 467 RegionLocator locator = connection.getRegionLocator(table); 468 Admin admin = connection.getAdmin()) { 469 lih.doBulkLoad(bulk, admin, t, locator); 470 } 471 assertExpectedTable(connection, table, ROWCOUNT, 2); 472 assertEquals(20, countedLqis.get()); 473 } 474 } 475 476 /** 477 * This test creates a table with many small regions. The bulk load files would be splitted 478 * multiple times before all of them can be loaded successfully. 479 */ 480 @Test 481 public void testSplitTmpFileCleanUp() throws Exception { 482 final TableName table = TableName.valueOf(name.getMethodName()); 483 byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), 484 Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"), 485 Bytes.toBytes("row_00000050") }; 486 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 487 setupTableWithSplitkeys(table, 10, SPLIT_KEYS); 488 489 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); 490 491 // create HFiles 492 Path bulk = buildBulkFiles(table, 2); 493 try (Table t = connection.getTable(table); 494 RegionLocator locator = connection.getRegionLocator(table); 495 Admin admin = connection.getAdmin()) { 496 lih.doBulkLoad(bulk, admin, t, locator); 497 } 498 // family path 499 Path tmpPath = new Path(bulk, family(0)); 500 // TMP_DIR under family path 501 tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR); 502 FileSystem fs = bulk.getFileSystem(util.getConfiguration()); 503 // HFiles have been splitted, there is TMP_DIR 504 assertTrue(fs.exists(tmpPath)); 505 // TMP_DIR should have been cleaned-up 506 assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.", 507 FSUtils.listStatus(fs, tmpPath)); 508 assertExpectedTable(connection, table, ROWCOUNT, 2); 509 } 510 } 511 512 /** 513 * This simulates an remote exception which should cause LIHF to exit with an exception. 514 */ 515 @Test(expected = IOException.class) 516 public void testGroupOrSplitFailure() throws Exception { 517 final TableName tableName = TableName.valueOf(name.getMethodName()); 518 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 519 setupTable(connection, tableName, 10); 520 521 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 522 int i = 0; 523 524 @Override 525 protected Pair<List<LoadQueueItem>, String> groupOrSplit( 526 Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, 527 final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 528 i++; 529 530 if (i == 5) { 531 throw new IOException("failure"); 532 } 533 return super.groupOrSplit(regionGroups, item, table, startEndKeys); 534 } 535 }; 536 537 // create HFiles for different column families 538 Path dir = buildBulkFiles(tableName, 1); 539 try (Table t = connection.getTable(tableName); 540 RegionLocator locator = connection.getRegionLocator(tableName); 541 Admin admin = connection.getAdmin()) { 542 lih.doBulkLoad(dir, admin, t, locator); 543 } 544 } 545 546 fail("doBulkLoad should have thrown an exception"); 547 } 548 549 @Test 550 public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception { 551 final TableName tableName = TableName.valueOf(name.getMethodName()); 552 byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") }; 553 // Share connection. We were failing to find the table with our new reverse scan because it 554 // looks for first region, not any region -- that is how it works now. The below removes first 555 // region in test. Was reliant on the Connection caching having first region. 556 Connection connection = ConnectionFactory.createConnection(util.getConfiguration()); 557 Table table = connection.getTable(tableName); 558 559 setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS); 560 Path dir = buildBulkFiles(tableName, 2); 561 562 final AtomicInteger countedLqis = new AtomicInteger(); 563 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { 564 565 @Override 566 protected Pair<List<LoadQueueItem>, String> groupOrSplit( 567 Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, 568 final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 569 Pair<List<LoadQueueItem>, String> lqis = 570 super.groupOrSplit(regionGroups, item, htable, startEndKeys); 571 if (lqis != null && lqis.getFirst() != null) { 572 countedLqis.addAndGet(lqis.getFirst().size()); 573 } 574 return lqis; 575 } 576 }; 577 578 // do bulkload when there is no region hole in hbase:meta. 579 try (Table t = connection.getTable(tableName); 580 RegionLocator locator = connection.getRegionLocator(tableName); 581 Admin admin = connection.getAdmin()) { 582 loader.doBulkLoad(dir, admin, t, locator); 583 } catch (Exception e) { 584 LOG.error("exeception=", e); 585 } 586 // check if all the data are loaded into the table. 587 this.assertExpectedTable(tableName, ROWCOUNT, 2); 588 589 dir = buildBulkFiles(tableName, 3); 590 591 // Mess it up by leaving a hole in the hbase:meta 592 List<RegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); 593 for (RegionInfo regionInfo : regionInfos) { 594 if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 595 MetaTableAccessor.deleteRegionInfo(connection, regionInfo); 596 break; 597 } 598 } 599 600 try (Table t = connection.getTable(tableName); 601 RegionLocator locator = connection.getRegionLocator(tableName); 602 Admin admin = connection.getAdmin()) { 603 loader.doBulkLoad(dir, admin, t, locator); 604 } catch (Exception e) { 605 LOG.error("exception=", e); 606 assertTrue("IOException expected", e instanceof IOException); 607 } 608 609 table.close(); 610 611 // Make sure at least the one region that still exists can be found. 612 regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); 613 assertTrue(regionInfos.size() >= 1); 614 615 this.assertExpectedTable(connection, tableName, ROWCOUNT, 2); 616 connection.close(); 617 } 618 619 /** 620 * Checks that all columns have the expected value and that there is the expected number of rows. 621 * @throws IOException 622 */ 623 void assertExpectedTable(final Connection connection, TableName table, int count, int value) 624 throws IOException { 625 TableDescriptor htd = util.getAdmin().getDescriptor(table); 626 assertNotNull(htd); 627 try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) { 628 int i = 0; 629 for (Result r; (r = sr.next()) != null;) { 630 r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) 631 .forEach(v -> assertArrayEquals(value(value), v)); 632 i++; 633 } 634 assertEquals(count, i); 635 } catch (IOException e) { 636 fail("Failed due to exception"); 637 } 638 } 639}