001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static java.lang.String.format; 021 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.nio.ByteBuffer; 026import java.util.ArrayDeque; 027import java.util.ArrayList; 028import java.util.Arrays; 029import java.util.Collection; 030import java.util.Collections; 031import java.util.Deque; 032import java.util.HashMap; 033import java.util.HashSet; 034import java.util.List; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.Optional; 038import java.util.Set; 039import java.util.SortedMap; 040import java.util.TreeMap; 041import java.util.UUID; 042import java.util.concurrent.Callable; 043import java.util.concurrent.ExecutionException; 044import java.util.concurrent.ExecutorService; 045import java.util.concurrent.Future; 046import java.util.concurrent.LinkedBlockingQueue; 047import java.util.concurrent.ThreadPoolExecutor; 048import java.util.concurrent.TimeUnit; 049import java.util.concurrent.atomic.AtomicInteger; 050import java.util.stream.Collectors; 051import 
org.apache.commons.lang3.mutable.MutableInt; 052import org.apache.hadoop.conf.Configuration; 053import org.apache.hadoop.conf.Configured; 054import org.apache.hadoop.fs.FileStatus; 055import org.apache.hadoop.fs.FileSystem; 056import org.apache.hadoop.fs.Path; 057import org.apache.hadoop.fs.permission.FsPermission; 058import org.apache.hadoop.hbase.HBaseConfiguration; 059import org.apache.hadoop.hbase.HConstants; 060import org.apache.hadoop.hbase.TableName; 061import org.apache.hadoop.hbase.TableNotFoundException; 062import org.apache.hadoop.hbase.client.Admin; 063import org.apache.hadoop.hbase.client.ClientServiceCallable; 064import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 065import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 066import org.apache.hadoop.hbase.client.Connection; 067import org.apache.hadoop.hbase.client.ConnectionFactory; 068import org.apache.hadoop.hbase.client.RegionLocator; 069import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; 070import org.apache.hadoop.hbase.client.SecureBulkLoadClient; 071import org.apache.hadoop.hbase.client.Table; 072import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 073import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; 074import org.apache.hadoop.hbase.io.HFileLink; 075import org.apache.hadoop.hbase.io.HalfStoreFileReader; 076import org.apache.hadoop.hbase.io.Reference; 077import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 078import org.apache.hadoop.hbase.io.hfile.CacheConfig; 079import org.apache.hadoop.hbase.io.hfile.HFile; 080import org.apache.hadoop.hbase.io.hfile.HFileContext; 081import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 082import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; 083import org.apache.hadoop.hbase.io.hfile.HFileInfo; 084import org.apache.hadoop.hbase.io.hfile.HFileScanner; 085import org.apache.hadoop.hbase.io.hfile.ReaderContext; 086import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder; 
087import org.apache.hadoop.hbase.ipc.RpcControllerFactory; 088import org.apache.hadoop.hbase.regionserver.BloomType; 089import org.apache.hadoop.hbase.regionserver.HStore; 090import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 091import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 092import org.apache.hadoop.hbase.security.UserProvider; 093import org.apache.hadoop.hbase.security.token.FsDelegationToken; 094import org.apache.hadoop.hbase.util.Bytes; 095import org.apache.hadoop.hbase.util.FSUtils; 096import org.apache.hadoop.hbase.util.FSVisitor; 097import org.apache.hadoop.hbase.util.Pair; 098import org.apache.hadoop.util.Tool; 099import org.apache.hadoop.util.ToolRunner; 100import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; 101import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 102import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 103import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 104import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps; 105import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 106import org.apache.yetus.audience.InterfaceAudience; 107import org.slf4j.Logger; 108import org.slf4j.LoggerFactory; 109 110/** 111 * Tool to load the output of HFileOutputFormat into an existing table. 112 * <p/> 113 * Notice that, by default, this class should be kept till 4.0.0, but as this is a bad practice that 114 * we expose an implementation class instead of an interface, we want to fix it ASAP. That's why we 115 * will remove this class completely in 3.0.0. Please change your code to use 116 * {@link BulkLoadHFiles}. 117 * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. 
Please 118 * rewrite your code if you rely on methods other than the {@link #run(Map, TableName)} 119 * and {@link #run(String, TableName)}, as all the methods other than them will be 120 * removed with no replacement. 121 */ 122@Deprecated 123@InterfaceAudience.Public 124public class LoadIncrementalHFiles extends Configured implements Tool { 125 126 private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class); 127 128 /** 129 * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not 130 * depend on this value. 131 */ 132 @Deprecated 133 public static final String NAME = BulkLoadHFilesTool.NAME; 134 static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION; 135 public static final String MAX_FILES_PER_REGION_PER_FAMILY = 136 BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY; 137 private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS; 138 public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY; 139 public final static String IGNORE_UNMATCHED_CF_CONF_KEY = 140 BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY; 141 public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES; 142 143 // We use a '.' prefix which is ignored when walking directory trees 144 // above. It is invalid family name. 145 static final String TMP_DIR = ".tmp"; 146 147 private int maxFilesPerRegionPerFamily; 148 private boolean assignSeqIds; 149 private boolean bulkLoadByFamily; 150 151 // Source delegation token 152 private FsDelegationToken fsDelegationToken; 153 private UserProvider userProvider; 154 private int nrThreads; 155 private AtomicInteger numRetries; 156 private RpcControllerFactory rpcControllerFactory; 157 158 private String bulkToken; 159 160 private List<String> clusterIds = new ArrayList<>(); 161 162 private boolean replicate = true; 163 164 /** 165 * Represents an HFile waiting to be loaded. 
An queue is used in this class in order to support 166 * the case where a region has split during the process of the load. When this happens, the HFile 167 * is split into two physical parts across the new region boundary, and each part is added back 168 * into the queue. The import process finishes when the queue is empty. 169 * @deprecated since 2.2.0 and will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. 170 * @see BulkLoadHFiles 171 * @see <a href="https://issues.apache.org/jira/browse/HBASE-21782">HBASE-21782</a> 172 */ 173 @InterfaceAudience.Public 174 @Deprecated 175 public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem { 176 177 public LoadQueueItem(byte[] family, Path hfilePath) { 178 super(family, hfilePath); 179 } 180 } 181 182 public LoadIncrementalHFiles(Configuration conf) { 183 // make a copy, just to be sure we're not overriding someone else's config 184 super(HBaseConfiguration.create(conf)); 185 initialize(); 186 } 187 188 public void initialize() { 189 Configuration conf = getConf(); 190 // disable blockcache for tool invocation, see HBASE-10500 191 conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0); 192 userProvider = UserProvider.instantiate(conf); 193 fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); 194 assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true); 195 maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); 196 bulkLoadByFamily = conf.getBoolean(BulkLoadHFiles.BULK_LOAD_HFILES_BY_FAMILY, false); 197 nrThreads = conf.getInt("hbase.loadincremental.threads.max", 198 Runtime.getRuntime().availableProcessors()); 199 numRetries = new AtomicInteger(0); 200 rpcControllerFactory = new RpcControllerFactory(conf); 201 } 202 203 private void usage() { 204 System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] " 205 + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n" 206 + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- " 207 + 
"into an hbase table.\n" 208 + "OPTIONS (for other -D options, see source code):\n" 209 + " -D" + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target " 210 + "table must exist.\n" 211 + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes to ignore unmatched column families.\n" 212 + " -loadTable for when directory of files to load has a depth of 3; target table must " 213 + "exist;\n" 214 + " must be last of the options on command line.\n" 215 + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for " 216 + "documentation.\n"); 217 } 218 219 /** 220 * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the 221 * passed directory and validates whether the prepared queue has all the valid table column 222 * families in it. 223 * @param hfilesDir directory containing list of hfiles to be loaded into the table 224 * @param table table to which hfiles should be loaded 225 * @param queue queue which needs to be loaded into the table 226 * @param validateHFile if true hfiles will be validated for its format 227 * @throws IOException If any I/O or network error occurred 228 */ 229 public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue, 230 boolean validateHFile) throws IOException { 231 prepareHFileQueue(hfilesDir, table, queue, validateHFile, false); 232 } 233 234 /** 235 * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the 236 * passed directory and validates whether the prepared queue has all the valid table column 237 * families in it. 
238 * @param hfilesDir directory containing list of hfiles to be loaded into the table 239 * @param table table to which hfiles should be loaded 240 * @param queue queue which needs to be loaded into the table 241 * @param validateHFile if true hfiles will be validated for its format 242 * @param silence true to ignore unmatched column families 243 * @throws IOException If any I/O or network error occurred 244 */ 245 public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue, 246 boolean validateHFile, boolean silence) throws IOException { 247 discoverLoadQueue(queue, hfilesDir, validateHFile); 248 validateFamiliesInHFiles(table, queue, silence); 249 } 250 251 /** 252 * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the 253 * passed directory and validates whether the prepared queue has all the valid table column 254 * families in it. 255 * @param map map of family to List of hfiles 256 * @param table table to which hfiles should be loaded 257 * @param queue queue which needs to be loaded into the table 258 * @param silence true to ignore unmatched column families 259 * @throws IOException If any I/O or network error occurred 260 */ 261 public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table, 262 Deque<LoadQueueItem> queue, boolean silence) throws IOException { 263 populateLoadQueue(queue, map); 264 validateFamiliesInHFiles(table, queue, silence); 265 } 266 267 /** 268 * Perform a bulk load of the given directory into the given pre-existing table. This method is 269 * not threadsafe. 
270 * @param hfofDir the directory that was provided as the output path of a job using 271 * HFileOutputFormat 272 * @param admin the Admin 273 * @param table the table to load into 274 * @param regionLocator region locator 275 * @throws TableNotFoundException if table does not yet exist 276 */ 277 public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table, 278 RegionLocator regionLocator) throws TableNotFoundException, IOException { 279 return doBulkLoad(hfofDir, admin, table, regionLocator, false, false); 280 } 281 282 /** 283 * Perform a bulk load of the given directory into the given pre-existing table. This method is 284 * not threadsafe. 285 * @param map map of family to List of hfiles 286 * @param admin the Admin 287 * @param table the table to load into 288 * @param regionLocator region locator 289 * @param silence true to ignore unmatched column families 290 * @param copyFile always copy hfiles if true 291 * @throws TableNotFoundException if table does not yet exist 292 */ 293 public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, 294 Table table, RegionLocator regionLocator, boolean silence, boolean copyFile) 295 throws TableNotFoundException, IOException { 296 if (!admin.isTableAvailable(regionLocator.getName())) { 297 throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); 298 } 299 // LQI queue does not need to be threadsafe -- all operations on this queue 300 // happen in this thread 301 Deque<LoadQueueItem> queue = new ArrayDeque<>(); 302 ExecutorService pool = null; 303 SecureBulkLoadClient secureClient = null; 304 try { 305 prepareHFileQueue(map, table, queue, silence); 306 if (queue.isEmpty()) { 307 LOG.warn("Bulk load operation did not get any files to load"); 308 return Collections.emptyMap(); 309 } 310 pool = createExecutorService(); 311 secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); 312 return 
performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); 313 } finally { 314 cleanup(admin, queue, pool, secureClient); 315 } 316 } 317 318 /** 319 * Perform a bulk load of the given directory into the given pre-existing table. This method is 320 * not threadsafe. 321 * @param hfofDir the directory that was provided as the output path of a job using 322 * HFileOutputFormat 323 * @param admin the Admin 324 * @param table the table to load into 325 * @param regionLocator region locator 326 * @param silence true to ignore unmatched column families 327 * @param copyFile always copy hfiles if true 328 * @throws TableNotFoundException if table does not yet exist 329 */ 330 public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table, 331 RegionLocator regionLocator, boolean silence, boolean copyFile) 332 throws TableNotFoundException, IOException { 333 if (!admin.isTableAvailable(regionLocator.getName())) { 334 throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); 335 } 336 337 /* 338 * Checking hfile format is a time-consuming operation, we should have an option to skip this 339 * step when bulkloading millions of HFiles. See HBASE-13985. 340 */ 341 boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true); 342 if (!validateHFile) { 343 LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " + 344 "are not correct. If you fail to read data from your table after using this " + 345 "option, consider removing the files and bulkload again without this option. 
" + 346 "See HBASE-13985"); 347 } 348 // LQI queue does not need to be threadsafe -- all operations on this queue 349 // happen in this thread 350 Deque<LoadQueueItem> queue = new ArrayDeque<>(); 351 ExecutorService pool = null; 352 SecureBulkLoadClient secureClient = null; 353 try { 354 prepareHFileQueue(hfofDir, table, queue, validateHFile, silence); 355 356 if (queue.isEmpty()) { 357 LOG.warn( 358 "Bulk load operation did not find any files to load in directory {}. " + 359 "Does it contain files in subdirectories that correspond to column family names?", 360 (hfofDir != null ? hfofDir.toUri().toString() : "")); 361 return Collections.emptyMap(); 362 } 363 pool = createExecutorService(); 364 secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); 365 return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); 366 } finally { 367 cleanup(admin, queue, pool, secureClient); 368 } 369 } 370 371 /** 372 * Used by the replication sink to load the hfiles from the source cluster. It does the following, 373 * <ol> 374 * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li> 375 * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap) 376 * </li> 377 * </ol> 378 * @param table Table to which these hfiles should be loaded to 379 * @param conn Connection to use 380 * @param queue {@link LoadQueueItem} has hfiles yet to be loaded 381 * @param startEndKeys starting and ending row keys of the region 382 */ 383 public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue, 384 Pair<byte[][], byte[][]> startEndKeys) throws IOException { 385 loadHFileQueue(table, conn, queue, startEndKeys, false); 386 } 387 388 /** 389 * Used by the replication sink to load the hfiles from the source cluster. 
It does the following, 390 * <ol> 391 * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li> 392 * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap) 393 * </li> 394 * </ol> 395 * @param table Table to which these hfiles should be loaded to 396 * @param conn Connection to use 397 * @param queue {@link LoadQueueItem} has hfiles yet to be loaded 398 * @param startEndKeys starting and ending row keys of the region 399 */ 400 public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue, 401 Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException { 402 ExecutorService pool = null; 403 try { 404 pool = createExecutorService(); 405 Multimap<ByteBuffer, LoadQueueItem> regionGroups = 406 groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst(); 407 bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null); 408 } finally { 409 if (pool != null) { 410 pool.shutdown(); 411 } 412 } 413 } 414 415 private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table, 416 RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool, 417 SecureBulkLoadClient secureClient, boolean copyFile) throws IOException { 418 int count = 0; 419 420 if (isSecureBulkLoadEndpointAvailable()) { 421 LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases."); 422 LOG.warn("Secure bulk load has been integrated into HBase core."); 423 } 424 425 fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf())); 426 bulkToken = secureClient.prepareBulkLoad(admin.getConnection()); 427 Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null; 428 429 Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>(); 430 // Assumes that region splits can happen while this occurs. 431 while (!queue.isEmpty()) { 432 // need to reload split keys each iteration. 
433 final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys(); 434 if (count != 0) { 435 LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with " + 436 queue.size() + " files remaining to group or split"); 437 } 438 439 int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); 440 maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1); 441 if (maxRetries != 0 && count >= maxRetries) { 442 throw new IOException( 443 "Retry attempted " + count + " times without completing, bailing out"); 444 } 445 count++; 446 447 // Using ByteBuffer for byte[] equality semantics 448 pair = groupOrSplitPhase(table, pool, queue, startEndKeys); 449 Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst(); 450 451 if (!checkHFilesCountPerRegionPerFamily(regionGroups)) { 452 // Error is logged inside checkHFilesCountPerRegionPerFamily. 453 throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily + 454 " hfiles to one family of one region"); 455 } 456 457 bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile, 458 item2RegionMap); 459 460 // NOTE: The next iteration's split / group could happen in parallel to 461 // atomic bulkloads assuming that there are splits and no merges, and 462 // that we can atomically pull out the groups we want to retry. 463 } 464 465 if (!queue.isEmpty()) { 466 throw new RuntimeException("Bulk load aborted with some files not yet loaded." 
+ 467 "Please check log for more details."); 468 } 469 return item2RegionMap; 470 } 471 472 private Map<byte[], Collection<LoadQueueItem>> 473 groupByFamilies(Collection<LoadQueueItem> itemsInRegion) { 474 Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR); 475 itemsInRegion.forEach(item -> families2Queue 476 .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item)); 477 return families2Queue; 478 } 479 480 /** 481 * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are 482 * re-queued for another pass with the groupOrSplitPhase. 483 * <p> 484 * protected for testing. 485 */ 486 @InterfaceAudience.Private 487 protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool, 488 Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, 489 boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 490 // atomically bulk load the groups. 491 Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>(); 492 for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap() 493 .entrySet()) { 494 byte[] first = e.getKey().array(); 495 Collection<LoadQueueItem> lqis = e.getValue(); 496 if (item2RegionMap != null) { 497 for (LoadQueueItem lqi : lqis) { 498 item2RegionMap.put(lqi, e.getKey()); 499 } 500 } 501 if (bulkLoadByFamily) { 502 groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures.add(pool.submit( 503 () -> tryAtomicRegionLoad(conn, table.getName(), first, familyQueue, copyFile)))); 504 } else { 505 loadingFutures.add( 506 pool.submit(() -> tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile))); 507 } 508 } 509 510 // get all the results. 
511 for (Future<List<LoadQueueItem>> future : loadingFutures) { 512 try { 513 List<LoadQueueItem> toRetry = future.get(); 514 515 if (item2RegionMap != null) { 516 for (LoadQueueItem lqi : toRetry) { 517 item2RegionMap.remove(lqi); 518 } 519 } 520 // LQIs that are requeued to be regrouped. 521 queue.addAll(toRetry); 522 523 } catch (ExecutionException e1) { 524 Throwable t = e1.getCause(); 525 if (t instanceof IOException) { 526 // At this point something unrecoverable has happened. 527 // TODO Implement bulk load recovery 528 throw new IOException("BulkLoad encountered an unrecoverable problem", t); 529 } 530 LOG.error("Unexpected execution exception during bulk load", e1); 531 throw new IllegalStateException(t); 532 } catch (InterruptedException e1) { 533 LOG.error("Unexpected interrupted exception during bulk load", e1); 534 throw (InterruptedIOException) new InterruptedIOException().initCause(e1); 535 } 536 } 537 } 538 539 @InterfaceAudience.Private 540 protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn, 541 TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) { 542 List<Pair<byte[], String>> famPaths = 543 lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString())) 544 .collect(Collectors.toList()); 545 return new ClientServiceCallable<byte[]>(conn, tableName, first, 546 rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) { 547 @Override 548 protected byte[] rpcCall() throws Exception { 549 SecureBulkLoadClient secureClient = null; 550 boolean success = false; 551 try { 552 if (LOG.isDebugEnabled()) { 553 LOG.debug("Going to connect to server " + getLocation() + " for row " + 554 Bytes.toStringBinary(getRow()) + " with hfile group " + 555 LoadIncrementalHFiles.this.toString(famPaths)); 556 } 557 byte[] regionName = getLocation().getRegionInfo().getRegionName(); 558 try (Table table = conn.getTable(getTableName())) { 559 secureClient = new 
SecureBulkLoadClient(getConf(), table); 560 success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, 561 assignSeqIds, fsDelegationToken.getUserToken(), 562 bulkToken, copyFile, clusterIds, replicate); 563 } 564 return success ? regionName : null; 565 } finally { 566 // Best effort copying of files that might not have been imported 567 // from the staging directory back to original location 568 // in user directory 569 if (secureClient != null && !success) { 570 FileSystem targetFs = FileSystem.get(getConf()); 571 FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf()); 572 // Check to see if the source and target filesystems are the same 573 // If they are the same filesystem, we will try move the files back 574 // because previously we moved them to the staging directory. 575 if (FSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) { 576 for (Pair<byte[], String> el : famPaths) { 577 Path hfileStagingPath = null; 578 Path hfileOrigPath = new Path(el.getSecond()); 579 try { 580 hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())), 581 hfileOrigPath.getName()); 582 if (targetFs.rename(hfileStagingPath, hfileOrigPath)) { 583 LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath); 584 } else if (targetFs.exists(hfileStagingPath)) { 585 LOG.debug( 586 "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath); 587 } 588 } catch (Exception ex) { 589 LOG.debug( 590 "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex); 591 } 592 } 593 } 594 } 595 } 596 } 597 }; 598 } 599 600 private boolean checkHFilesCountPerRegionPerFamily( 601 final Multimap<ByteBuffer, LoadQueueItem> regionGroups) { 602 for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) { 603 Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 604 for (LoadQueueItem lqi : e.getValue()) { 605 MutableInt count = 
filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt()); 606 count.increment(); 607 if (count.intValue() > maxFilesPerRegionPerFamily) { 608 LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + 609 " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) + 610 " of region with start key " + Bytes.toStringBinary(e.getKey())); 611 return false; 612 } 613 } 614 } 615 return true; 616 } 617 618 /** 619 * @param table the table to load into 620 * @param pool the ExecutorService 621 * @param queue the queue for LoadQueueItem 622 * @param startEndKeys start and end keys 623 * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles. 624 */ 625 private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase( 626 final Table table, ExecutorService pool, Deque<LoadQueueItem> queue, 627 final Pair<byte[][], byte[][]> startEndKeys) throws IOException { 628 // <region start key, LQI> need synchronized only within this scope of this 629 // phase because of the puts that happen in futures. 
630 Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create(); 631 final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs); 632 Set<String> missingHFiles = new HashSet<>(); 633 Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = 634 new Pair<>(regionGroups, missingHFiles); 635 636 // drain LQIs and figure out bulk load groups 637 Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>(); 638 while (!queue.isEmpty()) { 639 final LoadQueueItem item = queue.remove(); 640 641 final Callable<Pair<List<LoadQueueItem>, String>> call = 642 new Callable<Pair<List<LoadQueueItem>, String>>() { 643 @Override 644 public Pair<List<LoadQueueItem>, String> call() throws Exception { 645 Pair<List<LoadQueueItem>, String> splits = 646 groupOrSplit(regionGroups, item, table, startEndKeys); 647 return splits; 648 } 649 }; 650 splittingFutures.add(pool.submit(call)); 651 } 652 // get all the results. All grouping and splitting must finish before 653 // we can attempt the atomic loads. 
// NOTE(review): this chunk begins inside a larger method whose declaration is above this view;
// judging from the visible code it drains the futures of parallel HFile-split tasks, re-queueing
// split results and recording missing files — confirm against the method header.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            // The item was split (or regrouped): push the resulting items back for loading.
            queue.addAll(splits.getFirst());
          } else {
            // No items but a name present: the underlying HFile disappeared.
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized,
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        // Convert to InterruptedIOException so the IOException-only signature still propagates
        // the interruption to the caller.
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  /**
   * Physically splits the HFile backing {@code item} at {@code splitKey} into a ".bottom" and a
   * ".top" half under a temporary directory, so each half fits within a single region.
   * @param item the queue item whose HFile no longer fits in one region
   * @param table the target table (used to look up the column family descriptor)
   * @param startKey start key of the region the first key of the hfile falls into (unused here
   *          beyond the caller's bookkeeping — TODO confirm)
   * @param splitKey the row key at which to split the file
   * @return the two new {@link LoadQueueItem}s (bottom half first) to be re-queued
   * @throws IOException if the split or permission change fails
   */
  private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
      byte[] startKey, byte[] splitKey) throws IOException {
    Path hfilePath = item.getFilePath();
    byte[] family = item.getFamily();
    Path tmpDir = hfilePath.getParent();
    // Put split output under a TMP_DIR subdirectory unless we are already inside one
    // (i.e. this item is itself the product of an earlier split).
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting...");

    String uniqueName = getUniqueName();
    ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    // World-writable so the region servers (possibly running as a different user) can move
    // the files during the bulk load.
    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<>(2);
    lqis.add(new LoadQueueItem(family, botOut));
    lqis.add(new LoadQueueItem(family, topOut));

    // If the current item is already the result of previous splits,
    // we don't need it anymore. Clean up to save space.
    // It is not part of the original input files.
    try {
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(hfilePath, false);
      }
    } catch (IOException e) {
      // Best-effort cleanup only; a leftover temp file is harmless.
      LOG.warn("Unable to delete temporary split file " + hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
   * Finds the index of the region containing {@code key}.
   * @param startEndKeys the start/end keys of regions belong to this table, the list in ascending
   *          order by start key
   * @param key the key need to find which region belong to
   * @return region index, or -1 if the key sorts before the first region's start key
   */
  private int getRegionIndex(final Pair<byte[][], byte[][]> startEndKeys, byte[] key) {
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), key, Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion index). Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
    return idx;
  }

  /**
   * Validates that region {@code idx} exists and that there is no region hole around it. We can
   * consider there is a region hole in the following conditions:
   * <ol>
   * <li>if {@code idx < 0}, then the first region info is lost;</li>
   * <li>if the endkey of a region is not equal to the startkey of the next region;</li>
   * <li>if the endkey of the last region is not empty.</li>
   * </ol>
   * @throws IOException if a hole is detected; the message advises running hbck
   */
  private void checkRegionIndexValid(int idx, final Pair<byte[][], byte[][]> startEndKeys,
      TableName tableName) throws IOException {
    if (idx < 0) {
      throw new IOException("The first region info for table " + tableName +
          " can't be found in hbase:meta.Please use hbck tool to fix it first.");
    } else if ((idx == startEndKeys.getFirst().length - 1) &&
        !Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table " + tableName +
          " can't be found in hbase:meta.Please use hbck tool to fix it first.");
    } else if (idx + 1 < startEndKeys.getFirst().length &&
        !(Bytes.compareTo(startEndKeys.getSecond()[idx],
            startEndKeys.getFirst()[idx + 1]) == 0)) {
      throw new IOException("The endkey of one region for table " + tableName +
          " is not equal to the startkey of the next region in hbase:meta." +
          "Please use hbck tool to fix it first.");
    }
  }

  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile
   * boundary no longer fits into a region, physically splits the hfile such that the new bottom
   * half will fit and returns the list of LQI's corresponding to the resultant hfiles.
   * <p>
   * protected for testing
   * @return a pair of (split items, null) if the file was split, (null, filename) if the file
   *         went missing, or null if the item was grouped into {@code regionGroups} or was empty
   * @throws IOException if an IO failure is encountered
   */
  @InterfaceAudience.Private
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
      CacheConfig.DISABLED, true, getConf())) {
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      // File vanished (e.g. already moved by a concurrent attempt): report it by name so the
      // caller can record it as missing rather than failing the whole load.
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary)
        + " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      // First and last keys are either both present or both absent in a well-formed file.
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get())
          + " > " + Bytes.toStringBinary(last.get()));
    }

    int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get());
    checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, table.getName());
    // The file fits in one region iff its last key is below that region's end key
    // (an empty end key means the last region, which extends to infinity).
    boolean lastKeyInRange =
        Bytes.compareTo(last.get(), startEndKeys.getSecond()[firstKeyRegionIdx]) < 0 || Bytes
            .equals(startEndKeys.getSecond()[firstKeyRegionIdx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      // Split roughly midway between the regions covering first and last keys, so repeated
      // splits converge quickly (binary-search style).
      int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
      int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) >>> 1;
      // make sure the splitPoint is valid in case region overlap occur, maybe the splitPoint
      // bigger than hfile.endkey w/o this check
      if (splitIdx != firstKeyRegionIdx) {
        checkRegionIndexValid(splitIdx, startEndKeys, table.getName());
      }
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
        startEndKeys.getFirst()[firstKeyRegionIdx], startEndKeys.getSecond()[splitIdx]);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[firstKeyRegionIdx]), item);
    return null;
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically
   * and fails atomically.
   * <p>
   * Protected for testing.
   * @return empty list if success, list of items to retry on recoverable failure
   * @deprecated as of release 2.3.0. Use {@link BulkLoadHFiles} instead.
   */
  @Deprecated
  @InterfaceAudience.Private
  protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
      boolean copyFile) throws IOException {
    ClientServiceCallable<byte[]> serviceCallable =
        buildClientServiceCallable(conn, tableName, first, lqis, copyFile);
    return tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically
   * and fails atomically.
   * <p>
   * Protected for testing.
   * @return empty list if success, list of items to retry on recoverable failure
   * @deprecated as of release 2.3.0. Use {@link BulkLoadHFiles} instead.
   */
  @Deprecated
  @InterfaceAudience.Private
  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
      throws IOException {
    List<LoadQueueItem> toRetry = new ArrayList<>();
    try {
      Configuration conf = getConf();
      byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
          .callWithRetries(serviceCallable, Integer.MAX_VALUE);
      if (region == null) {
        // A null region means the server rejected the load recoverably (e.g. region moved);
        // hand all items back to the caller for another attempt.
        LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
            " into table " + tableName + " with files " + lqis +
            " failed. This is recoverable and they will be retried.");
        toRetry.addAll(lqis); // return lqi's to retry
      }
      // success
      return toRetry;
    } catch (IOException e) {
      // NOTE(review): the failure is logged twice (error then warn) with overlapping detail;
      // kept as-is since operators may rely on either message.
      LOG.error("Encountered unrecoverable error from region server, additional details: " +
          serviceCallable.getExceptionMessageAdditionalDetail(),
        e);
      LOG.warn(
        "Received a " + e.getClass().getSimpleName()
            + " from region server: "
            + serviceCallable.getExceptionMessageAdditionalDetail(), e);
      // Optionally retry IOExceptions up to the client retry limit when
      // RETRY_ON_IO_EXCEPTION is enabled; numRetries is shared across calls.
      if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
          && numRetries.get() < getConf().getInt(
            HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
        LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
            + numRetries.incrementAndGet());
        toRetry.addAll(lqis);
        return toRetry;
      }
      LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
      throw e;
    }
  }

  /**
   * Creates the target table, inferring column families and region boundaries from the HFiles
   * under {@code hfofDir}. If the table is created for the first time, then "completebulkload"
   * reads the files twice. More modifications necessary if we want to avoid doing it.
   */
  private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    // Maps each first key to +1 and each last key to -1 so region boundaries can be
    // inferred afterwards (see inferBoundaries).
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
            ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
          throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
            HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          // Carry over the compression of the source files to the new family descriptor.
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
                " for family " + builder.getNameAsString());
          }
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
              Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first) ? map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        }
      }
    });

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
        .forEachOrdered(tdBuilder::setColumnFamily);
    admin.createTable(tdBuilder.build(), keys);

    LOG.info("Table " + tableName + " is available!!");
  }

  /**
   * Releases resources held during a bulk load (delegation token, secure-bulk-load session,
   * thread pool) and logs any queue items that were never loaded.
   */
  private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null && secureClient != null) {
      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   * @param silence when true, unmatched families are only logged instead of raising an exception
   */
  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
      throws IOException {
    Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream()
        .map(f -> f.getNameAsString()).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
        .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (unmatchedFamilies.size() > 0) {
      String msg =
          "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " +
              unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " +
              familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the Queue with given HFiles, one {@link LoadQueueItem} per (family, path) pair.
   */
  private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue containing all such files.
   * Also warns about files larger than the configured max region size, since loading them
   * may cause the region to split immediately.
   */
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
      final boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length +
              " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  /**
   * Callback interface for {@code visitBulkHFiles}: {@code bulkFamily} is invoked once per
   * family directory and its return value is passed to each {@code bulkHFile} call for the
   * files inside that directory.
   */
  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and
   * non-valid hfiles.
   */
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
      final BulkHFileVisitor<TFamily> visitor) throws IOException {
    visitBulkHFiles(fs, bulkDir, visitor, true);
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check
   * and skip non-valid hfiles by default, or skip this validation by setting
   * 'hbase.loadincremental.validate.hfile' to false.
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
      BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            // File vanished between listing and validation; tolerate and move on.
            LOG.warn("the file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  // Initialize a thread pool with nrThreads workers; core threads may time out after 60s
  // of idleness so an idle tool does not keep threads alive.
  private ExecutorService createExecutorService() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  /**
   * Renders a list of (key, message) pairs as "[{key,msg}{key,msg}...]" for logging.
   */
  private final String toString(List<Pair<byte[], String>> list) {
    StringBuilder sb = new StringBuilder();
    sb.append('[');
    list.forEach(p -> {
      sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
          .append('}');
    });
    sb.append(']');
    return sb.toString();
  }

  // True when the SecureBulkLoadEndpoint coprocessor is configured on the region servers.
  private boolean isSecureBulkLoadEndpointAvailable() {
    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
  }

  /**
   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
   * filters, etc.
   */
  @InterfaceAudience.Private
  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
      byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile. The half to copy is selected by {@code reference}
   * (top or bottom); file-level metadata is carried over except reserved keys and the data
   * block encoding (see shouldCopyHFileMetaKey).
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
      Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    HalfStoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
    try {
      ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
      HFileInfo hfile = new HFileInfo(context, conf);
      halfReader =
          new HalfStoreFileReader(context, hfile, cacheConf, reference, new AtomicInteger(0), conf);
      hfile.initMetaAndIndex(halfReader.getHFileReader());
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      // Write the output half with the family's configured blocksize/compression/bloom,
      // not necessarily those of the input file.
      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
          .withChecksumType(HStore.getChecksumType(conf))
          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
          .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
          .build();
      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
          .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getCell());
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          // Reader close failure should not mask a primary exception from the try block.
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }

    }
  }

  // Decides which file-info keys are carried over to a split half.
  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFileInfo.isReservedFileInfoKey(key);
  }

  // Table auto-creation is enabled unless CREATE_TABLE_CONF_KEY is set to something
  // other than "yes" (defaults to "yes").
  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  // Unmatched-family errors are downgraded to warnings only when explicitly opted in.
  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

  /**
   * Perform bulk load on the given table, creating it first (when auto-create is enabled) if it
   * does not exist.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param tableName the table to load into
   * @return map of loaded items to the region locations they were loaded into
   */
  protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
      throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        if (isCreateTable()) {
          createTable(tableName, hfofDir, admin);
        } else {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(hfofDir, admin, table, locator, isSilence(),
            isAlwaysCopyFiles());
      }
    }
  }

  /**
   * Perform bulk load on the given table.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
      throws IOException {
    return run(new Path(hfofDir), tableName);
  }

  /**
   * Perform bulk load on the given table.
   * @param family2Files map of family to List of hfiles
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files,
      TableName tableName) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      // Unlike the directory-based overload, this variant never auto-creates the table.
      if (!admin.tableExists(tableName)) {
        String errorMsg = format("Table '%s' does not exist.", tableName);
        LOG.error(errorMsg);
        throw new TableNotFoundException(errorMsg);
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  /**
   * Tool entry point. Usage: {@code <hfofDir> <tableName> [extra arg]}; with two args the
   * directory is treated as HFileOutputFormat output, with three args the directory is scanned
   * region-by-region and loaded as a family-to-files map.
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    // Re-initialize to apply -D options from the command line parameters
    initialize();
    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);
    if (args.length == 2) {
      // A non-empty result map means items were loaded successfully -> exit code 0.
      return !run(dirPath, tableName).isEmpty() ? 0 : -1;
    } else {
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !run(family2Files, tableName).isEmpty() ? 0 : -1;
    }

  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
    System.exit(ret);
  }

  /**
   * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
   * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
   * property. This directory is used as a temporary directory where all files are initially
   * copied/moved from user given directory, set all the required file permissions and then from
   * their it is finally loaded into a table. This should be set only when, one would like to
   * manage the staging directory by itself. Otherwise this tool will handle this by itself.
   * @param stagingDir staging directory path
   */
  public void setBulkToken(String stagingDir) {
    this.bulkToken = stagingDir;
  }

  // Cluster ids to tag on the bulk-loaded files (used by replication to avoid loops).
  public void setClusterIds(List<String> clusterIds) {
    this.clusterIds = clusterIds;
  }

  /**
   * Disables replication for these bulkloaded files.
   */
  public void disableReplication(){
    this.replicate = false;
  }

  /**
   * Infers region boundaries for a new table.
   * <p>
   * Parameter: <br>
   * bdryMap is a map between keys to an integer belonging to {+1, -1}
   * <ul>
   * <li>If a key is a start key of a file, then it maps to +1</li>
   * <li>If a key is an end key of a file, then it maps to -1</li>
   * </ul>
   * <p>
   * Algo:<br>
   * <ol>
   * <li>Poll on the keys in order:
   * <ol type="a">
   * <li>Keep adding the mapped values to these keys (runningSum)</li>
   * <li>Each time runningSum reaches 0, add the start Key from when the runningSum had started to
   * a boundary list.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
   */
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        // The very first boundary is the table's implicit start key and is not emitted.
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }
}