001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static org.junit.jupiter.api.Assertions.assertArrayEquals; 021import static org.junit.jupiter.api.Assertions.assertEquals; 022import static org.junit.jupiter.api.Assertions.assertNotNull; 023import static org.junit.jupiter.api.Assertions.assertNull; 024import static org.junit.jupiter.api.Assertions.assertThrows; 025import static org.junit.jupiter.api.Assertions.assertTrue; 026import static org.junit.jupiter.api.Assertions.fail; 027import static org.mockito.ArgumentMatchers.any; 028import static org.mockito.ArgumentMatchers.anyBoolean; 029import static org.mockito.ArgumentMatchers.anyList; 030import static org.mockito.Mockito.doReturn; 031import static org.mockito.Mockito.spy; 032 033import java.io.IOException; 034import java.nio.ByteBuffer; 035import java.util.Deque; 036import java.util.List; 037import java.util.Map; 038import java.util.concurrent.CompletableFuture; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.stream.IntStream; 041import org.apache.hadoop.conf.Configuration; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.hbase.HBaseTestingUtil; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.TableExistsException; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.Admin; 049import org.apache.hadoop.hbase.client.AsyncClusterConnection; 050import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 051import org.apache.hadoop.hbase.client.Connection; 052import org.apache.hadoop.hbase.client.RegionInfo; 053import org.apache.hadoop.hbase.client.Result; 054import org.apache.hadoop.hbase.client.ResultScanner; 055import org.apache.hadoop.hbase.client.Scan; 056import org.apache.hadoop.hbase.client.Table; 057import org.apache.hadoop.hbase.client.TableDescriptor; 058import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 059import org.apache.hadoop.hbase.regionserver.HRegionServer; 060import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.CommonFSUtils; 063import org.apache.hadoop.hbase.util.Pair; 064import org.junit.jupiter.api.AfterAll; 065import org.junit.jupiter.api.Test; 066import org.junit.jupiter.api.TestInfo; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 071 072import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 073 074/** 075 * Test cases for the atomic load error handling of the bulk load functionality. 076 */ 077public class BulkLoadHFilesSplitRecoveryTestBase { 078 079 private static final Logger LOG = 080 LoggerFactory.getLogger(BulkLoadHFilesSplitRecoveryTestBase.class); 081 082 static HBaseTestingUtil util; 083 // used by secure subclass 084 static boolean useSecure = false; 085 086 final static int NUM_CFS = 10; 087 final static byte[] QUAL = Bytes.toBytes("qual"); 088 final static int ROWCOUNT = 100; 089 090 private final static byte[][] families = new byte[NUM_CFS][]; 091 092 static { 093 for (int i = 0; i < NUM_CFS; i++) { 094 families[i] = Bytes.toBytes(family(i)); 095 } 096 } 097 098 static byte[] rowkey(int i) { 099 return Bytes.toBytes(String.format("row_%08d", i)); 100 } 101 102 static String family(int i) { 103 return String.format("family_%04d", i); 104 } 105 106 static byte[] value(int i) { 107 return Bytes.toBytes(String.format("%010d", i)); 108 } 109 110 public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException { 111 byte[] val = value(value); 112 for (int i = 0; i < NUM_CFS; i++) { 113 Path testIn = new Path(dir, family(i)); 114 115 TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i), 116 Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT); 117 } 118 } 119 120 private TableDescriptor createTableDesc(TableName name, int cfs) { 121 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name); 122 IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i))) 123 .forEachOrdered(builder::setColumnFamily); 124 return builder.build(); 125 } 126 127 /** 128 * Creates a table with given table name and specified number of column families if the table does 129 * not already exist. 130 */ 131 private void setupTable(final Connection connection, TableName table, int cfs) 132 throws IOException { 133 try { 134 LOG.info("Creating table " + table); 135 try (Admin admin = connection.getAdmin()) { 136 admin.createTable(createTableDesc(table, cfs)); 137 } 138 } catch (TableExistsException tee) { 139 LOG.info("Table " + table + " already exists"); 140 } 141 } 142 143 /** 144 * Creates a table with given table name,specified number of column families<br> 145 * and splitkeys if the table does not already exist. 146 */ 147 private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS) 148 throws IOException { 149 try { 150 LOG.info("Creating table " + table); 151 util.createTable(createTableDesc(table, cfs), SPLIT_KEYS); 152 } catch (TableExistsException tee) { 153 LOG.info("Table " + table + " already exists"); 154 } 155 } 156 157 private Path buildBulkFiles(TableName table, int value) throws Exception { 158 Path dir = util.getDataTestDirOnTestFS(table.getNameAsString()); 159 Path bulk1 = new Path(dir, table.getNameAsString() + value); 160 FileSystem fs = util.getTestFileSystem(); 161 buildHFiles(fs, bulk1, value); 162 return bulk1; 163 } 164 165 /** 166 * Populate table with known values. 167 */ 168 private void populateTable(final Connection connection, TableName table, int value) 169 throws Exception { 170 // create HFiles for different column families 171 Path dir = buildBulkFiles(table, value); 172 BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(table, dir); 173 } 174 175 /** 176 * Split the known table in half. (this is hard coded for this test suite) 177 */ 178 private void forceSplit(TableName table) { 179 try { 180 // need to call regions server to by synchronous but isn't visible. 181 HRegionServer hrs = util.getRSForFirstRegionInTable(table); 182 183 for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { 184 if (hri.getTable().equals(table)) { 185 util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2)); 186 // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2)); 187 } 188 } 189 190 // verify that split completed. 191 int regions; 192 do { 193 regions = 0; 194 for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) { 195 if (hri.getTable().equals(table)) { 196 regions++; 197 } 198 } 199 if (regions != 2) { 200 LOG.info("Taking some time to complete split..."); 201 Thread.sleep(250); 202 } 203 } while (regions != 2); 204 } catch (IOException e) { 205 e.printStackTrace(); 206 } catch (InterruptedException e) { 207 e.printStackTrace(); 208 } 209 } 210 211 @AfterAll 212 public static void teardownCluster() throws Exception { 213 util.shutdownMiniCluster(); 214 } 215 216 /** 217 * Checks that all columns have the expected value and that there is the expected number of rows. 218 */ 219 void assertExpectedTable(TableName table, int count, int value) throws IOException { 220 TableDescriptor htd = util.getAdmin().getDescriptor(table); 221 assertNotNull(htd); 222 try (Table t = util.getConnection().getTable(table); 223 ResultScanner sr = t.getScanner(new Scan())) { 224 int i = 0; 225 for (Result r; (r = sr.next()) != null;) { 226 r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) 227 .forEach(v -> assertArrayEquals(value(value), v)); 228 i++; 229 } 230 assertEquals(count, i); 231 } catch (IOException e) { 232 fail("Failed due to exception"); 233 } 234 } 235 236 private static <T> CompletableFuture<T> failedFuture(Throwable error) { 237 CompletableFuture<T> future = new CompletableFuture<>(); 238 future.completeExceptionally(error); 239 return future; 240 } 241 242 private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) { 243 AsyncClusterConnection errConn = spy(conn); 244 doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn).bulkLoad( 245 any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(), anyBoolean()); 246 return errConn; 247 } 248 249 /** 250 * Test that shows that exception thrown from the RS side will result in an exception on the 251 * LIHFile client. 252 */ 253 @Test 254 public void testBulkLoadPhaseFailure(TestInfo testInfo) throws Exception { 255 final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName()); 256 final AtomicInteger attemptedCalls = new AtomicInteger(); 257 Configuration conf = new Configuration(util.getConfiguration()); 258 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 259 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) { 260 261 @Override 262 protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName, 263 Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, 264 boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 265 AsyncClusterConnection c = 266 attemptedCalls.incrementAndGet() == 1 ? mockAndInjectError(conn) : conn; 267 super.bulkLoadPhase(c, tableName, queue, regionGroups, copyFiles, item2RegionMap); 268 } 269 }; 270 Path dir = buildBulkFiles(table, 1); 271 assertThrows(IOException.class, () -> loader.bulkLoad(table, dir)); 272 } 273 274 /** 275 * Test that shows that exception thrown from the RS side will result in the expected number of 276 * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when 277 * ${@link BulkLoadHFiles#RETRY_ON_IO_EXCEPTION} is set 278 */ 279 @Test 280 public void testRetryOnIOException(TestInfo testInfo) throws Exception { 281 TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName()); 282 AtomicInteger calls = new AtomicInteger(0); 283 setupTable(util.getConnection(), table, 10); 284 Configuration conf = new Configuration(util.getConfiguration()); 285 conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); 286 conf.setBoolean(BulkLoadHFiles.RETRY_ON_IO_EXCEPTION, true); 287 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) { 288 289 @Override 290 protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName, 291 Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, 292 boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 293 if ( 294 calls.get() < conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 295 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) 296 ) { 297 calls.incrementAndGet(); 298 super.bulkLoadPhase(mockAndInjectError(conn), tableName, queue, regionGroups, copyFiles, 299 item2RegionMap); 300 } else { 301 super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap); 302 } 303 } 304 }; 305 Path dir = buildBulkFiles(table, 1); 306 loader.bulkLoad(table, dir); 307 assertEquals(calls.get(), 2); 308 } 309 310 /** 311 * This test exercises the path where there is a split after initial validation but before the 312 * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a 313 * split just before the atomic region load. 314 */ 315 @Test 316 public void testSplitWhileBulkLoadPhase(TestInfo testInfo) throws Exception { 317 final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName()); 318 setupTable(util.getConnection(), table, 10); 319 populateTable(util.getConnection(), table, 1); 320 assertExpectedTable(table, ROWCOUNT, 1); 321 322 // Now let's cause trouble. This will occur after checks and cause bulk 323 // files to fail when attempt to atomically import. This is recoverable. 324 final AtomicInteger attemptedCalls = new AtomicInteger(); 325 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { 326 327 @Override 328 protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName, 329 Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, 330 boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 331 int i = attemptedCalls.incrementAndGet(); 332 if (i == 1) { 333 // On first attempt force a split. 334 forceSplit(table); 335 } 336 super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap); 337 } 338 }; 339 340 // create HFiles for different column families 341 Path dir = buildBulkFiles(table, 2); 342 loader.bulkLoad(table, dir); 343 344 // check that data was loaded 345 // The three expected attempts are 1) failure because need to split, 2) 346 // load of split top 3) load of split bottom 347 assertEquals(3, attemptedCalls.get()); 348 assertExpectedTable(table, ROWCOUNT, 2); 349 } 350 351 /** 352 * This test splits a table and attempts to bulk load. The bulk import files should be split 353 * before atomically importing. 354 */ 355 @Test 356 public void testGroupOrSplitPresplit(TestInfo testInfo) throws Exception { 357 final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName()); 358 setupTable(util.getConnection(), table, 10); 359 populateTable(util.getConnection(), table, 1); 360 assertExpectedTable(util.getConnection(), table, ROWCOUNT, 1); 361 forceSplit(table); 362 363 final AtomicInteger countedLqis = new AtomicInteger(); 364 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { 365 366 @Override 367 protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn, 368 TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item, 369 List<Pair<byte[], byte[]>> startEndKeys) throws IOException { 370 Pair<List<LoadQueueItem>, String> lqis = 371 super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys); 372 if (lqis != null && lqis.getFirst() != null) { 373 countedLqis.addAndGet(lqis.getFirst().size()); 374 } 375 return lqis; 376 } 377 }; 378 379 // create HFiles for different column families 380 Path dir = buildBulkFiles(table, 2); 381 loader.bulkLoad(table, dir); 382 assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2); 383 assertEquals(20, countedLqis.get()); 384 } 385 386 @Test 387 public void testCorrectSplitPoint(TestInfo testInfo) throws Exception { 388 final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName()); 389 byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), 390 Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"), 391 Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"), Bytes.toBytes("row_00000070") }; 392 setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS); 393 394 final AtomicInteger bulkloadRpcTimes = new AtomicInteger(); 395 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { 396 397 @Override 398 protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName, 399 Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, 400 boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 401 bulkloadRpcTimes.addAndGet(1); 402 super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap); 403 } 404 }; 405 406 Path dir = buildBulkFiles(table, 1); 407 loader.bulkLoad(table, dir); 408 // before HBASE-25281 we need invoke bulkload rpc 8 times 409 assertEquals(4, bulkloadRpcTimes.get()); 410 } 411 412 /** 413 * This test creates a table with many small regions. The bulk load files would be splitted 414 * multiple times before all of them can be loaded successfully. 415 */ 416 @Test 417 public void testSplitTmpFileCleanUp(TestInfo testInfo) throws Exception { 418 final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName()); 419 byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"), 420 Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"), 421 Bytes.toBytes("row_00000050") }; 422 setupTableWithSplitkeys(table, 10, SPLIT_KEYS); 423 424 BulkLoadHFiles loader = BulkLoadHFiles.create(util.getConfiguration()); 425 426 // create HFiles 427 Path dir = buildBulkFiles(table, 2); 428 loader.bulkLoad(table, dir); 429 // family path 430 Path tmpPath = new Path(dir, family(0)); 431 // TMP_DIR under family path 432 tmpPath = new Path(tmpPath, BulkLoadHFilesTool.TMP_DIR); 433 FileSystem fs = dir.getFileSystem(util.getConfiguration()); 434 // HFiles have been splitted, there is TMP_DIR 435 assertTrue(fs.exists(tmpPath)); 436 // TMP_DIR should have been cleaned-up 437 assertNull(CommonFSUtils.listStatus(fs, tmpPath), 438 BulkLoadHFilesTool.TMP_DIR + " should be empty."); 439 assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2); 440 } 441 442 /** 443 * This simulates an remote exception which should cause LIHF to exit with an exception. 444 */ 445 @Test 446 public void testGroupOrSplitFailure(TestInfo testInfo) throws Exception { 447 final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName()); 448 setupTable(util.getConnection(), tableName, 10); 449 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { 450 451 private int i = 0; 452 453 @Override 454 protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn, 455 TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item, 456 List<Pair<byte[], byte[]>> startEndKeys) throws IOException { 457 i++; 458 459 if (i == 5) { 460 throw new IOException("failure"); 461 } 462 return super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys); 463 } 464 }; 465 466 // create HFiles for different column families 467 Path dir = buildBulkFiles(tableName, 1); 468 assertThrows(IOException.class, () -> loader.bulkLoad(tableName, dir)); 469 } 470 471 /** 472 * We are testing a split after initial validation but before the atomic bulk load call. We cannot 473 * use presplitting to test this path, so we actually inject a split just before the atomic region 474 * load. However, we will pass null item2RegionMap and that should not affect the bulk load 475 * behavior. 476 */ 477 @Test 478 public void testSplitWhileBulkLoadPhaseWithoutItemMap(TestInfo testInfo) throws Exception { 479 final TableName table = TableName.valueOf(testInfo.getTestMethod().get().getName()); 480 setupTable(util.getConnection(), table, 10); 481 populateTable(util.getConnection(), table, 1); 482 assertExpectedTable(table, ROWCOUNT, 1); 483 484 // Now let's cause trouble. This will occur after checks and cause bulk 485 // files to fail when attempt to atomically import. This is recoverable. 486 final AtomicInteger attemptedCalls = new AtomicInteger(); 487 BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) { 488 489 @Override 490 protected void bulkLoadPhase(final AsyncClusterConnection conn, final TableName tableName, 491 final Deque<LoadQueueItem> queue, final Multimap<ByteBuffer, LoadQueueItem> regionGroups, 492 final boolean copyFiles, final Map<LoadQueueItem, ByteBuffer> item2RegionMap) 493 throws IOException { 494 495 int i = attemptedCalls.incrementAndGet(); 496 if (i == 1) { 497 // On first attempt force a split. 498 forceSplit(table); 499 } 500 501 // Passing item2RegionMap null 502 // In the absence of LoadQueueItem, bulk load should work as expected 503 super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null); 504 } 505 506 }; 507 508 // create HFiles for different column families 509 Path dir = buildBulkFiles(table, 2); 510 loader.bulkLoad(table, dir); 511 512 // check that data was loaded 513 // The three expected attempts are 1) failure because need to split, 2) 514 // load of split top 3) load of split bottom 515 assertEquals(3, attemptedCalls.get()); 516 assertExpectedTable(table, ROWCOUNT, 2); 517 } 518 519 /** 520 * Checks that all columns have the expected value and that there is the expected number of rows. 521 */ 522 void assertExpectedTable(final Connection connection, TableName table, int count, int value) 523 throws IOException { 524 TableDescriptor htd = util.getAdmin().getDescriptor(table); 525 assertNotNull(htd); 526 try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) { 527 int i = 0; 528 for (Result r; (r = sr.next()) != null;) { 529 r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream()) 530 .forEach(v -> assertArrayEquals(value(value), v)); 531 i++; 532 } 533 assertEquals(count, i); 534 } catch (IOException e) { 535 fail("Failed due to exception"); 536 } 537 } 538}