/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 * <p/>
 * Notice that, by deprecation policy, this class would normally be kept until 4.0.0, but since
 * exposing an implementation class instead of an interface is a bad practice that we want to fix
 * as soon as possible, it will be removed completely in 3.0.0. Please change your code to use
 * {@link BulkLoadHFiles}.
 * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please
 *             rewrite your code if you rely on methods other than {@link #run(Map, TableName)}
 *             and {@link #run(String, TableName)}, as all other methods will be removed with no
 *             replacement.
 */
@Deprecated
@InterfaceAudience.Public
public class LoadIncrementalHFiles extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class);

  /**
   * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not
   *             depend on this value.
   */
  @Deprecated
  public static final String NAME = BulkLoadHFilesTool.NAME;
  static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION;
  public static final String MAX_FILES_PER_REGION_PER_FAMILY =
    BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY;
  private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS;
  public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY;
  public final static String IGNORE_UNMATCHED_CF_CONF_KEY =
    BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY;
  public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES;

  // We use a '.' prefix which is ignored when walking directory trees
  // above. It is an invalid family name.
  static final String TMP_DIR = ".tmp";

  private final int maxFilesPerRegionPerFamily;
  private final boolean assignSeqIds;

  // Source delegation token
  private final FsDelegationToken fsDelegationToken;
  private final UserProvider userProvider;
  private final int nrThreads;
  private AtomicInteger numRetries;
  private final RpcControllerFactory rpcControllerFactory;

  private String bulkToken;

  private List<String> clusterIds = new ArrayList<>();

  private boolean replicate = true;

  /**
   * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
   * the case where a region has split during the process of the load. When this happens, the
   * HFile is split into two physical parts across the new region boundary, and each part is added
   * back into the queue. The import process finishes when the queue is empty.
   * @deprecated since 2.2.0 and will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead.
   * @see BulkLoadHFiles
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21782">HBASE-21782</a>
   */
  @InterfaceAudience.Public
  @Deprecated
  public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem {

    public LoadQueueItem(byte[] family, Path hfilePath) {
      super(family, hfilePath);
    }
  }

  public LoadIncrementalHFiles(Configuration conf) {
    // make a copy, just to be sure we're not overriding someone else's config
    super(HBaseConfiguration.create(conf));
    conf = getConf();
    // disable blockcache for tool invocation, see HBASE-10500
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
      Runtime.getRuntime().availableProcessors());
    numRetries = new AtomicInteger(0);
    rpcControllerFactory = new RpcControllerFactory(conf);
  }

  private void usage() {
    System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
      + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
      + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- "
      + "into an hbase table.\n"
      + "OPTIONS (for other -D options, see source code):\n"
      + " -D" + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target "
      + "table must exist.\n"
      + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes to ignore unmatched column families.\n"
      + " -loadTable for when directory of files to load has a depth of 3; target table must "
      + "exist;\n"
      + " must be last of the options on command line.\n"
      + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for "
      + "documentation.\n");
  }

  /**
   * Prepares a collection of {@link LoadQueueItem}s from the source hfiles contained in the
   * passed directory and validates that the column families referenced by those hfiles all exist
   * in the table.
   * @param hfilesDir directory containing list of hfiles to be loaded into the table
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param validateHFile if true, hfiles will be validated for their format
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile) throws IOException {
    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
  }

  /**
   * Prepares a collection of {@link LoadQueueItem}s from the source hfiles contained in the
   * passed directory and validates that the column families referenced by those hfiles all exist
   * in the table.
   * @param hfilesDir directory containing list of hfiles to be loaded into the table
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param validateHFile if true, hfiles will be validated for their format
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile, boolean silence) throws IOException {
    discoverLoadQueue(queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Prepares a collection of {@link LoadQueueItem}s from the given family-to-hfiles map and
   * validates that the column families referenced by those hfiles all exist in the table.
   * @param map map of family to List of hfiles
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator) throws TableNotFoundException, IOException {
    return doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
  }

  /**
   * Perform a bulk load of the given map of families to hfiles into the given pre-existing table.
   * This method is not threadsafe.
   * @param map map of family to List of hfiles
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
      Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(map, table, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }

    /*
     * Checking hfile format is a time-consuming operation, we should have an option to skip this
     * step when bulkloading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
        "are not correct. If you fail to read data from your table after using this " +
        "option, consider removing the files and bulkload again without this option. " +
        "See HBASE-13985");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);

      if (queue.isEmpty()) {
        LOG.warn(
          "Bulk load operation did not find any files to load in directory {}. " +
            "Does it contain files in subdirectories that correspond to column family names?",
          (hfofDir != null ? hfofDir.toUri().toString() : ""));
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
   * </li>
   * </ol>
   * @param table Table to which these hfiles should be loaded
   * @param conn Connection to use
   * @param queue queue of {@link LoadQueueItem}s with hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the region
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    loadHFileQueue(table, conn, queue, startEndKeys, false);
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
   * </li>
   * </ol>
   * @param table Table to which these hfiles should be loaded
   * @param conn Connection to use
   * @param queue queue of {@link LoadQueueItem}s with hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the region
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
    ExecutorService pool = null;
    try {
      pool = createExecutorService();
      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
        groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
    } finally {
      if (pool != null) {
        pool.shutdown();
      }
    }
  }

  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table,
      RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
    int count = 0;

    if (isSecureBulkLoadEndpointAvailable()) {
      LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
      LOG.warn("Secure bulk load has been integrated into HBase core.");
    }

    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
    bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
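      // Fetching fresh boundaries from the RegionLocator on every pass lets files that were
      // re-queued after a split be grouped against the daughter regions rather than the stale
      // parent.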
      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with " +
          queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
          "Retry attempted " + count + " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily +
          " hfiles to one family of one region");
      }

      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
        item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    if (!queue.isEmpty()) {
      throw new RuntimeException("Bulk load aborted with some files not yet loaded. " +
        "Please check log for more details.");
    }
    return item2RegionMap;
  }

  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures
   * are re-queued for another pass with the groupOrSplitPhase.
   * <p>
   * protected for testing.
   */
  @VisibleForTesting
  protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
      Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
      boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap()
        .entrySet()) {
      byte[] first = e.getKey().array();
      Collection<LoadQueueItem> lqis = e.getValue();

      ClientServiceCallable<byte[]> serviceCallable =
        buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);

      Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> toRetry =
            tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
          return toRetry;
        }
      };
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, e.getKey());
        }
      }
      loadingFutures.add(pool.submit(call));
    }

    // get all the results.
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);

      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  @VisibleForTesting
  protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
      TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
    List<Pair<byte[], String>> famPaths =
      lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
        .collect(Collectors.toList());
    return new ClientServiceCallable<byte[]>(conn, tableName, first,
        rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
      @Override
      protected byte[] rpcCall() throws Exception {
        SecureBulkLoadClient secureClient = null;
        boolean success = false;
        try {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Going to connect to server " + getLocation() + " for row " +
              Bytes.toStringBinary(getRow()) + " with hfile group " +
              LoadIncrementalHFiles.this.toString(famPaths));
          }
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(getConf(), table);
            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
              assignSeqIds, fsDelegationToken.getUserToken(),
              bulkToken, copyFile, clusterIds, replicate);
          }
          return success ? regionName : null;
        } finally {
          // Best effort copying of files that might not have been imported
          // from the staging directory back to original location
          // in user directory
          if (secureClient != null && !success) {
            FileSystem targetFs = FileSystem.get(getConf());
            FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf());
            // Check to see if the source and target filesystems are the same
            // If they are the same filesystem, we will try move the files back
            // because previously we moved them to the staging directory.
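            // If they are different filesystems, the staging step copied (rather than moved) the
            // files, so the originals are still in place and no restore is needed.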
            if (FSHDFSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) {
              for (Pair<byte[], String> el : famPaths) {
                Path hfileStagingPath = null;
                Path hfileOrigPath = new Path(el.getSecond());
                try {
                  hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
                    hfileOrigPath.getName());
                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                    LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath);
                  } else if (targetFs.exists(hfileStagingPath)) {
                    LOG.debug(
                      "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath);
                  }
                } catch (Exception ex) {
                  LOG.debug(
                    "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex);
                }
              }
            }
          }
        }
      }
    };
  }

  private boolean checkHFilesCountPerRegionPerFamily(
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : e.getValue()) {
        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily +
            " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) +
            " of region with start key " + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @param table the table to load into
   * @param pool the ExecutorService
   * @param queue the queue for LoadQueueItem
   * @param startEndKeys start and end keys
   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
   */
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
      final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key, LQI> need synchronized only within this scope of this
    // phase because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
      new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<Pair<List<LoadQueueItem>, String>> call =
        new Callable<Pair<List<LoadQueueItem>, String>>() {
          @Override
          public Pair<List<LoadQueueItem>, String> call() throws Exception {
            Pair<List<LoadQueueItem>, String> splits =
              groupOrSplit(regionGroups, item, table, startEndKeys);
            return splits;
          }
        };
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
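    // Future.get() below blocks until each grouping task completes; the halves of any split file
    // go back onto the queue for the next pass, while files that could not be found are recorded
    // as missing.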
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized,
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
      byte[] startKey, byte[] splitKey) throws IOException {
    Path hfilePath = item.getFilePath();
    byte[] family = item.getFamily();
    Path tmpDir = hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting...");

    String uniqueName = getUniqueName();
    ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<>(2);
    lqis.add(new LoadQueueItem(family, botOut));
    lqis.add(new LoadQueueItem(family, topOut));

    // If the current item is already the result of previous splits,
    // we don't need it anymore. Clean up to save space.
    // It is not part of the original input files.
    try {
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(hfilePath, false);
      }
    } catch (IOException e) {
      LOG.warn("Unable to delete temporary split file " + hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile
   * boundary no longer fits into a region, physically splits the hfile such that the new bottom
   * half will fit and returns the list of LQI's corresponding to the resultant hfiles.
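   * Returns {@code null} when the item was grouped into an existing region (or the hfile was
   * empty), a pair whose first element is the split LQI list when the hfile was split, and a pair
   * whose second element is the file name when the hfile could not be found.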
   * <p>
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */
  @VisibleForTesting
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
        CacheConfig.DISABLED, true, getConf())) {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) +
      " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) +
        " > " + Bytes.toStringBinary(last.get()));
    }
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first.get(), Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion index). Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
    int indexForCallable = idx;

    /**
     * We can consider there to be a region hole in the following cases: 1) if idx < 0, then the
     * first region info is lost. 2) if the endkey of a region is not equal to the startkey of the
     * next region. 3) if the endkey of the last region is not empty.
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table " + table.getName() +
        " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
        !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table " + table.getName() +
        " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
        !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table " + table.getName() +
        " is not equal to the startkey of the next region in hbase:meta. " +
        "Please use hbck tool to fix it first.");
    }

    boolean lastKeyInRange = Bytes.compareTo(last.get(), startEndKeys.getSecond()[idx]) < 0 ||
      Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
        startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, the region server callable should succeed or fail
   * atomically.
   * <p>
   * Protected for testing.
   * @return empty list if success, list of items to retry on recoverable failure
   */
  @VisibleForTesting
  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
      throws IOException {
    List<LoadQueueItem> toRetry = new ArrayList<>();
    try {
      Configuration conf = getConf();
      byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
        .callWithRetries(serviceCallable, Integer.MAX_VALUE);
      if (region == null) {
        LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
          " into table " + tableName + " with files " + lqis +
          " failed. This is recoverable and they will be retried.");
        toRetry.addAll(lqis); // return lqi's to retry
      }
      // success
      return toRetry;
    } catch (IOException e) {
      LOG.error("Encountered unrecoverable error from region server, additional details: " +
        serviceCallable.getExceptionMessageAdditionalDetail(), e);
      LOG.warn(
        "Received a " + e.getClass().getSimpleName() + " from region server: " +
          serviceCallable.getExceptionMessageAdditionalDetail(), e);
      if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
          numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
        LOG.warn("Will attempt to retry loading failed HFiles. Retry #" +
          numRetries.incrementAndGet());
        toRetry.addAll(lqis);
        return toRetry;
      }
      LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
      throw e;
    }
  }

  /**
   * If the table is created for the first time, then "completebulkload" reads the files twice.
   * Further changes would be needed if we want to avoid doing that.
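   * The pass here only opens each HFile to read its metadata (first/last keys and compression)
   * in order to infer column families and region boundaries; the load itself then opens the same
   * files again.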
   */
  private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
          ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
          throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
            HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
              " for family " + builder.getNameAsString());
          }
          reader.loadFileInfo();
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
            Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first) ? map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        }
      }
    });

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
      .forEachOrdered(tdBuilder::setColumnFamily);
    admin.createTable(tdBuilder.build(), keys);

    LOG.info("Table " + tableName + " is available!!");
  }

  private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null && secureClient != null) {
      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
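   * Families referenced by the queued HFiles but missing from the table descriptor either raise
   * an IOException or, when {@code silence} is true, are only logged.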
   */
  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
      throws IOException {
    Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream()
      .map(f -> f.getNameAsString()).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
      .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (unmatchedFamilies.size() > 0) {
      String msg =
        "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " +
          unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " +
          familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the Queue with given HFiles
   */
  private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue containing all such files.
   */
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
      final boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
            HConstants.DEFAULT_MAX_FILE_SIZE)) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length +
            " bytes; files larger than the region max file size can be problematic as they " +
            "may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference files, HFileLinks, files starting with "_"
   * and invalid hfiles.
   */
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
      final BulkHFileVisitor<TFamily> visitor) throws IOException {
    visitBulkHFiles(fs, bulkDir, visitor, true);
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference files, HFileLinks and files starting with
   * "_". Check and skip invalid hfiles by default, or skip this validation by setting
   * 'hbase.loadincremental.validate.hfile' to false.
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
      BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("The file " + hfile + " doesn't seem to be an hfile, skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            LOG.warn("The file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  // Initialize a thread pool
  private ExecutorService createExecutorService() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(),
      new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  private final String toString(List<Pair<byte[], String>> list) {
    StringBuilder sb = new StringBuilder();
    sb.append('[');
    list.forEach(p -> {
      sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
        .append('}');
    });
    sb.append(']');
    return sb.toString();
  }

  private boolean isSecureBulkLoadEndpointAvailable() {
    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
  }

  /**
   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
   * filters, etc.
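   * <p>
   * A minimal sketch of how callers in this class drive it (the paths and split key below are
   * illustrative only):
   * <pre>{@code
   * Path botOut = new Path(tmpDir, uniqueName + ".bottom");
   * Path topOut = new Path(tmpDir, uniqueName + ".top");
   * splitStoreFile(conf, hfilePath, familyDesc, splitKey, botOut, topOut);
   * }</pre>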
   */
  @VisibleForTesting
  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
      byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile.
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
      Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    HalfStoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
        new AtomicInteger(0), true, conf);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
        .withChecksumType(HStore.getChecksumType(conf))
        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
        .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
        .build();
      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getCell());
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFile.isReservedFileInfoKey(key);
  }

  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

  protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
      throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        if (isCreateTable()) {
          createTable(tableName, hfofDir, admin);
        } else {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(hfofDir, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  /**
   * Perform bulk load on the given table.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
      throws IOException {
    return run(new Path(hfofDir), tableName);
  }

  /**
   * Perform bulk load on the given table.
   * @param family2Files map of family to List of hfiles
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files,
      TableName tableName) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        String errorMsg = format("Table '%s' does not exist.", tableName);
        LOG.error(errorMsg);
        throw new TableNotFoundException(errorMsg);
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);

    if (args.length == 2) {
      return !run(dirPath, tableName).isEmpty() ? 0 : -1;
    } else {
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !run(family2Files, tableName).isEmpty() ? 0 : -1;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
    System.exit(ret);
  }

  /**
   * Called from the replication sink when it manages the bulkToken (staging directory) by itself.
   * This is used only when SecureBulkLoadEndpoint is configured in the
   * hbase.coprocessor.region.classes property. The staging directory is a temporary directory
   * into which all files are initially copied or moved from the user-given directory and given
   * the required file permissions; from there they are finally loaded into the table. Set this
   * only when you want to manage the staging directory yourself; otherwise this tool handles it
   * by itself.
   * @param stagingDir staging directory path
   */
  public void setBulkToken(String stagingDir) {
    this.bulkToken = stagingDir;
  }

  public void setClusterIds(List<String> clusterIds) {
    this.clusterIds = clusterIds;
  }

  /**
   * Disables replication for these bulkloaded files.
   */
  public void disableReplication() {
    this.replicate = false;
  }

  /**
   * Infers region boundaries for a new table.
   * <p>
   * Parameter: <br>
   * bdryMap is a map from each key to an integer in {+1, -1}
   * <ul>
   * <li>If a key is a start key of a file, then it maps to +1</li>
   * <li>If a key is an end key of a file, then it maps to -1</li>
   * </ul>
   * <p>
   * Algorithm:<br>
   * <ol>
   * <li>Poll on the keys in order:
   * <ol type="a">
   * <li>Keep adding the mapped values of these keys to a running sum (runningSum)</li>
   * <li>Each time runningSum reaches 0, add the start key from when the runningSum had started to
   * a boundary list.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
   */
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }
}