/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 * <p/>
 * Notice that, by the usual deprecation policy, this class should be kept until 4.0.0; but since
 * exposing an implementation class instead of an interface is a bad practice that we want to fix
 * ASAP, this class will be removed completely in 3.0.0. Please change your code to use
 * {@link BulkLoadHFiles}.
 * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please
 *             rewrite your code if you rely on methods other than the {@link #run(Map, TableName)}
 *             and {@link #run(String, TableName)}, as all the methods other than them will be
 *             removed with no replacement.
 */
@Deprecated
@InterfaceAudience.Public
public class LoadIncrementalHFiles extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class);
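
  // Illustrative usage of this tool from code, assuming HFileOutputFormat output in
  // "/bulk/output" and an existing table "my_table" (both names are hypothetical):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
  //   Map<LoadQueueItem, ByteBuffer> loaded =
  //     loader.run("/bulk/output", TableName.valueOf("my_table"));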

  /**
   * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not
   *             depend on this value.
   */
  @Deprecated
  public static final String NAME = BulkLoadHFilesTool.NAME;
  static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION;
  public static final String MAX_FILES_PER_REGION_PER_FAMILY =
    BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY;
  private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS;
  public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY;
  public final static String IGNORE_UNMATCHED_CF_CONF_KEY =
    BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY;
  public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES;

  // We use a '.' prefix which is ignored when walking directory trees
  // above. It is an invalid family name.
  static final String TMP_DIR = ".tmp";

  private final int maxFilesPerRegionPerFamily;
  private final boolean assignSeqIds;
  private boolean bulkLoadByFamily;

  // Source delegation token
  private final FsDelegationToken fsDelegationToken;
  private final UserProvider userProvider;
  private final int nrThreads;
  private AtomicInteger numRetries;
  private final RpcControllerFactory rpcControllerFactory;

  private String bulkToken;

  private List<String> clusterIds = new ArrayList<>();

  private boolean replicate = true;

  /**
   * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
   * the case where a region has split during the process of the load. When this happens, the
   * HFile is split into two physical parts across the new region boundary, and each part is added
   * back into the queue. The import process finishes when the queue is empty.
   * @deprecated since 2.2.0 and will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead.
   * @see BulkLoadHFiles
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-21782">HBASE-21782</a>
   */
  @InterfaceAudience.Public
  @Deprecated
  public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem {

    public LoadQueueItem(byte[] family, Path hfilePath) {
      super(family, hfilePath);
    }
  }

  public LoadIncrementalHFiles(Configuration conf) {
    // make a copy, just to be sure we're not overriding someone else's config
    super(HBaseConfiguration.create(conf));
    conf = getConf();
    // disable blockcache for tool invocation, see HBASE-10500
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    bulkLoadByFamily = conf.getBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false);
    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
      Runtime.getRuntime().availableProcessors());
    numRetries = new AtomicInteger(0);
    rpcControllerFactory = new RpcControllerFactory(conf);
  }

  private void usage() {
    System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
      + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
      + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- "
      + "into an hbase table.\n"
      + "OPTIONS (for other -D options, see source code):\n"
      + " -D" + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target "
      + "table must exist.\n"
      + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes to ignore unmatched column families.\n"
      + " -loadTable for when directory of files to load has a depth of 3; target table must "
      + "exist;\n"
      + " must be last of the options on command line.\n"
      + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for "
      + "documentation.\n");
  }

  /**
   * Prepares a collection of {@link LoadQueueItem} from the list of source hfiles contained in
   * the passed directory and validates that the prepared queue only contains valid table column
   * families.
   * @param hfilesDir directory containing list of hfiles to be loaded into the table
   * @param table table to which hfiles should be loaded
   * @param queue queue to be populated with the files to load
   * @param validateHFile if true hfiles will be validated for their format
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile) throws IOException {
    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
  }
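
  // A minimal sketch of the programmatic prepare-then-load flow, assuming loader, table,
  // conn and regionLocator are live handles obtained from the same Connection:
  //
  //   Deque<LoadQueueItem> queue = new ArrayDeque<>();
  //   loader.prepareHFileQueue(hfilesDir, table, queue, true);
  //   loader.loadHFileQueue(table, conn, queue, regionLocator.getStartEndKeys());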

  /**
   * Prepares a collection of {@link LoadQueueItem} from the list of source hfiles contained in
   * the passed directory and validates that the prepared queue only contains valid table column
   * families.
   * @param hfilesDir directory containing list of hfiles to be loaded into the table
   * @param table table to which hfiles should be loaded
   * @param queue queue to be populated with the files to load
   * @param validateHFile if true hfiles will be validated for their format
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile, boolean silence) throws IOException {
    discoverLoadQueue(queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Prepares a collection of {@link LoadQueueItem} from the given map of family to hfiles and
   * validates that the prepared queue only contains valid table column families.
   * @param map map of family to List of hfiles
   * @param table table to which hfiles should be loaded
   * @param queue queue to be populated with the files to load
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator) throws TableNotFoundException, IOException {
    return doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
  }
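
  // For reference, the directory layout doBulkLoad(Path, ...) expects is one subdirectory
  // per column family, each containing hfiles, e.g. (paths and names are illustrative):
  //
  //   /bulk/output/cf1/3f2e4ad1c5834...
  //   /bulk/output/cf2/9a1b2c3d4e5f6...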

  /**
   * Perform a bulk load of the given map of families to hfiles into the given pre-existing
   * table. This method is not threadsafe.
   * @param map map of family to List of hfiles
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
      Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(map, table, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }

    /*
     * Checking hfile format is a time-consuming operation, we should have an option to skip this
     * step when bulkloading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
        "are not correct. If you fail to read data from your table after using this " +
        "option, consider removing the files and bulkload again without this option. " +
        "See HBASE-13985");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);

      if (queue.isEmpty()) {
        LOG.warn(
          "Bulk load operation did not find any files to load in directory {}. " +
          "Does it contain files in subdirectories that correspond to column family names?",
          (hfofDir != null ? hfofDir.toUri().toString() : ""));
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
   * </li>
   * </ol>
   * @param table Table to which these hfiles should be loaded
   * @param conn Connection to use
   * @param queue queue of {@link LoadQueueItem}s with hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the region
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    loadHFileQueue(table, conn, queue, startEndKeys, false);
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
   * </li>
   * </ol>
   * @param table Table to which these hfiles should be loaded
   * @param conn Connection to use
   * @param queue queue of {@link LoadQueueItem}s with hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the region
   * @param copyFile always copy hfiles if true
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
    ExecutorService pool = null;
    try {
      pool = createExecutorService();
      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
        groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
    } finally {
      if (pool != null) {
        pool.shutdown();
      }
    }
  }
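
  // performBulkLoad drives the retry loop: each pass re-reads the region boundaries,
  // groups the queued hfiles by target region (splitting any hfile that now straddles a
  // boundary), attempts the atomic per-region loads, and re-queues recoverable failures.
  // The pass count is bounded by HConstants.BULKLOAD_MAX_RETRIES_NUMBER, raised to at
  // least the region count + 1.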

  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table,
      RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
    int count = 0;

    if (isSecureBulkLoadEndpointAvailable()) {
      LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
      LOG.warn("Secure bulk load has been integrated into HBase core.");
    }

    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
    bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with " +
          queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
          "Retry attempted " + count + " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily +
          " hfiles to one family of one region");
      }

      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
        item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    if (!queue.isEmpty()) {
      throw new RuntimeException("Bulk load aborted with some files not yet loaded. " +
        "Please check log for more details.");
    }
    return item2RegionMap;
  }

  private Map<byte[], Collection<LoadQueueItem>>
      groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
    Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    itemsInRegion.forEach(item -> families2Queue
      .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
    return families2Queue;
  }

  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures
   * are re-queued for another pass with the groupOrSplitPhase.
   * <p>
   * protected for testing.
   */
  @VisibleForTesting
  protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
      Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
      boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap()
        .entrySet()) {
      byte[] first = e.getKey().array();
      Collection<LoadQueueItem> lqis = e.getValue();
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, e.getKey());
        }
      }
      if (bulkLoadByFamily) {
        groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures.add(pool.submit(
          () -> tryAtomicRegionLoad(conn, table.getName(), first, familyQueue, copyFile))));
      } else {
        loadingFutures.add(
          pool.submit(() -> tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile)));
      }
    }

    // get all the results.
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);

      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  @VisibleForTesting
  protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
      TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
    List<Pair<byte[], String>> famPaths =
      lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
        .collect(Collectors.toList());
    return new ClientServiceCallable<byte[]>(conn, tableName, first,
        rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
      @Override
      protected byte[] rpcCall() throws Exception {
        SecureBulkLoadClient secureClient = null;
        boolean success = false;
        try {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Going to connect to server " + getLocation() + " for row " +
              Bytes.toStringBinary(getRow()) + " with hfile group " +
              LoadIncrementalHFiles.this.toString(famPaths));
          }
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(getConf(), table);
            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
              assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds,
              replicate);
          }
          return success ? regionName : null;
        } finally {
          // Best effort copying of files that might not have been imported
          // from the staging directory back to original location
          // in user directory
          if (secureClient != null && !success) {
            FileSystem targetFs = FileSystem.get(getConf());
            FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf());
            // Check to see if the source and target filesystems are the same.
            // If they are the same filesystem, we will try to move the files back
            // because previously we moved them to the staging directory.
            if (FSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) {
              for (Pair<byte[], String> el : famPaths) {
                Path hfileStagingPath = null;
                Path hfileOrigPath = new Path(el.getSecond());
                try {
                  hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
                    hfileOrigPath.getName());
                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                    LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath);
                  } else if (targetFs.exists(hfileStagingPath)) {
                    LOG.debug(
                      "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath);
                  }
                } catch (Exception ex) {
                  LOG.debug(
                    "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex);
                }
              }
            }
          }
        }
      }
    };
  }

  private boolean checkHFilesCountPerRegionPerFamily(
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : e.getValue()) {
        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily +
            " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) +
            " of region with start key " + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }
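
  // The per-region, per-family cap checked above comes from MAX_FILES_PER_REGION_PER_FAMILY
  // and defaults to 32 (see the constructor). For example, a job that emits many small
  // hfiles per family may need conf.setInt(MAX_FILES_PER_REGION_PER_FAMILY, 64), or a
  // compaction of its output, before loading.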

  /**
   * @param table the table to load into
   * @param pool the ExecutorService
   * @param queue the queue for LoadQueueItem
   * @param startEndKeys start and end keys
   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
   */
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
      final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key, LQI> needs to be synchronized only within the scope of this
    // phase, because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
      new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<Pair<List<LoadQueueItem>, String>> call =
        new Callable<Pair<List<LoadQueueItem>, String>>() {
          @Override
          public Pair<List<LoadQueueItem>, String> call() throws Exception {
            Pair<List<LoadQueueItem>, String> splits =
              groupOrSplit(regionGroups, item, table, startEndKeys);
            return splits;
          }
        };
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
      byte[] startKey, byte[] splitKey) throws IOException {
    Path hfilePath = item.getFilePath();
    byte[] family = item.getFamily();
    Path tmpDir = hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single region. Splitting...");

    String uniqueName = getUniqueName();
    ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<>(2);
    lqis.add(new LoadQueueItem(family, botOut));
    lqis.add(new LoadQueueItem(family, topOut));

    // If the current item is already the result of previous splits,
    // we don't need it anymore. Clean up to save space.
    // It is not part of the original input files.
    try {
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(hfilePath, false);
      }
    } catch (IOException e) {
      LOG.warn("Unable to delete temporary split file " + hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }
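
  // How groupOrSplit (below) maps an hfile's first row key to a region index, worked
  // through on illustrative boundaries: with region start keys ["", "d", "m"], a first
  // key of "f" makes Arrays.binarySearch return -3 (insertion point 2, encoded as
  // -(2) - 1), so idx = -(-3 + 1) - 1 = 1, i.e. the region starting at "d".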

  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile
   * boundary no longer fits into a region, physically splits the hfile such that the new bottom
   * half will fit and returns the list of LQI's corresponding to the resultant hfiles.
   * <p>
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */
  @VisibleForTesting
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
      final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
      CacheConfig.DISABLED, true, getConf())) {
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) +
      " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) +
        " > " + Bytes.toStringBinary(last.get()));
    }
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first.get(), Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion index). Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
    int indexForCallable = idx;

    /*
     * We can consider there is a region hole in the following conditions: 1) if idx < 0, then
     * the first region info is lost; 2) if the endkey of a region is not equal to the startkey
     * of the next region; 3) if the endkey of the last region is not empty.
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table " + table.getName() +
        " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
        !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table " + table.getName() +
        " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
        !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table " + table.getName() +
        " is not equal to the startkey of the next region in hbase:meta. " +
        "Please use hbck tool to fix it first.");
    }

    boolean lastKeyInRange = Bytes.compareTo(last.get(), startEndKeys.getSecond()[idx]) < 0 ||
      Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
        startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list
   * of hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, the region server callable should succeed or
   * fail atomically.
   * <p>
   * Protected for testing.
   * @return empty list if success, list of items to retry on recoverable failure
   * @deprecated as of release 2.3.0. Use {@link BulkLoadHFiles} instead.
   */
  @Deprecated
  @VisibleForTesting
  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
      boolean copyFile) throws IOException {
    ClientServiceCallable<byte[]> serviceCallable =
      buildClientServiceCallable(conn, tableName, first, lqis, copyFile);
    return tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list
   * of hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, the region server callable should succeed or
   * fail atomically.
   * <p>
   * Protected for testing.
   * @return empty list if success, list of items to retry on recoverable failure
   * @deprecated as of release 2.3.0. Use {@link BulkLoadHFiles} instead.
   */
  @Deprecated
  @VisibleForTesting
  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
      throws IOException {
    List<LoadQueueItem> toRetry = new ArrayList<>();
    try {
      Configuration conf = getConf();
      byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
        .callWithRetries(serviceCallable, Integer.MAX_VALUE);
      if (region == null) {
        LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
          " into table " + tableName + " with files " + lqis +
          " failed. This is recoverable and they will be retried.");
        toRetry.addAll(lqis); // return lqi's to retry
      }
      // success
      return toRetry;
    } catch (IOException e) {
      LOG.error("Encountered unrecoverable error from region server, additional details: " +
        serviceCallable.getExceptionMessageAdditionalDetail(), e);
      LOG.warn("Received a " + e.getClass().getSimpleName() + " from region server: " +
        serviceCallable.getExceptionMessageAdditionalDetail(), e);
      if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
          numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
        LOG.warn("Will attempt to retry loading failed HFiles. Retry #" +
          numRetries.incrementAndGet());
        toRetry.addAll(lqis);
        return toRetry;
      }
      LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
      throw e;
    }
  }
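
  // Note on the catch block above: retrying after an IOException is off by default; it can
  // be enabled with conf.setBoolean(RETRY_ON_IO_EXCEPTION, true), in which case the number
  // of retries is bounded by HConstants.HBASE_CLIENT_RETRIES_NUMBER.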

  /**
   * If the table is created for the first time, then "completebulkload" reads the files twice.
   * More modifications are necessary if we want to avoid doing it.
   */
  private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
          ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
          throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
          HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
              " for family " + builder.getNameAsString());
          }
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
            Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first) ? map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        }
      }
    });

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
      .forEachOrdered(tdBuilder::setColumnFamily);
    admin.createTable(tdBuilder.build(), keys);

    LOG.info("Table " + tableName + " is available!!");
  }

  private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null && secureClient != null) {
      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   */
  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
      throws IOException {
    Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream()
      .map(f -> f.getNameAsString()).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
      .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (unmatchedFamilies.size() > 0) {
      String msg =
        "Unmatched family names found in HFiles to be bulkloaded: " + unmatchedFamilies +
          "; valid family names of table " + table.getName() + " are: " + familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the Queue with the given HFiles
   */
  private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  /**
   * Walk the given directory for all HFiles, and add them to the passed queue.
   */
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
      final boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length +
            " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and
   * invalid hfiles.
   */
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
      final BulkHFileVisitor<TFamily> visitor) throws IOException {
    visitBulkHFiles(fs, bulkDir, visitor, true);
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference, HFileLink and files starting with "_".
   * Check and skip invalid hfiles by default, or skip this validation by setting
   * 'hbase.loadincremental.validate.hfile' to false.
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
      BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping dir with invalid family name " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("The file " + hfile + " doesn't seem to be an hfile, skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            LOG.warn("The file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  // Initialize a thread pool
  private ExecutorService createExecutorService() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(),
      new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  private final String toString(List<Pair<byte[], String>> list) {
    StringBuilder sb = new StringBuilder();
    sb.append('[');
    list.forEach(p -> {
      sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
        .append('}');
    });
    sb.append(']');
    return sb.toString();
  }

  private boolean isSecureBulkLoadEndpointAvailable() {
    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
  }
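
  // splitStoreFile below derives both halves from Reference markers: the bottom half
  // covers [firstKey, splitKey) and the top half [splitKey, lastKey]. Each half is then
  // rewritten as a complete, standalone HFile (via HalfStoreFileReader and
  // StoreFileWriter), so the two outputs can be bulk loaded independently.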

  /**
   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
   * filters, etc.
   */
  @VisibleForTesting
  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
      byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile.
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
      Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    HalfStoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
    try {
      ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
      HFileInfo hfile = new HFileInfo(context, conf);
      halfReader =
        new HalfStoreFileReader(context, hfile, cacheConf, reference, new AtomicInteger(0), conf);
      hfile.initMetaAndIndex(halfReader.getHFileReader());
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
        .withChecksumType(HStore.getChecksumType(conf))
        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
        .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
        .build();
      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getCell());
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFileInfo.isReservedFileInfoKey(key);
  }

  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }
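
  // Illustrative command lines for the run(String[]) entry point below; the -D key is a
  // placeholder for the CREATE_TABLE_CONF_KEY constant declared above, and the path and
  // table name are hypothetical:
  //
  //   bin/hbase completebulkload /bulk/output my_table
  //   bin/hbase completebulkload -D<CREATE_TABLE_CONF_KEY>=no /bulk/output my_table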

  protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
      throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        if (isCreateTable()) {
          createTable(tableName, hfofDir, admin);
        } else {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(hfofDir, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  /**
   * Perform bulk load on the given table.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
      throws IOException {
    return run(new Path(hfofDir), tableName);
  }

  /**
   * Perform bulk load on the given table.
   * @param family2Files map of family to List of hfiles
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files,
      TableName tableName) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        String errorMsg = format("Table '%s' does not exist.", tableName);
        LOG.error(errorMsg);
        throw new TableNotFoundException(errorMsg);
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);

    if (args.length == 2) {
      return !run(dirPath, tableName).isEmpty() ? 0 : -1;
    } else {
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !run(family2Files, tableName).isEmpty() ? 0 : -1;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
    System.exit(ret);
  }

  /**
   * Called from the replication sink, where it manages bulkToken (the staging directory) by
   * itself. This is used only when SecureBulkLoadEndpoint is configured in the
   * hbase.coprocessor.region.classes property. The given directory is used as a temporary
   * directory where all files are initially copied/moved from the user-given directory, the
   * required file permissions are set, and from there the files are finally loaded into the
   * table. Set this only if you would like to manage the staging directory yourself; otherwise
   * this tool will handle it for you.
   * @param stagingDir staging directory path
   */
  public void setBulkToken(String stagingDir) {
    this.bulkToken = stagingDir;
  }

  public void setClusterIds(List<String> clusterIds) {
    this.clusterIds = clusterIds;
  }

  /**
   * Disables replication for these bulkloaded files.
   */
  public void disableReplication() {
    this.replicate = false;
  }
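
  // A worked example for inferBoundaries (keys are illustrative): two disjoint files with
  // ranges [a, b] and [c, d] produce bdryMap {a:+1, b:-1, c:+1, d:-1}. The running sum
  // returns to 0 at b (the first boundary, which is skipped) and again at d, recording the
  // start key c. The result is a single split key, c, i.e. two regions. Overlapping files
  // coalesce into one region, since the sum only returns to 0 once the overlap ends.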

  /**
   * Infers region boundaries for a new table.
   * <p>
   * Parameter: <br>
   * bdryMap is a map from keys to an integer belonging to {+1, -1}
   * <ul>
   * <li>If a key is a start key of a file, then it maps to +1</li>
   * <li>If a key is an end key of a file, then it maps to -1</li>
   * </ul>
   * <p>
   * Algorithm:<br>
   * <ol>
   * <li>Poll on the keys in order:
   * <ol type="a">
   * <li>Keep adding the mapped values of these keys (runningSum)</li>
   * <li>Each time runningSum reaches 0, add the start key from which the runningSum started to a
   * boundary list.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
   */
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }
}