/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The implementation for {@link BulkLoadHFiles}. It can also be executed from the command line as
 * a tool.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool {

  private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);

  public static final String NAME = "completebulkload";
  /**
   * Whether to run validation on hfiles before loading.
   */
  private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile";
  /**
   * HBASE-24221: support bulk loading HFiles by family, to avoid long waits on bulkLoadHFile
   * caused by server-side compactions.
   */
  public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";

  public static final String FAIL_IF_NEED_SPLIT_HFILE =
    "hbase.loadincremental.fail.if.need.split.hfile";

  // We use a '.' prefix, which is skipped when walking directory trees, as it is an invalid
  // family name.
  static final String TMP_DIR = ".tmp";
  private int maxFilesPerRegionPerFamily;
  private boolean assignSeqIds;
  private boolean bulkLoadByFamily;

  // Source delegation token
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private int nrThreads;
  private final AtomicInteger numRetries = new AtomicInteger(0);
  private String bulkToken;

  private List<String> clusterIds = new ArrayList<>();
  private boolean replicate = true;
  private boolean failIfNeedSplitHFile = false;

  public BulkLoadHFilesTool(Configuration conf) {
    // make a copy, just to be sure we're not overriding someone else's config
    super(new Configuration(conf));
    initialize();
  }

  public void initialize() {
    Configuration conf = getConf();
    // disable blockcache for tool invocation, see HBASE-10500
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    nrThreads =
      conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
    bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
    failIfNeedSplitHFile = conf.getBoolean(FAIL_IF_NEED_SPLIT_HFILE, false);
  }

  // Initialize a thread pool
  private ExecutorService createExecutorService() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(),
      new ThreadFactoryBuilder().setNameFormat("BulkLoadHFilesTool-%1$d").setDaemon(true).build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFileInfo.isReservedFileInfoKey(key);
  }
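
  // Illustrative sketch of programmatic use, wiring together the config knobs read in
  // initialize() above. The table name and staging path are hypothetical:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(BULK_LOAD_HFILES_BY_FAMILY, true); // one load call per family
  //   BulkLoadHFilesTool tool = new BulkLoadHFilesTool(conf);
  //   tool.bulkLoad(TableName.valueOf("my_table"), new Path("/staging/hfiles"));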

  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   */
  private static void validateFamiliesInHFiles(TableDescriptor tableDesc,
    Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    Set<String> familyNames = Arrays.stream(tableDesc.getColumnFamilies())
      .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
      .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (!unmatchedFamilies.isEmpty()) {
      String msg = "Unmatched family names found in HFiles to be bulkloaded: " + unmatchedFamilies
        + "; valid family names of table " + tableDesc.getTableName() + " are: " + familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the Queue with given HFiles
   */
  private static void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the bulkDir hfiles. Skip reference files, HFileLinks, and files starting with
   * "_". Check and skip invalid hfiles by default, or skip this validation by setting
   * {@link #VALIDATE_HFILES} to false.
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
    BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid family directory " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("The file " + hfile + " doesn't seem to be an hfile, skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            LOG.warn("The file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }
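
  // Expected input layout for visitBulkHFiles (paths are illustrative):
  //
  //   /staging/hfiles/          <- bulkDir
  //     cf1/                    <- one directory per column family
  //       3d45f3e9a14b...       <- hfiles
  //     cf2/
  //       9bc1e44559c2...
  //
  // Anything else (files at the top level, "_"-prefixed files, references, HFileLinks) is
  // skipped with a warning.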

  /**
   * Walk the given directory for all HFiles, and return a Queue containing all such files.
   */
  private static void discoverLoadQueue(Configuration conf, Deque<LoadQueueItem> ret, Path hfofDir,
    boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(conf), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) {
        long length = hfile.getLen();
        if (
          length > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
        ) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length
            + " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  /**
   * Prepares a collection of {@code LoadQueueItem}s from the given map of family to hfile paths,
   * and validates that the families in the prepared queue all exist in the target table.
   * @param conn      connection to the cluster
   * @param tableName table to which hfiles should be loaded
   * @param map       map of family to List of hfiles
   * @param queue     queue which needs to be loaded into the table
   * @param silence   true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public static void prepareHFileQueue(AsyncClusterConnection conn, TableName tableName,
    Map<byte[], List<Path>> map, Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
      silence);
  }

  /**
   * Prepares a collection of {@code LoadQueueItem}s from the source hfiles contained in the
   * passed directory, and validates that the families in the prepared queue all exist in the
   * target table.
   * @param hfilesDir     directory containing list of hfiles to be loaded into the table
   * @param queue         queue which needs to be loaded into the table
   * @param validateHFile if true hfiles will be validated for their format
   * @param silence       true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public static void prepareHFileQueue(Configuration conf, AsyncClusterConnection conn,
    TableName tableName, Path hfilesDir, Deque<LoadQueueItem> queue, boolean validateHFile,
    boolean silence) throws IOException {
    discoverLoadQueue(conf, queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
      silence);
  }
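
  // Minimal sketch of preparing a queue by hand, assuming an existing AsyncClusterConnection
  // `conn`; the family name and file path are hypothetical:
  //
  //   Deque<LoadQueueItem> queue = new ArrayDeque<>();
  //   Map<byte[], List<Path>> map = new HashMap<>();
  //   map.put(Bytes.toBytes("cf1"), Arrays.asList(new Path("/staging/hfiles/cf1/file1")));
  //   prepareHFileQueue(conn, TableName.valueOf("my_table"), map, queue, false);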

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>{@link #groupOrSplitPhase(AsyncClusterConnection, TableName, ExecutorService, Deque, List)}
   * </li>
   * <li>{@link #bulkLoadPhase(AsyncClusterConnection, TableName, Deque, Multimap, boolean, Map)}
   * </li>
   * </ol>
   * @param conn      Connection to use
   * @param tableName Table to which these hfiles should be loaded
   * @param queue     {@code LoadQueueItem}s with hfiles yet to be loaded
   * @param copyFiles whether to copy the hfiles instead of moving them
   */
  public void loadHFileQueue(AsyncClusterConnection conn, TableName tableName,
    Deque<LoadQueueItem> queue, boolean copyFiles) throws IOException {
    ExecutorService pool = createExecutorService();
    try {
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(conn, tableName, pool,
        queue, FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys())).getFirst();
      bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
    } finally {
      pool.shutdown();
    }
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list. NOTE: To
   * maintain row atomicity guarantees, the region server side should succeed or fail atomically.
   * @param conn      Connection to use
   * @param tableName Table to which these hfiles should be loaded
   * @param copyFiles whether to copy the hfiles instead of moving them
   * @param first     the start key of the region
   * @param lqis      the hfiles to load
   * @return empty list if success, list of items to retry on recoverable failure
   */
  @InterfaceAudience.Private
  protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
    final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
    final byte[] first, Collection<LoadQueueItem> lqis) {
    List<Pair<byte[], String>> familyPaths =
      lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
        .collect(Collectors.toList());
    CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
    FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
      (loaded, error) -> {
        if (error != null) {
          LOG.error("Encountered unrecoverable error from region server", error);
          if (
            getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
              && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
                HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
          ) {
            LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
              + numRetries.incrementAndGet());
            // return lqi's to retry
            future.complete(lqis);
          } else {
            LOG.error(RETRY_ON_IO_EXCEPTION
              + " is disabled or we have reached retry limit. Unable to recover");
            future.completeExceptionally(error);
          }
        } else {
          if (loaded) {
            future.complete(Collections.emptyList());
          } else {
            LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
              + " into table " + tableName + " with files " + lqis
              + " failed. This is recoverable and they will be retried.");
            // return lqi's to retry
            future.complete(lqis);
          }
        }
      });
    return future;
  }
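
  // Result contract of tryAtomicRegionLoad, in brief:
  //   completed with empty list     -> all files in this group were loaded
  //   completed with non-empty list -> recoverable failure; the caller re-queues these items
  //   completed exceptionally       -> unrecoverable error (retries disabled or exhausted)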

  /**
   * This takes the LQIs grouped by likely regions and attempts to bulk load them. Any failures
   * are re-queued for another pass with the groupOrSplitPhase.
   * <p/>
   * protected for testing.
   */
  @InterfaceAudience.Private
  protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
    Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFiles,
    Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    List<Future<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap()
      .entrySet()) {
      byte[] first = entry.getKey().array();
      final Collection<LoadQueueItem> lqis = entry.getValue();
      if (bulkLoadByFamily) {
        groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures
          .add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, familyQueue)));
      } else {
        loadingFutures.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis));
      }
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, entry.getKey());
        }
      }
    }

    // get all the results.
    for (Future<Collection<LoadQueueItem>> future : loadingFutures) {
      try {
        Collection<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  private Map<byte[], Collection<LoadQueueItem>>
    groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
    Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    itemsInRegion.forEach(item -> families2Queue
      .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
    return families2Queue;
  }

  private boolean
    checkHFilesCountPerRegionPerFamily(final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : e.getValue()) {
        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + " hfiles to family "
            + Bytes.toStringBinary(lqi.getFamily()) + " of region with start key "
            + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }
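
  // With the default MAX_FILES_PER_REGION_PER_FAMILY of 32, a load that would place 33 or more
  // hfiles into one family of one region fails the check above. Raising the limit is a config
  // change, e.g. (illustrative value):
  //
  //   conf.setInt(MAX_FILES_PER_REGION_PER_FAMILY, 64);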

  /**
   * @param conn         the HBase cluster connection
   * @param tableName    the table name of the table to load into
   * @param pool         the ExecutorService
   * @param queue        the queue for LoadQueueItem
   * @param startEndKeys start and end keys
   * @return A map that groups LQIs by likely bulk load region targets and a Set of missing
   *         hfiles.
   */
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
    AsyncClusterConnection conn, TableName tableName, ExecutorService pool,
    Deque<LoadQueueItem> queue, List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
    // <region start key, LQI> needs synchronization only within the scope of this phase, because
    // of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
      new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<Pair<List<LoadQueueItem>, String>> call =
        () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }
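
  // Grouping sketch (illustrative keys): with regions [("", "b"), ("b", "d"), ("d", "")], an
  // hfile spanning rows a..a9 groups under the first region's start key "", while an hfile
  // spanning a..c crosses the "b" boundary and must first be split at "b" into a bottom and a
  // top half.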
Splitting..."); 590 591 String uniqueName = getUniqueName(); 592 ColumnFamilyDescriptor familyDesc = tableDesc.getColumnFamily(family); 593 594 Path botOut = new Path(tmpDir, uniqueName + ".bottom"); 595 Path topOut = new Path(tmpDir, uniqueName + ".top"); 596 splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut); 597 598 FileSystem fs = tmpDir.getFileSystem(getConf()); 599 fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx")); 600 fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx")); 601 fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx")); 602 603 // Add these back at the *front* of the queue, so there's a lower 604 // chance that the region will just split again before we get there. 605 List<LoadQueueItem> lqis = new ArrayList<>(2); 606 lqis.add(new LoadQueueItem(family, botOut)); 607 lqis.add(new LoadQueueItem(family, topOut)); 608 609 // If the current item is already the result of previous splits, 610 // we don't need it anymore. Clean up to save space. 611 // It is not part of the original input files. 612 try { 613 if (tmpDir.getName().equals(TMP_DIR)) { 614 fs.delete(hfilePath, false); 615 } 616 } catch (IOException e) { 617 LOG.warn("Unable to delete temporary split file " + hfilePath); 618 } 619 LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); 620 return lqis; 621 } 622 623 /** 624 * @param startEndKeys the start/end keys of regions belong to this table, the list in ascending 625 * order by start key 626 * @param key the key need to find which region belong to 627 * @return region index 628 */ 629 private int getRegionIndex(List<Pair<byte[], byte[]>> startEndKeys, byte[] key) { 630 int idx = Collections.binarySearch(startEndKeys, Pair.newPair(key, HConstants.EMPTY_END_ROW), 631 (p1, p2) -> Bytes.compareTo(p1.getFirst(), p2.getFirst())); 632 if (idx < 0) { 633 // not on boundary, returns -(insertion index). Calculate region it 634 // would be in. 635 idx = -(idx + 1) - 1; 636 } 637 return idx; 638 } 639 640 /** 641 * we can consider there is a region hole or overlap in following conditions. 1) if idx < 0,then 642 * first region info is lost. 2) if the endkey of a region is not equal to the startkey of the 643 * next region. 3) if the endkey of the last region is not empty. 644 */ 645 private void checkRegionIndexValid(int idx, List<Pair<byte[], byte[]>> startEndKeys, 646 TableName tableName) throws IOException { 647 if (idx < 0) { 648 throw new IOException("The first region info for table " + tableName 649 + " can't be found in hbase:meta.Please use hbck tool to fix it first."); 650 } else if ( 651 (idx == startEndKeys.size() - 1) 652 && !Bytes.equals(startEndKeys.get(idx).getSecond(), HConstants.EMPTY_BYTE_ARRAY) 653 ) { 654 throw new IOException("The last region info for table " + tableName 655 + " can't be found in hbase:meta.Please use hbck tool to fix it first."); 656 } else if ( 657 idx + 1 < startEndKeys.size() && !(Bytes.compareTo(startEndKeys.get(idx).getSecond(), 658 startEndKeys.get(idx + 1).getFirst()) == 0) 659 ) { 660 throw new IOException("The endkey of one region for table " + tableName 661 + " is not equal to the startkey of the next region in hbase:meta." 662 + "Please use hbck tool to fix it first."); 663 } 664 } 665 666 /** 667 * Attempt to assign the given load queue item into its target region group. 

  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile
   * boundary no longer fits into a region, physically splits the hfile such that the new bottom
   * half will fit, and returns the list of LQIs corresponding to the resultant hfiles.
   * <p/>
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */
  @InterfaceAudience.Private
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
    TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
    List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
      CacheConfig.DISABLED, true, getConf())) {
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary)
      + " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get())
        + " > " + Bytes.toStringBinary(last.get()));
    }
    int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get());
    checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, tableName);
    boolean lastKeyInRange =
      Bytes.compareTo(last.get(), startEndKeys.get(firstKeyRegionIdx).getSecond()) < 0 || Bytes
        .equals(startEndKeys.get(firstKeyRegionIdx).getSecond(), HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      if (failIfNeedSplitHFile) {
        throw new IOException("The key range of hfile=" + hfilePath
          + " fits into no single region, and " + FAIL_IF_NEED_SPLIT_HFILE
          + " was set to true, so we fail instead of splitting.");
      }
      int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
      int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) / 2;
      // make sure the splitPoint is valid in case regions overlap; without this check the
      // splitPoint could be bigger than the hfile's end key
      if (splitIdx != firstKeyRegionIdx) {
        checkRegionIndexValid(splitIdx, startEndKeys, tableName);
      }
      byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
      List<LoadQueueItem> lqis =
        splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.get(firstKeyRegionIdx).getFirst()), item);
    return null;
  }

  /**
   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
   * filters, etc.
   */
  @InterfaceAudience.Private
  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
    byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile.
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
    Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    HalfStoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
    try {
      ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
      StoreFileInfo storeFileInfo =
        new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
      storeFileInfo.initHFileInfo(context);
      halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
      storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
        .withChecksumType(StoreUtils.getChecksumType(conf))
        .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
        .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
        .build();
      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getCell());
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }
    }
  }
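
  // Note on the halves: the bottom half holds cells with row < splitKey and the top half holds
  // cells with row >= splitKey. Each output is written as a complete standalone hfile (not a
  // Reference file), so the two halves can be bulk loaded independently.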

  /**
   * Infers region boundaries for a new table.
   * <p/>
   * Parameter: <br/>
   * bdryMap is a map from keys to an integer belonging to {+1, -1}
   * <ul>
   * <li>If a key is the start key of a file, then it maps to +1</li>
   * <li>If a key is the end key of a file, then it maps to -1</li>
   * </ul>
   * <p>
   * Algorithm:<br/>
   * <ol>
   * <li>Poll on the keys in order:
   * <ol type="a">
   * <li>Keep adding the mapped values to these keys (runningSum)</li>
   * <li>Each time runningSum reaches 0, add the start key from when the runningSum had started to
   * a boundary list.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
   */
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }
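
  // Worked example (illustrative keys): three disjoint hfiles spanning [a,b], [c,d] and [e,f]
  // produce bdryMap = {a:+1, b:-1, c:+1, d:-1, e:+1, f:-1}. runningValue returns to 0 at b, d
  // and f; the first return (at b) is skipped, so the inferred split keys are [c, e], i.e. the
  // new table gets regions (-inf, c), [c, e) and [e, +inf).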

  /**
   * If the table is created for the first time, then "completebulkload" reads the files twice:
   * once here to infer region boundaries, and once again to load them. More modifications would
   * be necessary to avoid doing that.
   */
  private void createTable(TableName tableName, Path hfofDir, AsyncAdmin admin)
    throws IOException {
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
          ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
        throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
          HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name()
              + " for family " + builder.getNameAsString());
          }
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first="
            + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.getOrDefault(first, 0);
          map.put(first, value + 1);

          value = map.getOrDefault(last, 0);
          map.put(last, value - 1);
        }
      }
    }, true);

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
      .forEachOrdered(tdBuilder::setColumnFamily);
    FutureUtils.get(admin.createTable(tdBuilder.build(), keys));

    LOG.info("Table " + tableName + " is available!");
  }

  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Deque<LoadQueueItem> queue, ExecutorService pool, boolean copyFile)
    throws IOException {
    int count = 0;

    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
    bulkToken = FutureUtils.get(conn.prepareBulkLoad(tableName));
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      final List<Pair<byte[], byte[]>> startEndKeys =
        FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys());
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with "
          + queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      maxRetries = Math.max(maxRetries, startEndKeys.size() + 1);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
          "Retry attempted " + count + " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(conn, tableName, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
          + " hfiles to one family of one region");
      }

      bulkLoadPhase(conn, tableName, queue, regionGroups, copyFile, item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    return item2RegionMap;
  }

  private void cleanup(AsyncClusterConnection conn, TableName tableName,
    Deque<LoadQueueItem> queue, ExecutorService pool) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null) {
      conn.cleanupBulkLoad(tableName, bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }
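
  // The retry ceiling in performBulkLoad scales with region count: with 10 regions and the
  // default hbase.bulkload.retries.number of 10, up to max(10, 10 + 1) = 11 grouping passes are
  // attempted before bailing out.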

  /**
   * Perform a bulk load of the given map of families to hfiles into the given pre-existing
   * table. This method is not threadsafe.
   * @param map       map of family to List of hfiles
   * @param tableName table to load the hfiles
   * @param silence   true to ignore unmatched column families
   * @param copyFile  always copy hfiles if true
   */
  private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Map<byte[], List<Path>> map, boolean silence, boolean copyFile)
    throws IOException {
    tableExists(conn, tableName);
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    try {
      prepareHFileQueue(conn, tableName, map, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      return performBulkLoad(conn, tableName, queue, pool, copyFile);
    } finally {
      cleanup(conn, tableName, queue, pool);
    }
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param tableName table to load the hfiles
   * @param hfofDir   the directory that was provided as the output path of a job using
   *                  HFileOutputFormat
   * @param silence   true to ignore unmatched column families
   * @param copyFile  always copy hfiles if true
   */
  private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Path hfofDir, boolean silence, boolean copyFile) throws IOException {
    tableExists(conn, tableName);

    /*
     * Checking hfile format is a time-consuming operation, so there is an option to skip this
     * step when bulkloading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean(VALIDATE_HFILES, true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files "
        + "are not correct. If you fail to read data from your table after using this "
        + "option, consider removing the files and bulkload again without this option. "
        + "See HBASE-13985");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    try {
      prepareHFileQueue(getConf(), conn, tableName, hfofDir, queue, validateHFile, silence);

      if (queue.isEmpty()) {
        LOG.warn(
          "Bulk load operation did not find any files to load in directory {}. "
            + "Does it contain files in subdirectories that correspond to column family names?",
          (hfofDir != null ? hfofDir.toUri().toString() : ""));
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      return performBulkLoad(conn, tableName, queue, pool, copyFile);
    } finally {
      cleanup(conn, tableName, queue, pool);
    }
  }
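
  // To skip the per-file format validation when loading very large numbers of hfiles
  // (HBASE-13985), set the key before running the tool, e.g.:
  //
  //   conf.setBoolean("hbase.loadincremental.validate.hfile", false);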

  @Override
  public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
    Map<byte[], List<Path>> family2Files) throws IOException {
    try (AsyncClusterConnection conn = ClusterConnectionFactory
      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
      return doBulkLoad(conn, tableName, family2Files, isSilence(), isAlwaysCopyFiles());
    }
  }

  @Override
  public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
    throws IOException {
    try (AsyncClusterConnection conn = ClusterConnectionFactory
      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
      AsyncAdmin admin = conn.getAdmin();
      if (!FutureUtils.get(admin.tableExists(tableName))) {
        if (isCreateTable()) {
          createTable(tableName, dir, admin);
        } else {
          throwAndLogTableNotFoundException(tableName);
        }
      }
      return doBulkLoad(conn, tableName, dir, isSilence(), isAlwaysCopyFiles());
    }
  }

  /**
   * @throws TableNotFoundException if table does not exist.
   */
  private void tableExists(AsyncClusterConnection conn, TableName tableName) throws IOException {
    if (!FutureUtils.get(conn.getAdmin().tableExists(tableName))) {
      throwAndLogTableNotFoundException(tableName);
    }
  }

  private void throwAndLogTableNotFoundException(TableName tn) throws TableNotFoundException {
    String errorMsg = format("Table '%s' does not exist.", tn);
    LOG.error(errorMsg);
    throw new TableNotFoundException(errorMsg);
  }

  public void setBulkToken(String bulkToken) {
    this.bulkToken = bulkToken;
  }

  public void setClusterIds(List<String> clusterIds) {
    this.clusterIds = clusterIds;
  }

  private void usage() {
    System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
      + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
      + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- "
      + "into an hbase table.\n" + "OPTIONS (for other -D options, see source code):\n" + " -D"
      + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target "
      + "table must exist.\n" + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY
      + "=yes to ignore unmatched column families.\n"
      + " -loadTable for when directory of files to load has a depth of 3; target table must "
      + "exist;\n" + " must be last of the options on command line.\n"
      + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for "
      + "documentation.\n");
  }
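
  // Typical command-line invocation (the output path and table name are illustrative):
  //
  //   bin/hbase completebulkload /user/alice/hfile-output my_table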

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    // Re-initialize to apply -D options from the command line parameters
    initialize();
    Path dirPath = new Path(args[0]);
    TableName tableName = TableName.valueOf(args[1]);
    if (args.length == 2) {
      return !bulkLoad(tableName, dirPath).isEmpty() ? 0 : -1;
    } else {
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(dirPath)) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !bulkLoad(tableName, family2Files).isEmpty() ? 0 : -1;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
    System.exit(ret);
  }

  @Override
  public void disableReplication() {
    this.replicate = false;
  }

  @Override
  public boolean isReplicationDisabled() {
    return !this.replicate;
  }
}