001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.junit.Assert.assertArrayEquals; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertNotNull; 023import static org.junit.Assert.assertNull; 024import static org.junit.Assert.assertTrue; 025import static org.junit.Assert.fail; 026 027import java.io.IOException; 028import java.nio.ByteBuffer; 029import java.util.Collection; 030import java.util.Deque; 031import java.util.List; 032import java.util.Map; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.atomic.AtomicInteger; 035import java.util.stream.IntStream; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.fs.FileSystem; 038import org.apache.hadoop.fs.Path; 039import org.apache.hadoop.hbase.HBaseClassTestRule; 040import org.apache.hadoop.hbase.HBaseTestingUtility; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.HRegionLocation; 043import org.apache.hadoop.hbase.MetaTableAccessor; 044import org.apache.hadoop.hbase.ServerName; 045import org.apache.hadoop.hbase.TableExistsException; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.client.Admin; 048import org.apache.hadoop.hbase.client.ClientServiceCallable; 049import org.apache.hadoop.hbase.client.ClusterConnection; 050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 051import org.apache.hadoop.hbase.client.Connection; 052import org.apache.hadoop.hbase.client.ConnectionFactory; 053import org.apache.hadoop.hbase.client.RegionInfo; 054import org.apache.hadoop.hbase.client.RegionInfoBuilder; 055import org.apache.hadoop.hbase.client.RegionLocator; 056import org.apache.hadoop.hbase.client.Result; 057import org.apache.hadoop.hbase.client.ResultScanner; 058import org.apache.hadoop.hbase.client.Scan; 059import org.apache.hadoop.hbase.client.Table; 060import org.apache.hadoop.hbase.client.TableDescriptor; 061import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 062import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 063import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 064import org.apache.hadoop.hbase.log.HBaseMarkers; 065import org.apache.hadoop.hbase.regionserver.HRegionServer; 066import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; 067import org.apache.hadoop.hbase.testclassification.LargeTests; 068import org.apache.hadoop.hbase.testclassification.MiscTests; 069import org.apache.hadoop.hbase.util.Bytes; 070import org.apache.hadoop.hbase.util.CommonFSUtils; 071import org.apache.hadoop.hbase.util.Pair; 072import org.junit.AfterClass; 073import org.junit.BeforeClass; 074import org.junit.ClassRule; 075import org.junit.Rule; 076import org.junit.Test; 077import org.junit.experimental.categories.Category; 078import org.junit.rules.TestName; 079import org.mockito.Mockito; 080import org.slf4j.Logger; 081import org.slf4j.LoggerFactory; 082 083import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 084import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 085import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 086 087import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 088import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; 089import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 090 091/** 092 * Test cases for the atomic load error handling of the bulk load functionality. 093 */ 094@Category({ MiscTests.class, LargeTests.class }) 095public class TestLoadIncrementalHFilesSplitRecovery { 096 097 @ClassRule 098 public static final HBaseClassTestRule CLASS_RULE = 099 HBaseClassTestRule.forClass(TestLoadIncrementalHFilesSplitRecovery.class); 100 101 private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class); 102 103 static HBaseTestingUtility util; 104 // used by secure subclass 105 static boolean useSecure = false; 106 107 final static int NUM_CFS = 10; 108 final static byte[] QUAL = Bytes.toBytes("qual"); 109 final static int ROWCOUNT = 100; 110 111 private final static byte[][] families = new byte[NUM_CFS][]; 112 113 @Rule 114 public TestName name = new TestName(); 115 116 static { 117 for (int i = 0; i < NUM_CFS; i++) { 118 families[i] = Bytes.toBytes(family(i)); 119 } 120 } 121 122 static byte[] rowkey(int i) { 123 return Bytes.toBytes(String.format("row_%08d", i)); 124 } 125 126 static String family(int i) { 127 return String.format("family_%04d", i); 128 } 129 130 static byte[] value(int i) { 131 return Bytes.toBytes(String.format("%010d", i)); 132 } 133 134 public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException { 135 byte[] val = value(value); 136 for (int i = 0; i < NUM_CFS; i++) { 137 Path testIn = new Path(dir, family(i)); 138 139 TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), 140 Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); 141 } 142 } 143 144 private TableDescriptor createTableDesc(TableName name, int cfs) { 145 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 146 IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i))) 147 .forEachOrdered(builder::setColumnFamily); 148 return builder.build(); 149 } 150 151 /** 152 * Creates a table with given table name and specified number of column families if the table does 153 * not already exist. 154 */ 155 private void setupTable(final Connection connection, TableName table, int cfs) 156 throws IOException { 157 try { 158 LOG.info("Creating table " + table); 159 try (Admin admin = connection.getAdmin()) { 160 admin.createTable(createTableDesc(table, cfs)); 161 } 162 } catch (TableExistsException tee) { 163 LOG.info("Table " + table + " already exists"); 164 } 165 } 166 167 /** 168 * Creates a table with given table name,specified number of column families<br> 169 * and splitkeys if the table does not already exist. 170 * @param table 171 * @param cfs 172 * @param SPLIT_KEYS 173 */ 174 private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS) 175 throws IOException { 176 try { 177 LOG.info("Creating table " + table); 178 util.createTable(createTableDesc(table, cfs), SPLIT_KEYS); 179 } catch (TableExistsException tee) { 180 LOG.info("Table " + table + " already exists"); 181 } 182 } 183 184 private Path buildBulkFiles(TableName table, int value) throws Exception { 185 Path dir = util.getDataTestDirOnTestFS(table.getNameAsString()); 186 Path bulk1 = new Path(dir, table.getNameAsString() + value); 187 FileSystem fs = util.getTestFileSystem(); 188 buildHFiles(fs, bulk1, value); 189 return bulk1; 190 } 191 192 /** 193 * Populate table with known values. 194 */ 195 private void populateTable(final Connection connection, TableName table, int value) 196 throws Exception { 197 // create HFiles for different column families 198 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); 199 Path bulk1 = buildBulkFiles(table, value); 200 try (Table t = connection.getTable(table); 201 RegionLocator locator = connection.getRegionLocator(table); 202 Admin admin = connection.getAdmin()) { 203 lih.doBulkLoad(bulk1, admin, t, locator); 204 } 205 } 206 207 /** 208 * Split the known table in half. (this is hard coded for this test suite) 209 */ 210 private void forceSplit(TableName table) { 211 try { 212 // need to call regions server to by synchronous but isn't visible. 213 HRegionServer hrs = util.getRSForFirstRegionInTable(table); 214 215 for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { 216 if (hri.getTable().equals(table)) { 217 util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2)); 218 // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); 219 } 220 } 221 222 // verify that split completed. 223 int regions; 224 do { 225 regions = 0; 226 for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { 227 if (hri.getTable().equals(table)) { 228 regions++; 229 } 230 } 231 if (regions != 2) { 232 LOG.info("Taking some time to complete split..."); 233 Thread.sleep(250); 234 } 235 } while (regions != 2); 236 } catch (IOException e) { 237 e.printStackTrace(); 238 } catch (InterruptedException e) { 239 e.printStackTrace(); 240 } 241 } 242 243 @BeforeClass 244 public static void setupCluster() throws Exception { 245 util = new HBaseTestingUtility(); 246 util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ""); 247 util.startMiniCluster(1); 248 } 249 250 @AfterClass 251 public static void teardownCluster() throws Exception { 252 util.shutdownMiniCluster(); 253 } 254 255 /** 256 * Checks that all columns have the expected value and that there is the expected number of rows. 257 * @throws IOException 258 */ 259 void assertExpectedTable(TableName table, int count, int value) throws IOException { 260 TableDescriptor htd = util.getAdmin().getDescriptor(table); 261 assertNotNull(htd); 262 try (Table t = util.getConnection().getTable(table); 263 ResultScanner sr = t.getScanner(new Scan())) { 264 int i = 0; 265 for (Result r; (r = sr.next()) != null;) { 266 r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) 267 .forEach(v -> assertArrayEquals(value(value), v)); 268 i++; 269 } 270 assertEquals(count, i); 271 } catch (IOException e) { 272 fail("Failed due to exception"); 273 } 274 } 275 276 /** 277 * Test that shows that exception thrown from the RS side will result in an exception on the 278 * LIHFile client. 279 */ 280 @Test(expected = IOException.class) 281 public void testBulkLoadPhaseFailure() throws Exception { 282 final TableName table = TableName.valueOf(name.getMethodName()); 283 final AtomicInteger attmptedCalls = new AtomicInteger(); 284 final AtomicInteger failedCalls = new AtomicInteger(); 285 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 286 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 287 setupTable(connection, table, 10); 288 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 289 @Override 290 protected List<LoadQueueItem> tryAtomicRegionLoad(Connection connection, 291 TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis, 292 boolean copyFile) throws IOException { 293 int i = attmptedCalls.incrementAndGet(); 294 if (i == 1) { 295 Connection errConn; 296 try { 297 errConn = getMockedConnection(util.getConfiguration()); 298 } catch (Exception e) { 299 LOG.error(HBaseMarkers.FATAL, "mocking cruft, should never happen", e); 300 throw new RuntimeException("mocking cruft, should never happen"); 301 } 302 failedCalls.incrementAndGet(); 303 return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, true); 304 } 305 306 return super.tryAtomicRegionLoad(connection, tableName, first, lqis, true); 307 } 308 }; 309 try { 310 // create HFiles for different column families 311 Path dir = buildBulkFiles(table, 1); 312 try (Table t = connection.getTable(table); 313 RegionLocator locator = connection.getRegionLocator(table); 314 Admin admin = connection.getAdmin()) { 315 lih.doBulkLoad(dir, admin, t, locator); 316 } 317 } finally { 318 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 319 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); 320 } 321 fail("doBulkLoad should have thrown an exception"); 322 } 323 } 324 325 /** 326 * Test that shows that exception thrown from the RS side will result in the expected number of 327 * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when 328 * ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set 329 */ 330 @Test 331 public void testRetryOnIOException() throws Exception { 332 final TableName table = TableName.valueOf(name.getMethodName()); 333 final AtomicInteger calls = new AtomicInteger(0); 334 final Connection conn = ConnectionFactory.createConnection(util.getConfiguration()); 335 util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 336 util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true); 337 final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 338 @Override 339 protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn, 340 TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) { 341 if (calls.get() < util.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 342 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { 343 calls.getAndIncrement(); 344 return new ClientServiceCallable<byte[]>(conn, tableName, first, 345 new RpcControllerFactory(util.getConfiguration()).newController(), 346 HConstants.PRIORITY_UNSET) { 347 @Override 348 public byte[] rpcCall() throws Exception { 349 throw new IOException("Error calling something on RegionServer"); 350 } 351 }; 352 } else { 353 return super.buildClientServiceCallable(conn, tableName, first, lqis, true); 354 } 355 } 356 }; 357 setupTable(conn, table, 10); 358 Path dir = buildBulkFiles(table, 1); 359 lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table)); 360 assertEquals(calls.get(), 2); 361 util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false); 362 } 363 364 private ClusterConnection getMockedConnection(final Configuration conf) 365 throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException { 366 ClusterConnection c = Mockito.mock(ClusterConnection.class); 367 Mockito.when(c.getConfiguration()).thenReturn(conf); 368 Mockito.doNothing().when(c).close(); 369 // Make it so we return a particular location when asked. 370 final HRegionLocation loc = new HRegionLocation(RegionInfoBuilder.FIRST_META_REGIONINFO, 371 ServerName.valueOf("example.org", 1234, 0)); 372 Mockito.when( 373 c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean())) 374 .thenReturn(loc); 375 Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc); 376 ClientProtos.ClientService.BlockingInterface hri = 377 Mockito.mock(ClientProtos.ClientService.BlockingInterface.class); 378 Mockito 379 .when( 380 hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any())) 381 .thenThrow(new ServiceException(new IOException("injecting bulk load error"))); 382 Mockito.when(c.getClient(Mockito.any())).thenReturn(hri); 383 return c; 384 } 385 386 /** 387 * This test exercises the path where there is a split after initial validation but before the 388 * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a 389 * split just before the atomic region load. 390 */ 391 @Test 392 public void testSplitWhileBulkLoadPhase() throws Exception { 393 final TableName table = TableName.valueOf(name.getMethodName()); 394 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 395 setupTable(connection, table, 10); 396 populateTable(connection, table, 1); 397 assertExpectedTable(table, ROWCOUNT, 1); 398 399 // Now let's cause trouble. This will occur after checks and cause bulk 400 // files to fail when attempt to atomically import. This is recoverable. 401 final AtomicInteger attemptedCalls = new AtomicInteger(); 402 LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) { 403 @Override 404 protected void bulkLoadPhase(final Table htable, final Connection conn, 405 ExecutorService pool, Deque<LoadQueueItem> queue, 406 final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile, 407 Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 408 int i = attemptedCalls.incrementAndGet(); 409 if (i == 1) { 410 // On first attempt force a split. 411 forceSplit(table); 412 } 413 super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap); 414 } 415 }; 416 417 // create HFiles for different column families 418 try (Table t = connection.getTable(table); 419 RegionLocator locator = connection.getRegionLocator(table); 420 Admin admin = connection.getAdmin()) { 421 Path bulk = buildBulkFiles(table, 2); 422 lih2.doBulkLoad(bulk, admin, t, locator); 423 } 424 425 // check that data was loaded 426 // The three expected attempts are 1) failure because need to split, 2) 427 // load of split top 3) load of split bottom 428 assertEquals(3, attemptedCalls.get()); 429 assertExpectedTable(table, ROWCOUNT, 2); 430 } 431 } 432 433 /** 434 * This test splits a table and attempts to bulk load. The bulk import files should be split 435 * before atomically importing. 436 */ 437 @Test 438 public void testGroupOrSplitPresplit() throws Exception { 439 final TableName table = TableName.valueOf(name.getMethodName()); 440 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 441 setupTable(connection, table, 10); 442 populateTable(connection, table, 1); 443 assertExpectedTable(connection, table, ROWCOUNT, 1); 444 forceSplit(table); 445 446 final AtomicInteger countedLqis = new AtomicInteger(); 447 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 448 @Override 449 protected Pair<List<LoadQueueItem>, String> groupOrSplit( 450 Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, 451 final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 452 Pair<List<LoadQueueItem>, String> lqis = 453 super.groupOrSplit(regionGroups, item, htable, startEndKeys); 454 if (lqis != null && lqis.getFirst() != null) { 455 countedLqis.addAndGet(lqis.getFirst().size()); 456 } 457 return lqis; 458 } 459 }; 460 461 // create HFiles for different column families 462 Path bulk = buildBulkFiles(table, 2); 463 try (Table t = connection.getTable(table); 464 RegionLocator locator = connection.getRegionLocator(table); 465 Admin admin = connection.getAdmin()) { 466 lih.doBulkLoad(bulk, admin, t, locator); 467 } 468 assertExpectedTable(connection, table, ROWCOUNT, 2); 469 assertEquals(20, countedLqis.get()); 470 } 471 } 472 473 /** 474 * This test creates a table with many small regions. The bulk load files would be splitted 475 * multiple times before all of them can be loaded successfully. 476 */ 477 @Test 478 public void testSplitTmpFileCleanUp() throws Exception { 479 final TableName table = TableName.valueOf(name.getMethodName()); 480 byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), 481 Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"), 482 Bytes.toBytes("row_00000050") }; 483 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 484 setupTableWithSplitkeys(table, 10, SPLIT_KEYS); 485 486 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()); 487 488 // create HFiles 489 Path bulk = buildBulkFiles(table, 2); 490 try (Table t = connection.getTable(table); 491 RegionLocator locator = connection.getRegionLocator(table); 492 Admin admin = connection.getAdmin()) { 493 lih.doBulkLoad(bulk, admin, t, locator); 494 } 495 // family path 496 Path tmpPath = new Path(bulk, family(0)); 497 // TMP_DIR under family path 498 tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR); 499 FileSystem fs = bulk.getFileSystem(util.getConfiguration()); 500 // HFiles have been splitted, there is TMP_DIR 501 assertTrue(fs.exists(tmpPath)); 502 // TMP_DIR should have been cleaned-up 503 assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.", 504 CommonFSUtils.listStatus(fs, tmpPath)); 505 assertExpectedTable(connection, table, ROWCOUNT, 2); 506 } 507 } 508 509 /** 510 * This simulates an remote exception which should cause LIHF to exit with an exception. 511 */ 512 @Test(expected = IOException.class) 513 public void testGroupOrSplitFailure() throws Exception { 514 final TableName tableName = TableName.valueOf(name.getMethodName()); 515 try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) { 516 setupTable(connection, tableName, 10); 517 518 LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { 519 int i = 0; 520 521 @Override 522 protected Pair<List<LoadQueueItem>, String> groupOrSplit( 523 Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, 524 final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 525 i++; 526 527 if (i == 5) { 528 throw new IOException("failure"); 529 } 530 return super.groupOrSplit(regionGroups, item, table, startEndKeys); 531 } 532 }; 533 534 // create HFiles for different column families 535 Path dir = buildBulkFiles(tableName, 1); 536 try (Table t = connection.getTable(tableName); 537 RegionLocator locator = connection.getRegionLocator(tableName); 538 Admin admin = connection.getAdmin()) { 539 lih.doBulkLoad(dir, admin, t, locator); 540 } 541 } 542 543 fail("doBulkLoad should have thrown an exception"); 544 } 545 546 @Test 547 public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception { 548 final TableName tableName = TableName.valueOf(name.getMethodName()); 549 byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") }; 550 // Share connection. We were failing to find the table with our new reverse scan because it 551 // looks for first region, not any region -- that is how it works now. The below removes first 552 // region in test. Was reliant on the Connection caching having first region. 553 Connection connection = ConnectionFactory.createConnection(util.getConfiguration()); 554 Table table = connection.getTable(tableName); 555 556 setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS); 557 Path dir = buildBulkFiles(tableName, 2); 558 559 final AtomicInteger countedLqis = new AtomicInteger(); 560 LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) { 561 562 @Override 563 protected Pair<List<LoadQueueItem>, String> groupOrSplit( 564 Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, 565 final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 566 Pair<List<LoadQueueItem>, String> lqis = 567 super.groupOrSplit(regionGroups, item, htable, startEndKeys); 568 if (lqis != null && lqis.getFirst() != null) { 569 countedLqis.addAndGet(lqis.getFirst().size()); 570 } 571 return lqis; 572 } 573 }; 574 575 // do bulkload when there is no region hole in hbase:meta. 576 try (Table t = connection.getTable(tableName); 577 RegionLocator locator = connection.getRegionLocator(tableName); 578 Admin admin = connection.getAdmin()) { 579 loader.doBulkLoad(dir, admin, t, locator); 580 } catch (Exception e) { 581 LOG.error("exeception=", e); 582 } 583 // check if all the data are loaded into the table. 584 this.assertExpectedTable(tableName, ROWCOUNT, 2); 585 586 dir = buildBulkFiles(tableName, 3); 587 588 // Mess it up by leaving a hole in the hbase:meta 589 List<RegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); 590 for (RegionInfo regionInfo : regionInfos) { 591 if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) { 592 MetaTableAccessor.deleteRegionInfo(connection, regionInfo); 593 break; 594 } 595 } 596 597 try (Table t = connection.getTable(tableName); 598 RegionLocator locator = connection.getRegionLocator(tableName); 599 Admin admin = connection.getAdmin()) { 600 loader.doBulkLoad(dir, admin, t, locator); 601 } catch (Exception e) { 602 LOG.error("exception=", e); 603 assertTrue("IOException expected", e instanceof IOException); 604 } 605 606 table.close(); 607 608 // Make sure at least the one region that still exists can be found. 609 regionInfos = MetaTableAccessor.getTableRegions(connection, tableName); 610 assertTrue(regionInfos.size() >= 1); 611 612 this.assertExpectedTable(connection, tableName, ROWCOUNT, 2); 613 connection.close(); 614 } 615 616 /** 617 * Checks that all columns have the expected value and that there is the expected number of rows. 618 * @throws IOException 619 */ 620 void assertExpectedTable(final Connection connection, TableName table, int count, int value) 621 throws IOException { 622 TableDescriptor htd = util.getAdmin().getDescriptor(table); 623 assertNotNull(htd); 624 try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) { 625 int i = 0; 626 for (Result r; (r = sr.next()) != null;) { 627 r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) 628 .forEach(v -> assertArrayEquals(value(value), v)); 629 i++; 630 } 631 assertEquals(count, i); 632 } catch (IOException e) { 633 fail("Failed due to exception"); 634 } 635 } 636}