001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.io.InterruptedIOException; 023import java.nio.ByteBuffer; 024import java.util.ArrayDeque; 025import java.util.ArrayList; 026import java.util.Arrays; 027import java.util.Collection; 028import java.util.Collections; 029import java.util.Deque; 030import java.util.HashMap; 031import java.util.HashSet; 032import java.util.List; 033import java.util.Map; 034import java.util.Map.Entry; 035import java.util.Optional; 036import java.util.Set; 037import java.util.SortedMap; 038import java.util.TreeMap; 039import java.util.UUID; 040import java.util.concurrent.Callable; 041import java.util.concurrent.ExecutionException; 042import java.util.concurrent.ExecutorService; 043import java.util.concurrent.Future; 044import java.util.concurrent.LinkedBlockingQueue; 045import java.util.concurrent.ThreadPoolExecutor; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.atomic.AtomicInteger; 048import java.util.stream.Collectors; 049import static java.lang.String.format; 050import 
org.apache.commons.lang3.mutable.MutableInt; 051import org.apache.hadoop.conf.Configuration; 052import org.apache.hadoop.conf.Configured; 053import org.apache.hadoop.fs.FileStatus; 054import org.apache.hadoop.fs.FileSystem; 055import org.apache.hadoop.fs.Path; 056import org.apache.hadoop.fs.permission.FsPermission; 057import org.apache.hadoop.hbase.HBaseConfiguration; 058import org.apache.hadoop.hbase.HConstants; 059import org.apache.hadoop.hbase.TableName; 060import org.apache.hadoop.hbase.TableNotFoundException; 061import org.apache.hadoop.hbase.client.Admin; 062import org.apache.hadoop.hbase.client.ClientServiceCallable; 063import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 064import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 065import org.apache.hadoop.hbase.client.Connection; 066import org.apache.hadoop.hbase.client.ConnectionFactory; 067import org.apache.hadoop.hbase.client.RegionLocator; 068import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 069import org.apache.hadoop.hbase.client.SecureBulkLoadClient; 070import org.apache.hadoop.hbase.client.Table; 071import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 072import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 073import org.apache.hadoop.hbase.io.HFileLink; 074import org.apache.hadoop.hbase.io.HalfStoreFileReader; 075import org.apache.hadoop.hbase.io.Reference; 076import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 077import org.apache.hadoop.hbase.io.hfile.CacheConfig; 078import org.apache.hadoop.hbase.io.hfile.HFile; 079import org.apache.hadoop.hbase.io.hfile.HFileContext; 080import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 081import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; 082import org.apache.hadoop.hbase.io.hfile.HFileScanner; 083import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 084import org.apache.hadoop.hbase.regionserver.BloomType; 085import org.apache.hadoop.hbase.regionserver.HStore; 086import 
org.apache.hadoop.hbase.regionserver.StoreFileInfo; 087import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 088import org.apache.hadoop.hbase.security.UserProvider; 089import org.apache.hadoop.hbase.security.token.FsDelegationToken; 090import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 091import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; 092import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 093import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 094import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 095import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps; 096import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 097import org.apache.hadoop.hbase.util.Bytes; 098import org.apache.hadoop.hbase.util.FSHDFSUtils; 099import org.apache.hadoop.hbase.util.FSVisitor; 100import org.apache.hadoop.hbase.util.Pair; 101import org.apache.hadoop.util.Tool; 102import org.apache.hadoop.util.ToolRunner; 103import org.apache.yetus.audience.InterfaceAudience; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107/** 108 * Tool to load the output of HFileOutputFormat into an existing table. 
109 */ 110@InterfaceAudience.Public 111public class LoadIncrementalHFiles extends Configured implements Tool { 112 113 private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class); 114 115 public static final String NAME = "completebulkload"; 116 static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException"; 117 public static final String MAX_FILES_PER_REGION_PER_FAMILY = 118 "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily"; 119 private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; 120 public final static String CREATE_TABLE_CONF_KEY = "create.table"; 121 public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families"; 122 public final static String ALWAYS_COPY_FILES = "always.copy.files"; 123 124 // We use a '.' prefix which is ignored when walking directory trees 125 // above. It is invalid family name. 126 static final String TMP_DIR = ".tmp"; 127 128 private final int maxFilesPerRegionPerFamily; 129 private final boolean assignSeqIds; 130 131 // Source delegation token 132 private final FsDelegationToken fsDelegationToken; 133 private final UserProvider userProvider; 134 private final int nrThreads; 135 private AtomicInteger numRetries; 136 private final RpcControllerFactory rpcControllerFactory; 137 138 private String bulkToken; 139 140 private List<String> clusterIds = new ArrayList<>(); 141 142 private boolean replicate = true; 143 144 /** 145 * Represents an HFile waiting to be loaded. An queue is used in this class in order to support 146 * the case where a region has split during the process of the load. When this happens, the HFile 147 * is split into two physical parts across the new region boundary, and each part is added back 148 * into the queue. The import process finishes when the queue is empty. 
149 */ 150 @InterfaceAudience.Public 151 public static class LoadQueueItem { 152 private final byte[] family; 153 private final Path hfilePath; 154 155 public LoadQueueItem(byte[] family, Path hfilePath) { 156 this.family = family; 157 this.hfilePath = hfilePath; 158 } 159 160 @Override 161 public String toString() { 162 return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString(); 163 } 164 165 public byte[] getFamily() { 166 return family; 167 } 168 169 public Path getFilePath() { 170 return hfilePath; 171 } 172 } 173 174 public LoadIncrementalHFiles(Configuration conf) { 175 // make a copy, just to be sure we're not overriding someone else's config 176 super(HBaseConfiguration.create(conf)); 177 conf = getConf(); 178 // disable blockcache for tool invocation, see HBASE-10500 179 conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0); 180 userProvider = UserProvider.instantiate(conf); 181 fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); 182 assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true); 183 maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); 184 nrThreads = conf.getInt("hbase.loadincremental.threads.max", 185 Runtime.getRuntime().availableProcessors()); 186 numRetries = new AtomicInteger(0); 187 rpcControllerFactory = new RpcControllerFactory(conf); 188 } 189 190 private void usage() { 191 System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] " 192 + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n" 193 + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- " 194 + "into an hbase table.\n" 195 + "OPTIONS (for other -D options, see source code):\n" 196 + " -D" + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target " 197 + "table must exist.\n" 198 + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes to ignore unmatched column families.\n" 199 + " -loadTable for when directory of files to load has a depth of 3; target table must " 200 + 
"exist;\n" 201 + " must be last of the options on command line.\n" 202 + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for " 203 + "documentation.\n"); 204 } 205 206 /** 207 * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the 208 * passed directory and validates whether the prepared queue has all the valid table column 209 * families in it. 210 * @param hfilesDir directory containing list of hfiles to be loaded into the table 211 * @param table table to which hfiles should be loaded 212 * @param queue queue which needs to be loaded into the table 213 * @param validateHFile if true hfiles will be validated for its format 214 * @throws IOException If any I/O or network error occurred 215 */ 216 public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue, 217 boolean validateHFile) throws IOException { 218 prepareHFileQueue(hfilesDir, table, queue, validateHFile, false); 219 } 220 221 /** 222 * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the 223 * passed directory and validates whether the prepared queue has all the valid table column 224 * families in it. 
225 * @param hfilesDir directory containing list of hfiles to be loaded into the table 226 * @param table table to which hfiles should be loaded 227 * @param queue queue which needs to be loaded into the table 228 * @param validateHFile if true hfiles will be validated for its format 229 * @param silence true to ignore unmatched column families 230 * @throws IOException If any I/O or network error occurred 231 */ 232 public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue, 233 boolean validateHFile, boolean silence) throws IOException { 234 discoverLoadQueue(queue, hfilesDir, validateHFile); 235 validateFamiliesInHFiles(table, queue, silence); 236 } 237 238 /** 239 * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the 240 * passed directory and validates whether the prepared queue has all the valid table column 241 * families in it. 242 * @param map map of family to List of hfiles 243 * @param table table to which hfiles should be loaded 244 * @param queue queue which needs to be loaded into the table 245 * @param silence true to ignore unmatched column families 246 * @throws IOException If any I/O or network error occurred 247 */ 248 public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table, 249 Deque<LoadQueueItem> queue, boolean silence) throws IOException { 250 populateLoadQueue(queue, map); 251 validateFamiliesInHFiles(table, queue, silence); 252 } 253 254 /** 255 * Perform a bulk load of the given directory into the given pre-existing table. This method is 256 * not threadsafe. 
257 * @param hfofDir the directory that was provided as the output path of a job using 258 * HFileOutputFormat 259 * @param admin the Admin 260 * @param table the table to load into 261 * @param regionLocator region locator 262 * @throws TableNotFoundException if table does not yet exist 263 */ 264 public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table, 265 RegionLocator regionLocator) throws TableNotFoundException, IOException { 266 return doBulkLoad(hfofDir, admin, table, regionLocator, false, false); 267 } 268 269 /** 270 * Perform a bulk load of the given directory into the given pre-existing table. This method is 271 * not threadsafe. 272 * @param map map of family to List of hfiles 273 * @param admin the Admin 274 * @param table the table to load into 275 * @param regionLocator region locator 276 * @param silence true to ignore unmatched column families 277 * @param copyFile always copy hfiles if true 278 * @throws TableNotFoundException if table does not yet exist 279 */ 280 public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, 281 Table table, RegionLocator regionLocator, boolean silence, boolean copyFile) 282 throws TableNotFoundException, IOException { 283 if (!admin.isTableAvailable(regionLocator.getName())) { 284 throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); 285 } 286 // LQI queue does not need to be threadsafe -- all operations on this queue 287 // happen in this thread 288 Deque<LoadQueueItem> queue = new ArrayDeque<>(); 289 ExecutorService pool = null; 290 SecureBulkLoadClient secureClient = null; 291 try { 292 prepareHFileQueue(map, table, queue, silence); 293 if (queue.isEmpty()) { 294 LOG.warn("Bulk load operation did not get any files to load"); 295 return Collections.emptyMap(); 296 } 297 pool = createExecutorService(); 298 secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); 299 return 
performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); 300 } finally { 301 cleanup(admin, queue, pool, secureClient); 302 } 303 } 304 305 /** 306 * Perform a bulk load of the given directory into the given pre-existing table. This method is 307 * not threadsafe. 308 * @param hfofDir the directory that was provided as the output path of a job using 309 * HFileOutputFormat 310 * @param admin the Admin 311 * @param table the table to load into 312 * @param regionLocator region locator 313 * @param silence true to ignore unmatched column families 314 * @param copyFile always copy hfiles if true 315 * @throws TableNotFoundException if table does not yet exist 316 */ 317 public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table, 318 RegionLocator regionLocator, boolean silence, boolean copyFile) 319 throws TableNotFoundException, IOException { 320 if (!admin.isTableAvailable(regionLocator.getName())) { 321 throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); 322 } 323 324 /* 325 * Checking hfile format is a time-consuming operation, we should have an option to skip this 326 * step when bulkloading millions of HFiles. See HBASE-13985. 327 */ 328 boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true); 329 if (!validateHFile) { 330 LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " + 331 "are not correct. If you fail to read data from your table after using this " + 332 "option, consider removing the files and bulkload again without this option. 
" + 333 "See HBASE-13985"); 334 } 335 // LQI queue does not need to be threadsafe -- all operations on this queue 336 // happen in this thread 337 Deque<LoadQueueItem> queue = new ArrayDeque<>(); 338 ExecutorService pool = null; 339 SecureBulkLoadClient secureClient = null; 340 try { 341 prepareHFileQueue(hfofDir, table, queue, validateHFile, silence); 342 343 if (queue.isEmpty()) { 344 LOG.warn( 345 "Bulk load operation did not find any files to load in directory {}. " + 346 "Does it contain files in subdirectories that correspond to column family names?", 347 (hfofDir != null ? hfofDir.toUri().toString() : "")); 348 return Collections.emptyMap(); 349 } 350 pool = createExecutorService(); 351 secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); 352 return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); 353 } finally { 354 cleanup(admin, queue, pool, secureClient); 355 } 356 } 357 358 /** 359 * Used by the replication sink to load the hfiles from the source cluster. It does the following, 360 * <ol> 361 * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li> 362 * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap) 363 * </li> 364 * </ol> 365 * @param table Table to which these hfiles should be loaded to 366 * @param conn Connection to use 367 * @param queue {@link LoadQueueItem} has hfiles yet to be loaded 368 * @param startEndKeys starting and ending row keys of the region 369 */ 370 public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue, 371 Pair<byte[][], byte[][]> startEndKeys) throws IOException { 372 loadHFileQueue(table, conn, queue, startEndKeys, false); 373 } 374 375 /** 376 * Used by the replication sink to load the hfiles from the source cluster. 
It does the following, 377 * <ol> 378 * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li> 379 * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap) 380 * </li> 381 * </ol> 382 * @param table Table to which these hfiles should be loaded to 383 * @param conn Connection to use 384 * @param queue {@link LoadQueueItem} has hfiles yet to be loaded 385 * @param startEndKeys starting and ending row keys of the region 386 */ 387 public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue, 388 Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException { 389 ExecutorService pool = null; 390 try { 391 pool = createExecutorService(); 392 Multimap<ByteBuffer, LoadQueueItem> regionGroups = 393 groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst(); 394 bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null); 395 } finally { 396 if (pool != null) { 397 pool.shutdown(); 398 } 399 } 400 } 401 402 private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table, 403 RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool, 404 SecureBulkLoadClient secureClient, boolean copyFile) throws IOException { 405 int count = 0; 406 407 if (isSecureBulkLoadEndpointAvailable()) { 408 LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases."); 409 LOG.warn("Secure bulk load has been integrated into HBase core."); 410 } 411 412 fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf())); 413 bulkToken = secureClient.prepareBulkLoad(admin.getConnection()); 414 Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null; 415 416 Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>(); 417 // Assumes that region splits can happen while this occurs. 418 while (!queue.isEmpty()) { 419 // need to reload split keys each iteration. 
420 final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys(); 421 if (count != 0) { 422 LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with " + 423 queue.size() + " files remaining to group or split"); 424 } 425 426 int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); 427 maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1); 428 if (maxRetries != 0 && count >= maxRetries) { 429 throw new IOException( 430 "Retry attempted " + count + " times without completing, bailing out"); 431 } 432 count++; 433 434 // Using ByteBuffer for byte[] equality semantics 435 pair = groupOrSplitPhase(table, pool, queue, startEndKeys); 436 Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst(); 437 438 if (!checkHFilesCountPerRegionPerFamily(regionGroups)) { 439 // Error is logged inside checkHFilesCountPerRegionPerFamily. 440 throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily + 441 " hfiles to one family of one region"); 442 } 443 444 bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile, 445 item2RegionMap); 446 447 // NOTE: The next iteration's split / group could happen in parallel to 448 // atomic bulkloads assuming that there are splits and no merges, and 449 // that we can atomically pull out the groups we want to retry. 450 } 451 452 if (!queue.isEmpty()) { 453 throw new RuntimeException("Bulk load aborted with some files not yet loaded." + 454 "Please check log for more details."); 455 } 456 return item2RegionMap; 457 } 458 459 /** 460 * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are 461 * re-queued for another pass with the groupOrSplitPhase. 462 * <p> 463 * protected for testing. 
464 */ 465 @VisibleForTesting 466 protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool, 467 Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, 468 boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 469 // atomically bulk load the groups. 470 Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>(); 471 for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap() 472 .entrySet()) { 473 byte[] first = e.getKey().array(); 474 Collection<LoadQueueItem> lqis = e.getValue(); 475 476 ClientServiceCallable<byte[]> serviceCallable = 477 buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile); 478 479 Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() { 480 @Override 481 public List<LoadQueueItem> call() throws Exception { 482 List<LoadQueueItem> toRetry = 483 tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis); 484 return toRetry; 485 } 486 }; 487 if (item2RegionMap != null) { 488 for (LoadQueueItem lqi : lqis) { 489 item2RegionMap.put(lqi, e.getKey()); 490 } 491 } 492 loadingFutures.add(pool.submit(call)); 493 } 494 495 // get all the results. 496 for (Future<List<LoadQueueItem>> future : loadingFutures) { 497 try { 498 List<LoadQueueItem> toRetry = future.get(); 499 500 if (item2RegionMap != null) { 501 for (LoadQueueItem lqi : toRetry) { 502 item2RegionMap.remove(lqi); 503 } 504 } 505 // LQIs that are requeued to be regrouped. 506 queue.addAll(toRetry); 507 508 } catch (ExecutionException e1) { 509 Throwable t = e1.getCause(); 510 if (t instanceof IOException) { 511 // At this point something unrecoverable has happened. 
512 // TODO Implement bulk load recovery 513 throw new IOException("BulkLoad encountered an unrecoverable problem", t); 514 } 515 LOG.error("Unexpected execution exception during bulk load", e1); 516 throw new IllegalStateException(t); 517 } catch (InterruptedException e1) { 518 LOG.error("Unexpected interrupted exception during bulk load", e1); 519 throw (InterruptedIOException) new InterruptedIOException().initCause(e1); 520 } 521 } 522 } 523 524 @VisibleForTesting 525 protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn, 526 TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) { 527 List<Pair<byte[], String>> famPaths = 528 lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString())) 529 .collect(Collectors.toList()); 530 return new ClientServiceCallable<byte[]>(conn, tableName, first, 531 rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { 532 @Override 533 protected byte[] rpcCall() throws Exception { 534 SecureBulkLoadClient secureClient = null; 535 boolean success = false; 536 try { 537 if (LOG.isDebugEnabled()) { 538 LOG.debug("Going to connect to server " + getLocation() + " for row " + 539 Bytes.toStringBinary(getRow()) + " with hfile group " + 540 LoadIncrementalHFiles.this.toString(famPaths)); 541 } 542 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 543 try (Table table = conn.getTable(getTableName())) { 544 secureClient = new SecureBulkLoadClient(getConf(), table); 545 success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, 546 assignSeqIds, fsDelegationToken.getUserToken(), 547 bulkToken, copyFile, clusterIds, replicate); 548 } 549 return success ? 
regionName : null; 550 } finally { 551 // Best effort copying of files that might not have been imported 552 // from the staging directory back to original location 553 // in user directory 554 if (secureClient != null && !success) { 555 FileSystem targetFs = FileSystem.get(getConf()); 556 FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf()); 557 // Check to see if the source and target filesystems are the same 558 // If they are the same filesystem, we will try move the files back 559 // because previously we moved them to the staging directory. 560 if (FSHDFSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) { 561 for (Pair<byte[], String> el : famPaths) { 562 Path hfileStagingPath = null; 563 Path hfileOrigPath = new Path(el.getSecond()); 564 try { 565 hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())), 566 hfileOrigPath.getName()); 567 if (targetFs.rename(hfileStagingPath, hfileOrigPath)) { 568 LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath); 569 } else if (targetFs.exists(hfileStagingPath)) { 570 LOG.debug( 571 "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath); 572 } 573 } catch (Exception ex) { 574 LOG.debug( 575 "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex); 576 } 577 } 578 } 579 } 580 } 581 } 582 }; 583 } 584 585 private boolean checkHFilesCountPerRegionPerFamily( 586 final Multimap<ByteBuffer, LoadQueueItem> regionGroups) { 587 for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) { 588 Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 589 for (LoadQueueItem lqi : e.getValue()) { 590 MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt()); 591 count.increment(); 592 if (count.intValue() > maxFilesPerRegionPerFamily) { 593 LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + 594 " hfiles to family " + 
Bytes.toStringBinary(lqi.getFamily()) + 595 " of region with start key " + Bytes.toStringBinary(e.getKey())); 596 return false; 597 } 598 } 599 } 600 return true; 601 } 602 603 /** 604 * @param table the table to load into 605 * @param pool the ExecutorService 606 * @param queue the queue for LoadQueueItem 607 * @param startEndKeys start and end keys 608 * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles. 609 */ 610 private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase( 611 final Table table, ExecutorService pool, Deque<LoadQueueItem> queue, 612 final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 613 // <region start key, LQI> need synchronized only within this scope of this 614 // phase because of the puts that happen in futures. 615 Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create(); 616 final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs); 617 Set<String> missingHFiles = new HashSet<>(); 618 Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = 619 new Pair<>(regionGroups, missingHFiles); 620 621 // drain LQIs and figure out bulk load groups 622 Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>(); 623 while (!queue.isEmpty()) { 624 final LoadQueueItem item = queue.remove(); 625 626 final Callable<Pair<List<LoadQueueItem>, String>> call = 627 new Callable<Pair<List<LoadQueueItem>, String>>() { 628 @Override 629 public Pair<List<LoadQueueItem>, String> call() throws Exception { 630 Pair<List<LoadQueueItem>, String> splits = 631 groupOrSplit(regionGroups, item, table, startEndKeys); 632 return splits; 633 } 634 }; 635 splittingFutures.add(pool.submit(call)); 636 } 637 // get all the results. All grouping and splitting must finish before 638 // we can attempt the atomic loads. 
639 for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) { 640 try { 641 Pair<List<LoadQueueItem>, String> splits = lqis.get(); 642 if (splits != null) { 643 if (splits.getFirst() != null) { 644 queue.addAll(splits.getFirst()); 645 } else { 646 missingHFiles.add(splits.getSecond()); 647 } 648 } 649 } catch (ExecutionException e1) { 650 Throwable t = e1.getCause(); 651 if (t instanceof IOException) { 652 LOG.error("IOException during splitting", e1); 653 throw (IOException) t; // would have been thrown if not parallelized, 654 } 655 LOG.error("Unexpected execution exception during splitting", e1); 656 throw new IllegalStateException(t); 657 } catch (InterruptedException e1) { 658 LOG.error("Unexpected interrupted exception during splitting", e1); 659 throw (InterruptedIOException) new InterruptedIOException().initCause(e1); 660 } 661 } 662 return pair; 663 } 664 665 private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table, 666 byte[] startKey, byte[] splitKey) throws IOException { 667 Path hfilePath = item.getFilePath(); 668 byte[] family = item.getFamily(); 669 Path tmpDir = hfilePath.getParent(); 670 if (!tmpDir.getName().equals(TMP_DIR)) { 671 tmpDir = new Path(tmpDir, TMP_DIR); 672 } 673 674 LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. 
Splitting..."); 675 676 String uniqueName = getUniqueName(); 677 ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family); 678 679 Path botOut = new Path(tmpDir, uniqueName + ".bottom"); 680 Path topOut = new Path(tmpDir, uniqueName + ".top"); 681 splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut); 682 683 FileSystem fs = tmpDir.getFileSystem(getConf()); 684 fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx")); 685 fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx")); 686 fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx")); 687 688 // Add these back at the *front* of the queue, so there's a lower 689 // chance that the region will just split again before we get there. 690 List<LoadQueueItem> lqis = new ArrayList<>(2); 691 lqis.add(new LoadQueueItem(family, botOut)); 692 lqis.add(new LoadQueueItem(family, topOut)); 693 694 // If the current item is already the result of previous splits, 695 // we don't need it anymore. Clean up to save space. 696 // It is not part of the original input files. 697 try { 698 if (tmpDir.getName().equals(TMP_DIR)) { 699 fs.delete(hfilePath, false); 700 } 701 } catch (IOException e) { 702 LOG.warn("Unable to delete temporary split file " + hfilePath); 703 } 704 LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); 705 return lqis; 706 } 707 708 /** 709 * Attempt to assign the given load queue item into its target region group. If the hfile boundary 710 * no longer fits into a region, physically splits the hfile such that the new bottom half will 711 * fit and returns the list of LQI's corresponding to the resultant hfiles. 
   * <p>
   * protected for testing
   * @param regionGroups per-region grouping that accepted items are added to
   * @param item the load queue item whose hfile is being placed
   * @param table the target table (used in error messages and when splitting)
   * @param startEndKeys parallel arrays of region start keys and end keys, sorted by start key
   * @return {@code null} if the item was grouped (or was empty and skipped); a pair of
   *         (new items, null) if the hfile was physically split; a pair of (null, file name)
   *         if the hfile has disappeared
   * @throws IOException if an IO failure is encountered
   */
  @VisibleForTesting
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    // Only file metadata is read here; the block cache is bypassed.
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
      CacheConfig.DISABLED, true, getConf())) {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      // File may have been consumed by a concurrent load attempt; report it by name.
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) +
        " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      // first and last are either both present or both absent.
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) +
          " > " + Bytes.toStringBinary(last.get()));
    }
    // Locate the region whose start key is <= first.
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first.get(), Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion index). Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
    int indexForCallable = idx;

    /**
     * we can consider there is a region hole in following conditions. 1) if idx < 0,then first
     * region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
     * region. 3) if the endkey of the last region is not empty.
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table " + table.getName() +
          " can't be found in hbase:meta.Please use hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
        !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table " + table.getName() +
          " can't be found in hbase:meta.Please use hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
        !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table " + table.getName() +
          " is not equal to the startkey of the next region in hbase:meta." +
          "Please use hbck tool to fix it first.");
    }

    // The item fits iff its last key sorts before the region end key, or this is the last
    // region (whose end key is the empty byte array).
    boolean lastKeyInRange = Bytes.compareTo(last.get(), startEndKeys.getSecond()[idx]) < 0 ||
        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      // HFile straddles a region boundary: split at the region's end key and hand the two
      // halves back to the caller for re-grouping.
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
          startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically
   * and fails atomically.
   * <p>
   * Protected for testing.
793 * @return empty list if success, list of items to retry on recoverable failure 794 */ 795 @VisibleForTesting 796 protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable, 797 final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis) 798 throws IOException { 799 List<LoadQueueItem> toRetry = new ArrayList<>(); 800 try { 801 Configuration conf = getConf(); 802 byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller() 803 .callWithRetries(serviceCallable, Integer.MAX_VALUE); 804 if (region == null) { 805 LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) + 806 " into table " + tableName + " with files " + lqis + 807 " failed. This is recoverable and they will be retried."); 808 toRetry.addAll(lqis); // return lqi's to retry 809 } 810 // success 811 return toRetry; 812 } catch (IOException e) { 813 LOG.error("Encountered unrecoverable error from region server, additional details: " + 814 serviceCallable.getExceptionMessageAdditionalDetail(), 815 e); 816 LOG.warn( 817 "Received a " + e.getClass().getSimpleName() 818 + " from region server: " 819 + serviceCallable.getExceptionMessageAdditionalDetail(), e); 820 if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) 821 && numRetries.get() < getConf().getInt( 822 HConstants.HBASE_CLIENT_RETRIES_NUMBER, 823 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { 824 LOG.warn("Will attempt to retry loading failed HFiles. Retry #" 825 + numRetries.incrementAndGet()); 826 toRetry.addAll(lqis); 827 return toRetry; 828 } 829 LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover"); 830 throw e; 831 } 832 } 833 834 /** 835 * If the table is created for the first time, then "completebulkload" reads the files twice. More 836 * modifications necessary if we want to avoid doing it. 
837 */ 838 private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException { 839 final Path hfofDir = new Path(dirPath); 840 final FileSystem fs = hfofDir.getFileSystem(getConf()); 841 842 // Add column families 843 // Build a set of keys 844 List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>(); 845 SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 846 visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() { 847 @Override 848 public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) { 849 ColumnFamilyDescriptorBuilder builder = 850 ColumnFamilyDescriptorBuilder.newBuilder(familyName); 851 familyBuilders.add(builder); 852 return builder; 853 } 854 855 @Override 856 public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus) 857 throws IOException { 858 Path hfile = hfileStatus.getPath(); 859 try (HFile.Reader reader = 860 HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) { 861 if (builder.getCompressionType() != reader.getFileContext().getCompression()) { 862 builder.setCompressionType(reader.getFileContext().getCompression()); 863 LOG.info("Setting compression " + reader.getFileContext().getCompression().name() + 864 " for family " + builder.getNameAsString()); 865 } 866 reader.loadFileInfo(); 867 byte[] first = reader.getFirstRowKey().get(); 868 byte[] last = reader.getLastRowKey().get(); 869 870 LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" + 871 Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last)); 872 873 // To eventually infer start key-end key boundaries 874 Integer value = map.containsKey(first) ? map.get(first) : 0; 875 map.put(first, value + 1); 876 877 value = map.containsKey(last) ? 
map.get(last) : 0; 878 map.put(last, value - 1); 879 } 880 } 881 }); 882 883 byte[][] keys = inferBoundaries(map); 884 TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName); 885 familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build) 886 .forEachOrdered(tdBuilder::setColumnFamily); 887 admin.createTable(tdBuilder.build(), keys); 888 889 LOG.info("Table " + tableName + " is available!!"); 890 } 891 892 private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool, 893 SecureBulkLoadClient secureClient) throws IOException { 894 fsDelegationToken.releaseDelegationToken(); 895 if (bulkToken != null && secureClient != null) { 896 secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken); 897 } 898 if (pool != null) { 899 pool.shutdown(); 900 } 901 if (!queue.isEmpty()) { 902 StringBuilder err = new StringBuilder(); 903 err.append("-------------------------------------------------\n"); 904 err.append("Bulk load aborted with some files not yet loaded:\n"); 905 err.append("-------------------------------------------------\n"); 906 for (LoadQueueItem q : queue) { 907 err.append(" ").append(q.getFilePath()).append('\n'); 908 } 909 LOG.error(err.toString()); 910 } 911 } 912 913 // unique file name for the table 914 private String getUniqueName() { 915 return UUID.randomUUID().toString().replaceAll("-", ""); 916 } 917 918 /** 919 * Checks whether there is any invalid family name in HFiles to be bulk loaded. 
920 */ 921 private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence) 922 throws IOException { 923 Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream() 924 .map(f -> f.getNameAsString()).collect(Collectors.toSet()); 925 List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily())) 926 .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList()); 927 if (unmatchedFamilies.size() > 0) { 928 String msg = 929 "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " + 930 unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " + 931 familyNames; 932 LOG.error(msg); 933 if (!silence) { 934 throw new IOException(msg); 935 } 936 } 937 } 938 939 /** 940 * Populate the Queue with given HFiles 941 */ 942 private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) { 943 map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add)); 944 } 945 946 /** 947 * Walk the given directory for all HFiles, and return a Queue containing all such files. 
948 */ 949 private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir, 950 final boolean validateHFile) throws IOException { 951 visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() { 952 @Override 953 public byte[] bulkFamily(final byte[] familyName) { 954 return familyName; 955 } 956 957 @Override 958 public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException { 959 long length = hfile.getLen(); 960 if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE, 961 HConstants.DEFAULT_MAX_FILE_SIZE)) { 962 LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length + 963 " bytes can be problematic as it may lead to oversplitting."); 964 } 965 ret.add(new LoadQueueItem(family, hfile.getPath())); 966 } 967 }, validateHFile); 968 } 969 970 private interface BulkHFileVisitor<TFamily> { 971 972 TFamily bulkFamily(byte[] familyName) throws IOException; 973 974 void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException; 975 } 976 977 /** 978 * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and 979 * non-valid hfiles. 980 */ 981 private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir, 982 final BulkHFileVisitor<TFamily> visitor) throws IOException { 983 visitBulkHFiles(fs, bulkDir, visitor, true); 984 } 985 986 /** 987 * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and 988 * skip non-valid hfiles by default, or skip this validation by setting 989 * 'hbase.loadincremental.validate.hfile' to false. 
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
      BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    // Layout expected here: bulkDir/<family>/<hfile>.
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            // File vanished between listing and validation; best-effort skip.
            LOG.warn("the file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  // Initialize a thread pool
  private ExecutorService createExecutorService() {
    // Core threads are allowed to time out so an idle instance holds no threads.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  // Renders a list of (byte[], String) pairs as "[{binaryKey,second}...]" for log output.
  private final String toString(List<Pair<byte[], String>> list) {
    StringBuilder sb = new StringBuilder();
    sb.append('[');
    list.forEach(p -> {
      sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
          .append('}');
    });
    sb.append(']');
    return sb.toString();
  }

  // True when the region coprocessor config lists the SecureBulkLoadEndpoint class.
  private boolean isSecureBulkLoadEndpointAvailable() {
    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
  }

  /**
   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
   * filters, etc.
   */
  @VisibleForTesting
  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
      byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile.
   * @param conf configuration used to open the filesystem and size the writer
   * @param inFile source hfile
   * @param outFile destination for the materialized half
   * @param reference top/bottom reference describing which half to copy
   * @param familyDescriptor supplies block size, compression, bloom type and encoding
   * @throws IOException on any read or write failure
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
      Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    HalfStoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
        new AtomicInteger(0), true, conf);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      // Carry the family's storage settings over to the new half-file.
      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
          .withChecksumType(HStore.getChecksumType(conf))
          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
          .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
          .build();
      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
          .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      // NOTE(review): seekTo()'s return value is ignored and the do/while reads a cell before
      // checking next(); this assumes the half contains at least one cell — confirm splits are
      // only requested for non-empty key ranges.
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getCell());
      } while (scanner.next());

      // Copy over file-info metadata, minus reserved keys and the data-block-encoding key.
      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          // Best-effort close of the reader; the write has already succeeded or thrown.
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }

    }
  }

  // Decides which file-info metadata keys are copied onto a split half.
  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFile.isReservedFileInfoKey(key);
  }

  // True when the create-table config key is unset or "yes" (case-insensitive).
  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  // True only when the ignore-unmatched-CF config key is explicitly "yes" (default: off).
  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  // Whether source hfiles are copied (rather than moved) into place; default false.
  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

  /**
   * Perform bulk load on the given table.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
      throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        if (isCreateTable()) {
          // Families and split points are inferred from the hfiles themselves.
          createTable(tableName, hfofDir, admin);
        } else {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(),
          isAlwaysCopyFiles());
      }
    }
  }

  /**
   * Perform bulk load on the given table.
1190 * @param family2Files map of family to List of hfiles 1191 * @param tableName the table to load into 1192 */ 1193 public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files, 1194 TableName tableName) throws IOException { 1195 try (Connection connection = ConnectionFactory.createConnection(getConf()); 1196 Admin admin = connection.getAdmin()) { 1197 if (!admin.tableExists(tableName)) { 1198 String errorMsg = format("Table '%s' does not exist.", tableName); 1199 LOG.error(errorMsg); 1200 throw new TableNotFoundException(errorMsg); 1201 } 1202 try (Table table = connection.getTable(tableName); 1203 RegionLocator locator = connection.getRegionLocator(tableName)) { 1204 return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles()); 1205 } 1206 } 1207 } 1208 1209 @Override 1210 public int run(String[] args) throws Exception { 1211 if (args.length != 2 && args.length != 3) { 1212 usage(); 1213 return -1; 1214 } 1215 String dirPath = args[0]; 1216 TableName tableName = TableName.valueOf(args[1]); 1217 1218 1219 if (args.length == 2) { 1220 return !run(dirPath, tableName).isEmpty() ? 0 : -1; 1221 } else { 1222 Map<byte[], List<Path>> family2Files = Maps.newHashMap(); 1223 FileSystem fs = FileSystem.get(getConf()); 1224 for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) { 1225 FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> { 1226 Path path = new Path(regionDir.getPath(), new Path(family, hfileName)); 1227 byte[] familyName = Bytes.toBytes(family); 1228 if (family2Files.containsKey(familyName)) { 1229 family2Files.get(familyName).add(path); 1230 } else { 1231 family2Files.put(familyName, Lists.newArrayList(path)); 1232 } 1233 }); 1234 } 1235 return !run(family2Files, tableName).isEmpty() ? 
0 : -1; 1236 } 1237 1238 } 1239 1240 public static void main(String[] args) throws Exception { 1241 Configuration conf = HBaseConfiguration.create(); 1242 int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args); 1243 System.exit(ret); 1244 } 1245 1246 /** 1247 * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is 1248 * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes 1249 * property. This directory is used as a temporary directory where all files are initially 1250 * copied/moved from user given directory, set all the required file permissions and then from 1251 * their it is finally loaded into a table. This should be set only when, one would like to manage 1252 * the staging directory by itself. Otherwise this tool will handle this by itself. 1253 * @param stagingDir staging directory path 1254 */ 1255 public void setBulkToken(String stagingDir) { 1256 this.bulkToken = stagingDir; 1257 } 1258 1259 public void setClusterIds(List<String> clusterIds) { 1260 this.clusterIds = clusterIds; 1261 } 1262 1263 /** 1264 * Disables replication for these bulkloaded files. 1265 */ 1266 public void disableReplication(){ 1267 this.replicate = false; 1268 } 1269 /** 1270 * Infers region boundaries for a new table. 
1271 * <p> 1272 * Parameter: <br> 1273 * bdryMap is a map between keys to an integer belonging to {+1, -1} 1274 * <ul> 1275 * <li>If a key is a start key of a file, then it maps to +1</li> 1276 * <li>If a key is an end key of a file, then it maps to -1</li> 1277 * </ul> 1278 * <p> 1279 * Algo:<br> 1280 * <ol> 1281 * <li>Poll on the keys in order: 1282 * <ol type="a"> 1283 * <li>Keep adding the mapped values to these keys (runningSum)</li> 1284 * <li>Each time runningSum reaches 0, add the start Key from when the runningSum had started to a 1285 * boundary list.</li> 1286 * </ol> 1287 * </li> 1288 * <li>Return the boundary list.</li> 1289 * </ol> 1290 */ 1291 public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) { 1292 List<byte[]> keysArray = new ArrayList<>(); 1293 int runningValue = 0; 1294 byte[] currStartKey = null; 1295 boolean firstBoundary = true; 1296 1297 for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) { 1298 if (runningValue == 0) { 1299 currStartKey = item.getKey(); 1300 } 1301 runningValue += item.getValue(); 1302 if (runningValue == 0) { 1303 if (!firstBoundary) { 1304 keysArray.add(currStartKey); 1305 } 1306 firstBoundary = false; 1307 } 1308 } 1309 1310 return keysArray.toArray(new byte[0][]); 1311 } 1312}