/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 */
@InterfaceAudience.Public
public class LoadIncrementalHFiles extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class);

  public static final String NAME = "completebulkload";
  static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
  public static final String MAX_FILES_PER_REGION_PER_FAMILY =
      "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
  public final static String CREATE_TABLE_CONF_KEY = "create.table";
  public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
  public final static String ALWAYS_COPY_FILES = "always.copy.files";

  // We use a '.' prefix, which is ignored when walking directory trees above; it is also an
  // invalid family name.
  static final String TMP_DIR = ".tmp";

  private final int maxFilesPerRegionPerFamily;
  private final boolean assignSeqIds;

  // Source delegation token
  private final FsDelegationToken fsDelegationToken;
  private final UserProvider userProvider;
  private final int nrThreads;
  private AtomicInteger numRetries;
  private final RpcControllerFactory rpcControllerFactory;

  private String bulkToken;

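  /*
   * A minimal programmatic invocation, for illustration only (the output directory and table
   * name below are hypothetical):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
   *   loader.run("/output/from/HFileOutputFormat", TableName.valueOf("mytable"));
   */
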
  /**
   * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
   * the case where a region has split during the process of the load. When this happens, the
   * HFile is split into two physical parts across the new region boundary, and each part is added
   * back into the queue. The import process finishes when the queue is empty.
   */
  @InterfaceAudience.Public
  public static class LoadQueueItem {
    private final byte[] family;
    private final Path hfilePath;

    public LoadQueueItem(byte[] family, Path hfilePath) {
      this.family = family;
      this.hfilePath = hfilePath;
    }

    @Override
    public String toString() {
      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
    }

    public byte[] getFamily() {
      return family;
    }

    public Path getFilePath() {
      return hfilePath;
    }
  }

  public LoadIncrementalHFiles(Configuration conf) {
    // make a copy, just to be sure we're not overriding someone else's config
    super(HBaseConfiguration.create(conf));
    conf = getConf();
    // disable blockcache for tool invocation, see HBASE-10500
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
      Runtime.getRuntime().availableProcessors());
    numRetries = new AtomicInteger(0);
    rpcControllerFactory = new RpcControllerFactory(conf);
  }

  private void usage() {
    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename -loadTable"
        + "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of a table by "
        + "this tool\n  Note: if you set this to 'no', then the target table must already exist "
        + "in HBase\n -loadTable implies that the base directory storing the files has a depth "
        + "of 3; you must have an existing table\n -D" + IGNORE_UNMATCHED_CF_CONF_KEY
        + "=yes - can be used to ignore unmatched column families\n");
  }

  /**
   * Prepare a collection of {@link LoadQueueItem} from the list of source hfiles contained in the
   * passed directory and validate whether the prepared queue has all the valid table column
   * families in it.
   * @param hfilesDir directory containing the list of hfiles to be loaded into the table
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param validateHFile if true hfiles will be validated for their format
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile) throws IOException {
    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
  }

  /**
   * Prepare a collection of {@link LoadQueueItem} from the list of source hfiles contained in the
   * passed directory and validate whether the prepared queue has all the valid table column
   * families in it.
   * @param hfilesDir directory containing the list of hfiles to be loaded into the table
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param validateHFile if true hfiles will be validated for their format
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
      boolean validateHFile, boolean silence) throws IOException {
    discoverLoadQueue(queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Prepare a collection of {@link LoadQueueItem} from the given map of families to hfiles and
   * validate whether the prepared queue has all the valid table column families in it.
   * @param map map of family to List of hfiles
   * @param table table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(table, queue, silence);
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator) throws TableNotFoundException, IOException {
    return doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
  }

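  /*
   * Sketch of the map-based overload below, which bypasses directory discovery entirely (the
   * variables are hypothetical):
   *
   *   Map<byte[], List<Path>> family2Files = ...; // family name -> hfiles of that family
   *   loader.doBulkLoad(family2Files, admin, table, regionLocator, false, false);
   */
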
  /**
   * Perform a bulk load of the given map of families to hfiles into the given pre-existing
   * table. This method is not threadsafe.
   * @param map map of family to List of hfiles
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
      Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(map, table, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param admin the Admin
   * @param table the table to load into
   * @param regionLocator region locator
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   * @throws TableNotFoundException if table does not yet exist
   */
  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
      RegionLocator regionLocator, boolean silence, boolean copyFile)
      throws TableNotFoundException, IOException {
    if (!admin.isTableAvailable(regionLocator.getName())) {
      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
    }

    /*
     * Checking hfile format is a time-consuming operation, so we should have an option to skip
     * this step when bulkloading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFiles validation; this might cause data loss if files " +
          "are not correct. If you fail to read data from your table after using this " +
          "option, consider removing the files and bulkloading again without this option. " +
          "See HBASE-13985");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    SecureBulkLoadClient secureClient = null;
    try {
      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);

      if (queue.isEmpty()) {
        LOG.warn(
          "Bulk load operation did not find any files to load in directory {}. " +
              "Does it contain files in subdirectories that correspond to column family names?",
          (hfofDir != null ? hfofDir.toUri().toString() : ""));
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
    } finally {
      cleanup(admin, queue, pool, secureClient);
    }
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>{@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
   * <li>{@link LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque,
   * Multimap, boolean, Map)}</li>
   * </ol>
   * @param table Table to which these hfiles should be loaded
   * @param conn Connection to use
   * @param queue queue of {@link LoadQueueItem}s with hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the region
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    loadHFileQueue(table, conn, queue, startEndKeys, false);
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>{@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
   * <li>{@link LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque,
   * Multimap, boolean, Map)}</li>
   * </ol>
   * @param table Table to which these hfiles should be loaded
   * @param conn Connection to use
   * @param queue queue of {@link LoadQueueItem}s with hfiles yet to be loaded
   * @param startEndKeys starting and ending row keys of the region
   * @param copyFile always copy hfiles if true
   */
  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
      Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
    ExecutorService pool = null;
    try {
      pool = createExecutorService();
      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
          groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
    } finally {
      if (pool != null) {
        pool.shutdown();
      }
    }
  }

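  /*
   * Driver for the load: each pass of the loop below re-reads the region boundaries, groups (or
   * physically splits) the queued hfiles against those boundaries, and then attempts the atomic
   * per-region loads. Items that failed recoverably are re-queued and picked up on the next pass.
   */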
  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table,
      RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
    int count = 0;

    if (isSecureBulkLoadEndpointAvailable()) {
      LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
      LOG.warn("Secure bulk load has been integrated into HBase core.");
    }

    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
    bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with " +
            queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
            "Retry attempted " + count + " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily +
            " hfiles to one family of one region");
      }

      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
        item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    if (!queue.isEmpty()) {
      throw new RuntimeException("Bulk load aborted with some files not yet loaded. " +
          "Please check log for more details.");
    }
    return item2RegionMap;
  }

  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures
   * are re-queued for another pass with the groupOrSplitPhase.
   * <p>
   * protected for testing.
   */
  @VisibleForTesting
  protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
      Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
      boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap()
        .entrySet()) {
      byte[] first = e.getKey().array();
      Collection<LoadQueueItem> lqis = e.getValue();

      ClientServiceCallable<byte[]> serviceCallable =
          buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);

      Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
        @Override
        public List<LoadQueueItem> call() throws Exception {
          List<LoadQueueItem> toRetry =
              tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
          return toRetry;
        }
      };
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, e.getKey());
        }
      }
      loadingFutures.add(pool.submit(call));
    }

    // get all the results.
    for (Future<List<LoadQueueItem>> future : loadingFutures) {
      try {
        List<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);

      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  @VisibleForTesting
  protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
      TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
    List<Pair<byte[], String>> famPaths =
        lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
            .collect(Collectors.toList());
    return new ClientServiceCallable<byte[]>(conn, tableName, first,
        rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
      @Override
      protected byte[] rpcCall() throws Exception {
        SecureBulkLoadClient secureClient = null;
        boolean success = false;
        try {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Going to connect to server " + getLocation() + " for row " +
                Bytes.toStringBinary(getRow()) + " with hfile group " +
                LoadIncrementalHFiles.this.toString(famPaths));
          }
          byte[] regionName = getLocation().getRegionInfo().getRegionName();
          try (Table table = conn.getTable(getTableName())) {
            secureClient = new SecureBulkLoadClient(getConf(), table);
            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
              assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
          }
          return success ? regionName : null;
        } finally {
          // Best-effort copying of files that might not have been imported
          // from the staging directory back to their original location
          // in the user directory
          if (secureClient != null && !success) {
            FileSystem targetFs = FileSystem.get(getConf());
            FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf());
            // Check to see if the source and target filesystems are the same.
            // If they are, try to move the files back, because previously we
            // moved them to the staging directory.
            if (FSHDFSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) {
              for (Pair<byte[], String> el : famPaths) {
                Path hfileStagingPath = null;
                Path hfileOrigPath = new Path(el.getSecond());
                try {
                  hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
                      hfileOrigPath.getName());
                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
                    LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath);
                  } else if (targetFs.exists(hfileStagingPath)) {
                    LOG.debug(
                      "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath);
                  }
                } catch (Exception ex) {
                  LOG.debug(
                    "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex);
                }
              }
            }
          }
        }
      }
    };
  }

  private boolean checkHFilesCountPerRegionPerFamily(
      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : e.getValue()) {
        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily +
              " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) +
              " of region with start key " + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @param table the table to load into
   * @param pool the ExecutorService
   * @param queue the queue for LoadQueueItem
   * @param startEndKeys start and end keys
   * @return A map that groups LQI by likely bulk load region targets and a Set of missing
   *         hfiles.
   */
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
      final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    // <region start key, LQI> need synchronized only within this scope of this
    // phase because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
        new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<Pair<List<LoadQueueItem>, String>> call =
          new Callable<Pair<List<LoadQueueItem>, String>>() {
            @Override
            public Pair<List<LoadQueueItem>, String> call() throws Exception {
              Pair<List<LoadQueueItem>, String> splits =
                  groupOrSplit(regionGroups, item, table, startEndKeys);
              return splits;
            }
          };
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
      byte[] startKey, byte[] splitKey) throws IOException {
    Path hfilePath = item.getFilePath();
    byte[] family = item.getFamily();
    Path tmpDir = hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single region. Splitting...");

    String uniqueName = getUniqueName();
    ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<>(2);
    lqis.add(new LoadQueueItem(family, botOut));
    lqis.add(new LoadQueueItem(family, topOut));

    // If the current item is already the result of previous splits,
    // we don't need it anymore. Clean up to save space.
    // It is not part of the original input files.
    try {
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(hfilePath, false);
      }
    } catch (IOException e) {
      LOG.warn("Unable to delete temporary split file " + hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

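  /*
   * Region lookup in groupOrSplit below uses Arrays.binarySearch over the region start keys.
   * Worked example with illustrative keys: for start keys {"", "b", "d"} and an hfile whose
   * first row is "c", binarySearch returns -3 (insertion point 2), so idx = -(-3 + 1) - 1 = 1,
   * i.e. the region ["b", "d").
   */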
  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile
   * boundary no longer fits into a region, physically splits the hfile such that the new bottom
   * half will fit, and returns the list of LQI's corresponding to the resultant hfiles.
   * <p>
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */
  @VisibleForTesting
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
      CacheConfig.DISABLED, true, getConf())) {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) +
        " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) +
          " > " + Bytes.toStringBinary(last.get()));
    }
    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first.get(), Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion index). Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
    int indexForCallable = idx;

    /*
     * We can consider there to be a region hole in the following conditions: 1) if idx < 0, the
     * first region info is lost; 2) if the endkey of a region is not equal to the startkey of the
     * next region; 3) if the endkey of the last region is not empty.
     */
    if (indexForCallable < 0) {
      throw new IOException("The first region info for table " + table.getName() +
          " can't be found in hbase:meta. Please use the hbck tool to fix it first.");
    } else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
        !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IOException("The last region info for table " + table.getName() +
          " can't be found in hbase:meta. Please use the hbck tool to fix it first.");
    } else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
        !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
      throw new IOException("The endkey of one region for table " + table.getName() +
          " is not equal to the startkey of the next region in hbase:meta. " +
          "Please use the hbck tool to fix it first.");
    }

    boolean lastKeyInRange = Bytes.compareTo(last.get(), startEndKeys.getSecond()[idx]) < 0 ||
        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
        startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
    return null;
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list.
   * <p>
   * NOTE: To maintain row atomicity guarantees, the region server callable should succeed
   * atomically and fail atomically.
   * <p>
   * Protected for testing.
   * @return empty list if success, list of items to retry on recoverable failure
   */
  @VisibleForTesting
  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
      throws IOException {
    List<LoadQueueItem> toRetry = new ArrayList<>();
    try {
      Configuration conf = getConf();
      byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
          .callWithRetries(serviceCallable, Integer.MAX_VALUE);
      if (region == null) {
        LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
            " into table " + tableName + " with files " + lqis +
            " failed. This is recoverable and they will be retried.");
        toRetry.addAll(lqis); // return lqi's to retry
      }
      // success
      return toRetry;
    } catch (IOException e) {
      LOG.error("Received a " + e.getClass().getSimpleName() + " from region server, " +
          "additional details: " + serviceCallable.getExceptionMessageAdditionalDetail(), e);
      if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
          && numRetries.get() < getConf().getInt(
            HConstants.HBASE_CLIENT_RETRIES_NUMBER,
            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
        LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
            + numRetries.incrementAndGet());
        toRetry.addAll(lqis);
        return toRetry;
      }
      LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
      throw e;
    }
  }

  /**
   * If the table is created for the first time, then "completebulkload" reads the files twice.
   * More modifications would be necessary to avoid doing it.
   */
  private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
    final Path hfofDir = new Path(dirPath);
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
            ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
          throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
            HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
                " for family " + builder.getNameAsString());
          }
          reader.loadFileInfo();
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
              Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.containsKey(first) ? map.get(first) : 0;
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        }
      }
    });

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
        .forEachOrdered(tdBuilder::setColumnFamily);
    admin.createTable(tdBuilder.build(), keys);

    LOG.info("Table " + tableName + " is available!!");
  }

  private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
      SecureBulkLoadClient secureClient) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null && secureClient != null) {
      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   */
  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
      throws IOException {
    Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream()
        .map(f -> f.getNameAsString()).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
        .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (unmatchedFamilies.size() > 0) {
      String msg =
          "Unmatched family names found in HFiles to be bulk loaded: " + unmatchedFamilies +
              "; valid family names of table " + table.getName() + " are: " + familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the Queue with given HFiles
   */
  private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue containing all such files.
   */
  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
      final boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
        long length = hfile.getLen();
        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
          HConstants.DEFAULT_MAX_FILE_SIZE)) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size " + length +
              " bytes, which can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and
   * non-valid hfiles.
   */
  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
      final BulkHFileVisitor<TFamily> visitor) throws IOException {
    visitBulkHFiles(fs, bulkDir, visitor, true);
  }

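  /*
   * The walk below expects the layout produced by HFileOutputFormat: one directory per column
   * family directly under bulkDir, each containing the hfiles for that family, e.g.:
   *
   *   bulkDir/
   *     cf1/hfile-a
   *     cf1/hfile-b
   *     cf2/hfile-c
   */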
  /**
   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, and files starting with "_".
   * Check and skip non-valid hfiles by default, or skip this validation by setting
   * 'hbase.loadincremental.validate.hfile' to false.
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
      BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping directory with invalid family name " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("The file " + hfile + " doesn't seem to be an hfile, skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            LOG.warn("The file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  // Initialize a thread pool
  private ExecutorService createExecutorService() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(),
        new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  private final String toString(List<Pair<byte[], String>> list) {
    StringBuilder sb = new StringBuilder();
    sb.append('[');
    list.forEach(p -> {
      sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
          .append('}');
    });
    sb.append(']');
    return sb.toString();
  }

  private boolean isSecureBulkLoadEndpointAvailable() {
    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
  }

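  /*
   * The split below writes two new hfiles via Reference objects: the bottom half covers rows
   * before the split key and the top half covers rows from the split key onwards;
   * HalfStoreFileReader then materializes each half when copying.
   */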
  /**
   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
   * filters, etc.
   */
  @VisibleForTesting
  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
      byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile.
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
      Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    HalfStoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
          new AtomicInteger(0), true, conf);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
          .withChecksumType(HStore.getChecksumType(conf))
          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
          .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
          .build();
      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
          .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getCell());
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFile.isReservedFileInfoKey(key);
  }

  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

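  /*
   * The helpers above map the -D command-line options onto the run() overloads below; an
   * illustrative invocation (directory and table name are hypothetical):
   *
   *   hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles \
   *       -Dcreate.table=no -Dignore.unmatched.families=yes \
   *       /path/to/hfileoutputformat-output tablename
   */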
  /**
   * Perform bulk load on the given table.
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
      throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        if (isCreateTable()) {
          createTable(tableName, hfofDir, admin);
        } else {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(),
          isAlwaysCopyFiles());
      }
    }
  }

  /**
   * Perform bulk load on the given table.
   * @param family2Files map of family to List of hfiles
   * @param tableName the table to load into
   */
  public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files,
      TableName tableName) throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      if (!admin.tableExists(tableName)) {
        String errorMsg = format("Table '%s' does not exist.", tableName);
        LOG.error(errorMsg);
        throw new TableNotFoundException(errorMsg);
      }
      try (Table table = connection.getTable(tableName);
          RegionLocator locator = connection.getRegionLocator(tableName)) {
        return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles());
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    String dirPath = args[0];
    TableName tableName = TableName.valueOf(args[1]);

    if (args.length == 2) {
      return !run(dirPath, tableName).isEmpty() ? 0 : -1;
    } else {
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !run(family2Files, tableName).isEmpty() ? 0 : -1;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
    System.exit(ret);
  }

  /**
   * Called from the replication sink, where it manages the bulkToken (staging directory) by
   * itself. This is used only when SecureBulkLoadEndpoint is configured in the
   * hbase.coprocessor.region.classes property. The directory is used as a temporary directory
   * where all files are initially copied/moved from the user-given directory, the required file
   * permissions are set, and from there the data is finally loaded into the table. Set this only
   * if you would like to manage the staging directory yourself; otherwise this tool will handle
   * it by itself.
   * @param stagingDir staging directory path
   */
  public void setBulkToken(String stagingDir) {
    this.bulkToken = stagingDir;
  }

  /**
   * Infers region boundaries for a new table.
   * <p>
   * Parameter: <br>
   * bdryMap is a map between keys and an integer belonging to {+1, -1}
   * <ul>
   * <li>If a key is a start key of a file, then it maps to +1</li>
   * <li>If a key is an end key of a file, then it maps to -1</li>
   * </ul>
   * <p>
   * Algorithm:<br>
   * <ol>
   * <li>Poll on the keys in order:
   * <ol type="a">
   * <li>Keep adding the mapped values to these keys (runningSum)</li>
   * <li>Each time runningSum reaches 0, add the start key from when the runningSum had started
   * to a boundary list.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
   */
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }
}