001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.tool; 019 020import static java.lang.String.format; 021 022import java.io.FileNotFoundException; 023import java.io.IOException; 024import java.io.InterruptedIOException; 025import java.net.InetSocketAddress; 026import java.nio.ByteBuffer; 027import java.util.ArrayDeque; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collection; 031import java.util.Collections; 032import java.util.Deque; 033import java.util.HashMap; 034import java.util.HashSet; 035import java.util.List; 036import java.util.Map; 037import java.util.Map.Entry; 038import java.util.Optional; 039import java.util.Set; 040import java.util.SortedMap; 041import java.util.TreeMap; 042import java.util.UUID; 043import java.util.concurrent.Callable; 044import java.util.concurrent.CompletableFuture; 045import java.util.concurrent.ExecutionException; 046import java.util.concurrent.ExecutorService; 047import java.util.concurrent.Future; 048import java.util.concurrent.LinkedBlockingQueue; 049import java.util.concurrent.ThreadPoolExecutor; 050import java.util.concurrent.TimeUnit; 051import java.util.concurrent.atomic.AtomicInteger; 052import java.util.stream.Collectors; 053import org.apache.commons.lang3.mutable.MutableInt; 054import org.apache.hadoop.conf.Configuration; 055import org.apache.hadoop.conf.Configured; 056import org.apache.hadoop.fs.FileStatus; 057import org.apache.hadoop.fs.FileSystem; 058import org.apache.hadoop.fs.Path; 059import org.apache.hadoop.fs.permission.FsPermission; 060import org.apache.hadoop.hbase.Cell; 061import org.apache.hadoop.hbase.CellUtil; 062import org.apache.hadoop.hbase.HBaseConfiguration; 063import org.apache.hadoop.hbase.HBaseInterfaceAudience; 064import org.apache.hadoop.hbase.HConstants; 065import org.apache.hadoop.hbase.HRegionLocation; 066import org.apache.hadoop.hbase.TableName; 067import org.apache.hadoop.hbase.TableNotFoundException; 068import org.apache.hadoop.hbase.client.AsyncAdmin; 069import org.apache.hadoop.hbase.client.AsyncClusterConnection; 070import org.apache.hadoop.hbase.client.AsyncTableRegionLocator; 071import org.apache.hadoop.hbase.client.ClusterConnectionFactory; 072import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 073import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 074import org.apache.hadoop.hbase.client.TableDescriptor; 075import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 076import org.apache.hadoop.hbase.io.HFileLink; 077import org.apache.hadoop.hbase.io.HalfStoreFileReader; 078import org.apache.hadoop.hbase.io.Reference; 079import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; 080import org.apache.hadoop.hbase.io.hfile.CacheConfig; 081import org.apache.hadoop.hbase.io.hfile.HFile; 082import org.apache.hadoop.hbase.io.hfile.HFileContext; 083import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; 084import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; 085import org.apache.hadoop.hbase.io.hfile.HFileInfo; 086import org.apache.hadoop.hbase.io.hfile.HFileScanner; 087import org.apache.hadoop.hbase.io.hfile.ReaderContext; 088import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder; 089import org.apache.hadoop.hbase.regionserver.BloomType; 090import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 091import org.apache.hadoop.hbase.regionserver.StoreFileWriter; 092import org.apache.hadoop.hbase.regionserver.StoreUtils; 093import org.apache.hadoop.hbase.security.UserProvider; 094import org.apache.hadoop.hbase.security.token.FsDelegationToken; 095import org.apache.hadoop.hbase.util.Bytes; 096import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 097import org.apache.hadoop.hbase.util.FSVisitor; 098import org.apache.hadoop.hbase.util.FutureUtils; 099import org.apache.hadoop.hbase.util.Pair; 100import org.apache.hadoop.util.Tool; 101import org.apache.hadoop.util.ToolRunner; 102import org.apache.yetus.audience.InterfaceAudience; 103import org.slf4j.Logger; 104import org.slf4j.LoggerFactory; 105 106import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap; 107import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 108import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 109import org.apache.hbase.thirdparty.com.google.common.collect.Multimap; 110import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps; 111import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 112 113/** 114 * The implementation for {@link BulkLoadHFiles}, and also can be executed from command line as a 115 * tool. 116 */ 117@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 118public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool { 119 120 private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class); 121 122 /** 123 * Keep locality while generating HFiles for bulkload. See HBASE-12596 124 */ 125 public static final String LOCALITY_SENSITIVE_CONF_KEY = 126 "hbase.bulkload.locality.sensitive.enabled"; 127 private static final boolean DEFAULT_LOCALITY_SENSITIVE = true; 128 129 public static final String NAME = "completebulkload"; 130 /** 131 * Whether to run validation on hfiles before loading. 132 */ 133 private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile"; 134 /** 135 * HBASE-24221 Support bulkLoadHFile by family to avoid long time waiting of bulkLoadHFile because 136 * of compacting at server side 137 */ 138 public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family"; 139 140 public static final String FAIL_IF_NEED_SPLIT_HFILE = 141 "hbase.loadincremental.fail.if.need.split.hfile"; 142 143 // We use a '.' prefix which is ignored when walking directory trees 144 // above. It is invalid family name. 145 static final String TMP_DIR = ".tmp"; 146 147 private int maxFilesPerRegionPerFamily; 148 private boolean assignSeqIds; 149 private boolean bulkLoadByFamily; 150 151 // Source delegation token 152 private FsDelegationToken fsDelegationToken; 153 private UserProvider userProvider; 154 private int nrThreads; 155 private final AtomicInteger numRetries = new AtomicInteger(0); 156 private String bulkToken; 157 158 private List<String> clusterIds = new ArrayList<>(); 159 private boolean replicate = true; 160 private boolean failIfNeedSplitHFile = false; 161 162 public BulkLoadHFilesTool(Configuration conf) { 163 // make a copy, just to be sure we're not overriding someone else's config 164 super(new Configuration(conf)); 165 initialize(); 166 } 167 168 public void initialize() { 169 Configuration conf = getConf(); 170 // disable blockcache for tool invocation, see HBASE-10500 171 conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0); 172 userProvider = UserProvider.instantiate(conf); 173 fsDelegationToken = new FsDelegationToken(userProvider, "renewer"); 174 assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true); 175 maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); 176 nrThreads = 177 conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors()); 178 bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false); 179 failIfNeedSplitHFile = conf.getBoolean(FAIL_IF_NEED_SPLIT_HFILE, false); 180 } 181 182 // Initialize a thread pool 183 private ExecutorService createExecutorService() { 184 ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS, 185 new LinkedBlockingQueue<>(), 186 new ThreadFactoryBuilder().setNameFormat("BulkLoadHFilesTool-%1$d").setDaemon(true).build()); 187 pool.allowCoreThreadTimeOut(true); 188 return pool; 189 } 190 191 private boolean isCreateTable() { 192 return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes")); 193 } 194 195 private boolean isSilence() { 196 return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, "")); 197 } 198 199 private boolean isAlwaysCopyFiles() { 200 return getConf().getBoolean(ALWAYS_COPY_FILES, false); 201 } 202 203 private static boolean shouldCopyHFileMetaKey(byte[] key) { 204 // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085 205 if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) { 206 return false; 207 } 208 209 return !HFileInfo.isReservedFileInfoKey(key); 210 } 211 212 /** 213 * Checks whether there is any invalid family name in HFiles to be bulk loaded. 214 */ 215 private static void validateFamiliesInHFiles(TableDescriptor tableDesc, 216 Deque<LoadQueueItem> queue, boolean silence) throws IOException { 217 Set<String> familyNames = Arrays.stream(tableDesc.getColumnFamilies()) 218 .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet()); 219 List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily())) 220 .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList()); 221 if (unmatchedFamilies.size() > 0) { 222 String msg = 223 "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " 224 + unmatchedFamilies + "; valid family names of table " + tableDesc.getTableName() 225 + " are: " + familyNames; 226 LOG.error(msg); 227 if (!silence) { 228 throw new IOException(msg); 229 } 230 } 231 } 232 233 /** 234 * Populate the Queue with given HFiles 235 */ 236 private static void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) { 237 map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add)); 238 } 239 240 private interface BulkHFileVisitor<TFamily> { 241 242 TFamily bulkFamily(byte[] familyName) throws IOException; 243 244 void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException; 245 } 246 247 /** 248 * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and 249 * skip non-valid hfiles by default, or skip this validation by setting {@link #VALIDATE_HFILES} 250 * to false. 251 */ 252 private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir, 253 BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException { 254 FileStatus[] familyDirStatuses = fs.listStatus(bulkDir); 255 for (FileStatus familyStat : familyDirStatuses) { 256 if (!familyStat.isDirectory()) { 257 LOG.warn("Skipping non-directory " + familyStat.getPath()); 258 continue; 259 } 260 Path familyDir = familyStat.getPath(); 261 byte[] familyName = Bytes.toBytes(familyDir.getName()); 262 // Skip invalid family 263 try { 264 ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName); 265 } catch (IllegalArgumentException e) { 266 LOG.warn("Skipping invalid " + familyStat.getPath()); 267 continue; 268 } 269 TFamily family = visitor.bulkFamily(familyName); 270 271 FileStatus[] hfileStatuses = fs.listStatus(familyDir); 272 for (FileStatus hfileStatus : hfileStatuses) { 273 if (!fs.isFile(hfileStatus.getPath())) { 274 LOG.warn("Skipping non-file " + hfileStatus); 275 continue; 276 } 277 278 Path hfile = hfileStatus.getPath(); 279 // Skip "_", reference, HFileLink 280 String fileName = hfile.getName(); 281 if (fileName.startsWith("_")) { 282 continue; 283 } 284 if (StoreFileInfo.isReference(fileName)) { 285 LOG.warn("Skipping reference " + fileName); 286 continue; 287 } 288 if (HFileLink.isHFileLink(fileName)) { 289 LOG.warn("Skipping HFileLink " + fileName); 290 continue; 291 } 292 293 // Validate HFile Format if needed 294 if (validateHFile) { 295 try { 296 if (!HFile.isHFileFormat(fs, hfile)) { 297 LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping"); 298 continue; 299 } 300 } catch (FileNotFoundException e) { 301 LOG.warn("the file " + hfile + " was removed"); 302 continue; 303 } 304 } 305 306 visitor.bulkHFile(family, hfileStatus); 307 } 308 } 309 } 310 311 /** 312 * Walk the given directory for all HFiles, and return a Queue containing all such files. 313 */ 314 private static void discoverLoadQueue(Configuration conf, Deque<LoadQueueItem> ret, Path hfofDir, 315 boolean validateHFile) throws IOException { 316 visitBulkHFiles(hfofDir.getFileSystem(conf), hfofDir, new BulkHFileVisitor<byte[]>() { 317 @Override 318 public byte[] bulkFamily(final byte[] familyName) { 319 return familyName; 320 } 321 322 @Override 323 public void bulkHFile(final byte[] family, final FileStatus hfile) { 324 long length = hfile.getLen(); 325 if ( 326 length > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE) 327 ) { 328 LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length 329 + " bytes can be problematic as it may lead to oversplitting."); 330 } 331 ret.add(new LoadQueueItem(family, hfile.getPath())); 332 } 333 }, validateHFile); 334 } 335 336 /** 337 * Prepare a collection of {@code LoadQueueItem} from list of source hfiles contained in the 338 * passed directory and validates whether the prepared queue has all the valid table column 339 * families in it. 340 * @param map map of family to List of hfiles 341 * @param tableName table to which hfiles should be loaded 342 * @param queue queue which needs to be loaded into the table 343 * @param silence true to ignore unmatched column families 344 * @throws IOException If any I/O or network error occurred 345 */ 346 public static void prepareHFileQueue(AsyncClusterConnection conn, TableName tableName, 347 Map<byte[], List<Path>> map, Deque<LoadQueueItem> queue, boolean silence) throws IOException { 348 populateLoadQueue(queue, map); 349 validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue, 350 silence); 351 } 352 353 /** 354 * Prepare a collection of {@code LoadQueueItem} from list of source hfiles contained in the 355 * passed directory and validates whether the prepared queue has all the valid table column 356 * families in it. 357 * @param hfilesDir directory containing list of hfiles to be loaded into the table 358 * @param queue queue which needs to be loaded into the table 359 * @param validateHFile if true hfiles will be validated for its format 360 * @param silence true to ignore unmatched column families 361 * @throws IOException If any I/O or network error occurred 362 */ 363 public static void prepareHFileQueue(Configuration conf, AsyncClusterConnection conn, 364 TableName tableName, Path hfilesDir, Deque<LoadQueueItem> queue, boolean validateHFile, 365 boolean silence) throws IOException { 366 discoverLoadQueue(conf, queue, hfilesDir, validateHFile); 367 validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue, 368 silence); 369 } 370 371 /** 372 * Used by the replication sink to load the hfiles from the source cluster. It does the following, 373 * <ol> 374 * <li>{@link #groupOrSplitPhase(AsyncClusterConnection, TableName, ExecutorService, Deque, List)} 375 * </li> 376 * <li>{@link #bulkLoadPhase(AsyncClusterConnection, TableName, Deque, Multimap, boolean, Map)} 377 * </li> 378 * </ol> 379 * @param conn Connection to use 380 * @param tableName Table to which these hfiles should be loaded to 381 * @param queue {@code LoadQueueItem} has hfiles yet to be loaded 382 */ 383 public void loadHFileQueue(AsyncClusterConnection conn, TableName tableName, 384 Deque<LoadQueueItem> queue, boolean copyFiles) throws IOException { 385 ExecutorService pool = createExecutorService(); 386 try { 387 Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(conn, tableName, pool, 388 queue, FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys())).getFirst(); 389 bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null); 390 } finally { 391 pool.shutdown(); 392 } 393 } 394 395 /** 396 * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of 397 * hfiles that need to be retried. If it is successful it will return an empty list. NOTE: To 398 * maintain row atomicity guarantees, region server side should succeed atomically and fails 399 * atomically. 400 * @param conn Connection to use 401 * @param tableName Table to which these hfiles should be loaded to 402 * @param copyFiles whether replicate to peer cluster while bulkloading 403 * @param first the start key of region 404 * @param lqis hfiles should be loaded 405 * @return empty list if success, list of items to retry on recoverable failure 406 */ 407 @InterfaceAudience.Private 408 protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad( 409 final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles, 410 final byte[] first, Collection<LoadQueueItem> lqis) { 411 List<Pair<byte[], String>> familyPaths = 412 lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString())) 413 .collect(Collectors.toList()); 414 CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>(); 415 FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds, 416 fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate), 417 (loaded, error) -> { 418 if (error != null) { 419 LOG.error("Encountered unrecoverable error from region server", error); 420 if ( 421 getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) 422 && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 423 HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) 424 ) { 425 LOG.warn("Will attempt to retry loading failed HFiles. Retry #" 426 + numRetries.incrementAndGet()); 427 // return lqi's to retry 428 future.complete(lqis); 429 } else { 430 LOG.error(RETRY_ON_IO_EXCEPTION 431 + " is disabled or we have reached retry limit. Unable to recover"); 432 future.completeExceptionally(error); 433 } 434 } else { 435 if (loaded) { 436 future.complete(Collections.emptyList()); 437 } else { 438 LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) 439 + " into table " + tableName + " with files " + lqis 440 + " failed. This is recoverable and they will be retried."); 441 // return lqi's to retry 442 future.complete(lqis); 443 } 444 } 445 }); 446 return future; 447 } 448 449 /** 450 * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are 451 * re-queued for another pass with the groupOrSplitPhase. 452 * <p/> 453 * protected for testing. 454 */ 455 @InterfaceAudience.Private 456 protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName, 457 Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFiles, 458 Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException { 459 // atomically bulk load the groups. 460 List<Future<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>(); 461 for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap() 462 .entrySet()) { 463 byte[] first = entry.getKey().array(); 464 final Collection<LoadQueueItem> lqis = entry.getValue(); 465 if (bulkLoadByFamily) { 466 groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures 467 .add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, familyQueue))); 468 } else { 469 loadingFutures.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis)); 470 } 471 if (item2RegionMap != null) { 472 for (LoadQueueItem lqi : lqis) { 473 item2RegionMap.put(lqi, entry.getKey()); 474 } 475 } 476 } 477 478 // get all the results. 479 for (Future<Collection<LoadQueueItem>> future : loadingFutures) { 480 try { 481 Collection<LoadQueueItem> toRetry = future.get(); 482 483 if (item2RegionMap != null) { 484 for (LoadQueueItem lqi : toRetry) { 485 item2RegionMap.remove(lqi); 486 } 487 } 488 // LQIs that are requeued to be regrouped. 489 queue.addAll(toRetry); 490 } catch (ExecutionException e1) { 491 Throwable t = e1.getCause(); 492 if (t instanceof IOException) { 493 // At this point something unrecoverable has happened. 494 // TODO Implement bulk load recovery 495 throw new IOException("BulkLoad encountered an unrecoverable problem", t); 496 } 497 LOG.error("Unexpected execution exception during bulk load", e1); 498 throw new IllegalStateException(t); 499 } catch (InterruptedException e1) { 500 LOG.error("Unexpected interrupted exception during bulk load", e1); 501 throw (InterruptedIOException) new InterruptedIOException().initCause(e1); 502 } 503 } 504 } 505 506 private Map<byte[], Collection<LoadQueueItem>> 507 groupByFamilies(Collection<LoadQueueItem> itemsInRegion) { 508 Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR); 509 itemsInRegion.forEach(item -> families2Queue 510 .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item)); 511 return families2Queue; 512 } 513 514 private boolean 515 checkHFilesCountPerRegionPerFamily(final Multimap<ByteBuffer, LoadQueueItem> regionGroups) { 516 for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) { 517 Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 518 for (LoadQueueItem lqi : e.getValue()) { 519 MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt()); 520 count.increment(); 521 if (count.intValue() > maxFilesPerRegionPerFamily) { 522 LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + " hfiles to family " 523 + Bytes.toStringBinary(lqi.getFamily()) + " of region with start key " 524 + Bytes.toStringBinary(e.getKey())); 525 return false; 526 } 527 } 528 } 529 return true; 530 } 531 532 /** 533 * @param conn the HBase cluster connection 534 * @param tableName the table name of the table to load into 535 * @param pool the ExecutorService 536 * @param queue the queue for LoadQueueItem 537 * @param startEndKeys start and end keys 538 * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles. 539 */ 540 private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase( 541 AsyncClusterConnection conn, TableName tableName, ExecutorService pool, 542 Deque<LoadQueueItem> queue, List<Pair<byte[], byte[]>> startEndKeys) throws IOException { 543 // <region start key, LQI> need synchronized only within this scope of this 544 // phase because of the puts that happen in futures. 545 Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create(); 546 final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs); 547 Set<String> missingHFiles = new HashSet<>(); 548 Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = 549 new Pair<>(regionGroups, missingHFiles); 550 551 // drain LQIs and figure out bulk load groups 552 Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>(); 553 while (!queue.isEmpty()) { 554 final LoadQueueItem item = queue.remove(); 555 final Callable<Pair<List<LoadQueueItem>, String>> call = 556 () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys); 557 splittingFutures.add(pool.submit(call)); 558 } 559 // get all the results. All grouping and splitting must finish before 560 // we can attempt the atomic loads. 561 for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) { 562 try { 563 Pair<List<LoadQueueItem>, String> splits = lqis.get(); 564 if (splits != null) { 565 if (splits.getFirst() != null) { 566 queue.addAll(splits.getFirst()); 567 } else { 568 missingHFiles.add(splits.getSecond()); 569 } 570 } 571 } catch (ExecutionException e1) { 572 Throwable t = e1.getCause(); 573 if (t instanceof IOException) { 574 LOG.error("IOException during splitting", e1); 575 throw (IOException) t; // would have been thrown if not parallelized, 576 } 577 LOG.error("Unexpected execution exception during splitting", e1); 578 throw new IllegalStateException(t); 579 } catch (InterruptedException e1) { 580 LOG.error("Unexpected interrupted exception during splitting", e1); 581 throw (InterruptedIOException) new InterruptedIOException().initCause(e1); 582 } 583 } 584 return pair; 585 } 586 587 // unique file name for the table 588 private String getUniqueName() { 589 return UUID.randomUUID().toString().replaceAll("-", ""); 590 } 591 592 private List<LoadQueueItem> splitStoreFile(AsyncTableRegionLocator loc, LoadQueueItem item, 593 TableDescriptor tableDesc, byte[] splitKey) throws IOException { 594 Path hfilePath = item.getFilePath(); 595 byte[] family = item.getFamily(); 596 Path tmpDir = hfilePath.getParent(); 597 if (!tmpDir.getName().equals(TMP_DIR)) { 598 tmpDir = new Path(tmpDir, TMP_DIR); 599 } 600 601 LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting..."); 602 603 String uniqueName = getUniqueName(); 604 ColumnFamilyDescriptor familyDesc = tableDesc.getColumnFamily(family); 605 606 Path botOut = new Path(tmpDir, uniqueName + ".bottom"); 607 Path topOut = new Path(tmpDir, uniqueName + ".top"); 608 609 splitStoreFile(loc, getConf(), hfilePath, familyDesc, splitKey, botOut, topOut); 610 611 FileSystem fs = tmpDir.getFileSystem(getConf()); 612 fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx")); 613 fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx")); 614 fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx")); 615 616 // Add these back at the *front* of the queue, so there's a lower 617 // chance that the region will just split again before we get there. 618 List<LoadQueueItem> lqis = new ArrayList<>(2); 619 lqis.add(new LoadQueueItem(family, botOut)); 620 lqis.add(new LoadQueueItem(family, topOut)); 621 622 // If the current item is already the result of previous splits, 623 // we don't need it anymore. Clean up to save space. 624 // It is not part of the original input files. 625 try { 626 if (tmpDir.getName().equals(TMP_DIR)) { 627 fs.delete(hfilePath, false); 628 } 629 } catch (IOException e) { 630 LOG.warn("Unable to delete temporary split file " + hfilePath); 631 } 632 LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut); 633 return lqis; 634 } 635 636 /** 637 * @param startEndKeys the start/end keys of regions belong to this table, the list in ascending 638 * order by start key 639 * @param key the key need to find which region belong to 640 * @return region index 641 */ 642 private int getRegionIndex(List<Pair<byte[], byte[]>> startEndKeys, byte[] key) { 643 int idx = Collections.binarySearch(startEndKeys, Pair.newPair(key, HConstants.EMPTY_END_ROW), 644 (p1, p2) -> Bytes.compareTo(p1.getFirst(), p2.getFirst())); 645 if (idx < 0) { 646 // not on boundary, returns -(insertion index). Calculate region it 647 // would be in. 648 idx = -(idx + 1) - 1; 649 } 650 return idx; 651 } 652 653 /** 654 * we can consider there is a region hole or overlap in following conditions. 1) if idx < 0,then 655 * first region info is lost. 2) if the endkey of a region is not equal to the startkey of the 656 * next region. 3) if the endkey of the last region is not empty. 657 */ 658 private void checkRegionIndexValid(int idx, List<Pair<byte[], byte[]>> startEndKeys, 659 TableName tableName) throws IOException { 660 if (idx < 0) { 661 throw new IOException("The first region info for table " + tableName 662 + " can't be found in hbase:meta.Please use hbck tool to fix it first."); 663 } else if ( 664 (idx == startEndKeys.size() - 1) 665 && !Bytes.equals(startEndKeys.get(idx).getSecond(), HConstants.EMPTY_BYTE_ARRAY) 666 ) { 667 throw new IOException("The last region info for table " + tableName 668 + " can't be found in hbase:meta.Please use hbck tool to fix it first."); 669 } else if ( 670 idx + 1 < startEndKeys.size() && !(Bytes.compareTo(startEndKeys.get(idx).getSecond(), 671 startEndKeys.get(idx + 1).getFirst()) == 0) 672 ) { 673 throw new IOException("The endkey of one region for table " + tableName 674 + " is not equal to the startkey of the next region in hbase:meta." 675 + "Please use hbck tool to fix it first."); 676 } 677 } 678 679 /** 680 * Attempt to assign the given load queue item into its target region group. If the hfile boundary 681 * no longer fits into a region, physically splits the hfile such that the new bottom half will 682 * fit and returns the list of LQI's corresponding to the resultant hfiles. 683 * <p/> 684 * protected for testing 685 * @throws IOException if an IO failure is encountered 686 */ 687 @InterfaceAudience.Private 688 protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn, 689 TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item, 690 List<Pair<byte[], byte[]>> startEndKeys) throws IOException { 691 Path hfilePath = item.getFilePath(); 692 Optional<byte[]> first, last; 693 try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath, 694 CacheConfig.DISABLED, true, getConf())) { 695 first = hfr.getFirstRowKey(); 696 last = hfr.getLastRowKey(); 697 } catch (FileNotFoundException fnfe) { 698 LOG.debug("encountered", fnfe); 699 return new Pair<>(null, hfilePath.getName()); 700 } 701 702 LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) 703 + " last=" + last.map(Bytes::toStringBinary)); 704 if (!first.isPresent() || !last.isPresent()) { 705 assert !first.isPresent() && !last.isPresent(); 706 // TODO what if this is due to a bad HFile? 707 LOG.info("hfile " + hfilePath + " has no entries, skipping"); 708 return null; 709 } 710 if (Bytes.compareTo(first.get(), last.get()) > 0) { 711 throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) 712 + " > " + Bytes.toStringBinary(last.get())); 713 } 714 int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get()); 715 checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, tableName); 716 boolean lastKeyInRange = 717 Bytes.compareTo(last.get(), startEndKeys.get(firstKeyRegionIdx).getSecond()) < 0 || Bytes 718 .equals(startEndKeys.get(firstKeyRegionIdx).getSecond(), HConstants.EMPTY_BYTE_ARRAY); 719 if (!lastKeyInRange) { 720 if (failIfNeedSplitHFile) { 721 throw new IOException( 722 "The key range of hfile=" + hfilePath + " fits into no region. " + "And because " 723 + FAIL_IF_NEED_SPLIT_HFILE + " was set to true, we just skip the next steps."); 724 } 725 int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get()); 726 int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) / 2; 727 // make sure the splitPoint is valid in case region overlap occur, maybe the splitPoint bigger 728 // than hfile.endkey w/o this check 729 if (splitIdx != firstKeyRegionIdx) { 730 checkRegionIndexValid(splitIdx, startEndKeys, tableName); 731 } 732 byte[] splitPoint = startEndKeys.get(splitIdx).getSecond(); 733 List<LoadQueueItem> lqis = splitStoreFile(conn.getRegionLocator(tableName), item, 734 FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint); 735 736 return new Pair<>(lqis, null); 737 } 738 739 // group regions. 740 regionGroups.put(ByteBuffer.wrap(startEndKeys.get(firstKeyRegionIdx).getFirst()), item); 741 return null; 742 } 743 744 /** 745 * Split a storefile into a top and bottom half with favored nodes, maintaining the metadata, 746 * recreating bloom filters, etc. 747 */ 748 @InterfaceAudience.Private 749 static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path inFile, 750 ColumnFamilyDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut) 751 throws IOException { 752 // Open reader with no block cache, and not in-memory 753 Reference topReference = Reference.createTopReference(splitKey); 754 Reference bottomReference = Reference.createBottomReference(splitKey); 755 756 copyHFileHalf(conf, inFile, topOut, topReference, familyDesc, loc); 757 copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc); 758 } 759 760 /** 761 * Copy half of an HFile into a new HFile with favored nodes. 762 */ 763 private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile, 764 Reference reference, ColumnFamilyDescriptor familyDescriptor, AsyncTableRegionLocator loc) 765 throws IOException { 766 FileSystem fs = inFile.getFileSystem(conf); 767 CacheConfig cacheConf = CacheConfig.DISABLED; 768 HalfStoreFileReader halfReader = null; 769 StoreFileWriter halfWriter = null; 770 try { 771 ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build(); 772 StoreFileInfo storeFileInfo = 773 new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference); 774 storeFileInfo.initHFileInfo(context); 775 halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf); 776 storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader()); 777 Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo(); 778 779 int blocksize = familyDescriptor.getBlocksize(); 780 Algorithm compression = familyDescriptor.getCompressionType(); 781 BloomType bloomFilterType = familyDescriptor.getBloomFilterType(); 782 HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) 783 .withChecksumType(StoreUtils.getChecksumType(conf)) 784 .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize) 785 .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true) 786 .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); 787 788 HFileScanner scanner = halfReader.getScanner(false, false, false); 789 scanner.seekTo(); 790 do { 791 final Cell cell = scanner.getCell(); 792 if (null != halfWriter) { 793 halfWriter.append(cell); 794 } else { 795 796 // init halfwriter 797 if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) { 798 byte[] rowKey = CellUtil.cloneRow(cell); 799 HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey)); 800 InetSocketAddress[] favoredNodes = null; 801 if (null == hRegionLocation) { 802 LOG.warn( 803 "Failed get region location for rowkey {} , Using writer without favoured nodes.", 804 Bytes.toString(rowKey)); 805 halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) 806 .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); 807 } else { 808 LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey)); 809 InetSocketAddress initialIsa = 810 new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort()); 811 if (initialIsa.isUnresolved()) { 812 LOG.warn("Failed get location for region {} , Using writer without favoured nodes.", 813 hRegionLocation); 814 halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) 815 .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); 816 } else { 817 LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString()); 818 favoredNodes = new InetSocketAddress[] { initialIsa }; 819 halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) 820 .withBloomType(bloomFilterType).withFileContext(hFileContext) 821 .withFavoredNodes(favoredNodes).build(); 822 } 823 } 824 } else { 825 halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile) 826 .withBloomType(bloomFilterType).withFileContext(hFileContext).build(); 827 } 828 halfWriter.append(cell); 829 } 830 831 } while (scanner.next()); 832 833 for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) { 834 if (shouldCopyHFileMetaKey(entry.getKey())) { 835 halfWriter.appendFileInfo(entry.getKey(), entry.getValue()); 836 } 837 } 838 } finally { 839 if (halfReader != null) { 840 try { 841 halfReader.close(cacheConf.shouldEvictOnClose()); 842 } catch (IOException e) { 843 LOG.warn("failed to close hfile reader for " + inFile, e); 844 } 845 } 846 if (halfWriter != null) { 847 halfWriter.close(); 848 } 849 } 850 } 851 852 /** 853 * Infers region boundaries for a new table. 854 * <p/> 855 * Parameter: <br/> 856 * bdryMap is a map between keys to an integer belonging to {+1, -1} 857 * <ul> 858 * <li>If a key is a start key of a file, then it maps to +1</li> 859 * <li>If a key is an end key of a file, then it maps to -1</li> 860 * </ul> 861 * <p> 862 * Algo:<br/> 863 * <ol> 864 * <li>Poll on the keys in order: 865 * <ol type="a"> 866 * <li>Keep adding the mapped values to these keys (runningSum)</li> 867 * <li>Each time runningSum reaches 0, add the start Key from when the runningSum had started to a 868 * boundary list.</li> 869 * </ol> 870 * </li> 871 * <li>Return the boundary list.</li> 872 * </ol> 873 */ 874 public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) { 875 List<byte[]> keysArray = new ArrayList<>(); 876 int runningValue = 0; 877 byte[] currStartKey = null; 878 boolean firstBoundary = true; 879 880 for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) { 881 if (runningValue == 0) { 882 currStartKey = item.getKey(); 883 } 884 runningValue += item.getValue(); 885 if (runningValue == 0) { 886 if (!firstBoundary) { 887 keysArray.add(currStartKey); 888 } 889 firstBoundary = false; 890 } 891 } 892 893 return keysArray.toArray(new byte[0][]); 894 } 895 896 /** 897 * If the table is created for the first time, then "completebulkload" reads the files twice. More 898 * modifications necessary if we want to avoid doing it. 899 */ 900 private void createTable(TableName tableName, Path hfofDir, AsyncAdmin admin) throws IOException { 901 final FileSystem fs = hfofDir.getFileSystem(getConf()); 902 903 // Add column families 904 // Build a set of keys 905 List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>(); 906 SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR); 907 visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() { 908 @Override 909 public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) { 910 ColumnFamilyDescriptorBuilder builder = 911 ColumnFamilyDescriptorBuilder.newBuilder(familyName); 912 familyBuilders.add(builder); 913 return builder; 914 } 915 916 @Override 917 public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus) 918 throws IOException { 919 Path hfile = hfileStatus.getPath(); 920 try (HFile.Reader reader = 921 HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) { 922 if (builder.getCompressionType() != reader.getFileContext().getCompression()) { 923 builder.setCompressionType(reader.getFileContext().getCompression()); 924 LOG.info("Setting compression " + reader.getFileContext().getCompression().name() 925 + " for family " + builder.getNameAsString()); 926 } 927 byte[] first = reader.getFirstRowKey().get(); 928 byte[] last = reader.getLastRowKey().get(); 929 930 LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" 931 + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last)); 932 933 // To eventually infer start key-end key boundaries 934 Integer value = map.getOrDefault(first, 0); 935 map.put(first, value + 1); 936 937 value = map.containsKey(last) ? map.get(last) : 0; 938 map.put(last, value - 1); 939 } 940 } 941 }, true); 942 943 byte[][] keys = inferBoundaries(map); 944 TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName); 945 familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build) 946 .forEachOrdered(tdBuilder::setColumnFamily); 947 FutureUtils.get(admin.createTable(tdBuilder.build(), keys)); 948 949 LOG.info("Table " + tableName + " is available!!"); 950 } 951 952 private Map<LoadQueueItem, ByteBuffer> performBulkLoad(AsyncClusterConnection conn, 953 TableName tableName, Deque<LoadQueueItem> queue, ExecutorService pool, boolean copyFile) 954 throws IOException { 955 int count = 0; 956 957 fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf())); 958 bulkToken = FutureUtils.get(conn.prepareBulkLoad(tableName)); 959 Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null; 960 961 Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>(); 962 // Assumes that region splits can happen while this occurs. 963 while (!queue.isEmpty()) { 964 // need to reload split keys each iteration. 965 final List<Pair<byte[], byte[]>> startEndKeys = 966 FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys()); 967 if (count != 0) { 968 LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with " 969 + queue.size() + " files remaining to group or split"); 970 } 971 972 int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10); 973 maxRetries = Math.max(maxRetries, startEndKeys.size() + 1); 974 if (maxRetries != 0 && count >= maxRetries) { 975 throw new IOException( 976 "Retry attempted " + count + " times without completing, bailing out"); 977 } 978 count++; 979 980 // Using ByteBuffer for byte[] equality semantics 981 pair = groupOrSplitPhase(conn, tableName, pool, queue, startEndKeys); 982 Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst(); 983 984 if (!checkHFilesCountPerRegionPerFamily(regionGroups)) { 985 // Error is logged inside checkHFilesCountPerRegionPerFamily. 986 throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily 987 + " hfiles to one family of one region"); 988 } 989 990 bulkLoadPhase(conn, tableName, queue, regionGroups, copyFile, item2RegionMap); 991 992 // NOTE: The next iteration's split / group could happen in parallel to 993 // atomic bulkloads assuming that there are splits and no merges, and 994 // that we can atomically pull out the groups we want to retry. 995 } 996 997 return item2RegionMap; 998 } 999 1000 private void cleanup(AsyncClusterConnection conn, TableName tableName, Deque<LoadQueueItem> queue, 1001 ExecutorService pool) throws IOException { 1002 fsDelegationToken.releaseDelegationToken(); 1003 if (bulkToken != null) { 1004 conn.cleanupBulkLoad(tableName, bulkToken); 1005 } 1006 if (pool != null) { 1007 pool.shutdown(); 1008 } 1009 if (!queue.isEmpty()) { 1010 StringBuilder err = new StringBuilder(); 1011 err.append("-------------------------------------------------\n"); 1012 err.append("Bulk load aborted with some files not yet loaded:\n"); 1013 err.append("-------------------------------------------------\n"); 1014 for (LoadQueueItem q : queue) { 1015 err.append(" ").append(q.getFilePath()).append('\n'); 1016 } 1017 LOG.error(err.toString()); 1018 } 1019 } 1020 1021 /** 1022 * Perform a bulk load of the given map of families to hfiles into the given pre-existing table. 1023 * This method is not threadsafe. 1024 * @param map map of family to List of hfiles 1025 * @param tableName table to load the hfiles 1026 * @param silence true to ignore unmatched column families 1027 * @param copyFile always copy hfiles if true 1028 */ 1029 private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn, 1030 TableName tableName, Map<byte[], List<Path>> map, boolean silence, boolean copyFile) 1031 throws IOException { 1032 tableExists(conn, tableName); 1033 // LQI queue does not need to be threadsafe -- all operations on this queue 1034 // happen in this thread 1035 Deque<LoadQueueItem> queue = new ArrayDeque<>(); 1036 ExecutorService pool = null; 1037 try { 1038 prepareHFileQueue(conn, tableName, map, queue, silence); 1039 if (queue.isEmpty()) { 1040 LOG.warn("Bulk load operation did not get any files to load"); 1041 return Collections.emptyMap(); 1042 } 1043 pool = createExecutorService(); 1044 return performBulkLoad(conn, tableName, queue, pool, copyFile); 1045 } finally { 1046 cleanup(conn, tableName, queue, pool); 1047 } 1048 } 1049 1050 /** 1051 * Perform a bulk load of the given directory into the given pre-existing table. This method is 1052 * not threadsafe. 1053 * @param tableName table to load the hfiles 1054 * @param hfofDir the directory that was provided as the output path of a job using 1055 * HFileOutputFormat 1056 * @param silence true to ignore unmatched column families 1057 * @param copyFile always copy hfiles if true 1058 */ 1059 private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn, 1060 TableName tableName, Path hfofDir, boolean silence, boolean copyFile) throws IOException { 1061 tableExists(conn, tableName); 1062 1063 /* 1064 * Checking hfile format is a time-consuming operation, we should have an option to skip this 1065 * step when bulkloading millions of HFiles. See HBASE-13985. 1066 */ 1067 boolean validateHFile = getConf().getBoolean(VALIDATE_HFILES, true); 1068 if (!validateHFile) { 1069 LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " 1070 + "are not correct. If you fail to read data from your table after using this " 1071 + "option, consider removing the files and bulkload again without this option. " 1072 + "See HBASE-13985"); 1073 } 1074 // LQI queue does not need to be threadsafe -- all operations on this queue 1075 // happen in this thread 1076 Deque<LoadQueueItem> queue = new ArrayDeque<>(); 1077 ExecutorService pool = null; 1078 try { 1079 prepareHFileQueue(getConf(), conn, tableName, hfofDir, queue, validateHFile, silence); 1080 1081 if (queue.isEmpty()) { 1082 LOG.warn( 1083 "Bulk load operation did not find any files to load in directory {}. " 1084 + "Does it contain files in subdirectories that correspond to column family names?", 1085 (hfofDir != null ? hfofDir.toUri().toString() : "")); 1086 return Collections.emptyMap(); 1087 } 1088 pool = createExecutorService(); 1089 return performBulkLoad(conn, tableName, queue, pool, copyFile); 1090 } finally { 1091 cleanup(conn, tableName, queue, pool); 1092 } 1093 } 1094 1095 @Override 1096 public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, 1097 Map<byte[], List<Path>> family2Files) throws IOException { 1098 try (AsyncClusterConnection conn = ClusterConnectionFactory 1099 .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) { 1100 return doBulkLoad(conn, tableName, family2Files, isSilence(), isAlwaysCopyFiles()); 1101 } 1102 } 1103 1104 @Override 1105 public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir) throws IOException { 1106 try (AsyncClusterConnection conn = ClusterConnectionFactory 1107 .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) { 1108 AsyncAdmin admin = conn.getAdmin(); 1109 if (!FutureUtils.get(admin.tableExists(tableName))) { 1110 if (isCreateTable()) { 1111 createTable(tableName, dir, admin); 1112 } else { 1113 throwAndLogTableNotFoundException(tableName); 1114 } 1115 } 1116 return doBulkLoad(conn, tableName, dir, isSilence(), isAlwaysCopyFiles()); 1117 } 1118 } 1119 1120 /** 1121 * @throws TableNotFoundException if table does not exist. 1122 */ 1123 private void tableExists(AsyncClusterConnection conn, TableName tableName) throws IOException { 1124 if (!FutureUtils.get(conn.getAdmin().tableExists(tableName))) { 1125 throwAndLogTableNotFoundException(tableName); 1126 } 1127 } 1128 1129 private void throwAndLogTableNotFoundException(TableName tn) throws TableNotFoundException { 1130 String errorMsg = format("Table '%s' does not exist.", tn); 1131 LOG.error(errorMsg); 1132 throw new TableNotFoundException(errorMsg); 1133 } 1134 1135 public void setBulkToken(String bulkToken) { 1136 this.bulkToken = bulkToken; 1137 } 1138 1139 public void setClusterIds(List<String> clusterIds) { 1140 this.clusterIds = clusterIds; 1141 } 1142 1143 private void usage() { 1144 System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] " 1145 + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n" 1146 + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- " 1147 + "into an hbase table.\n" + "OPTIONS (for other -D options, see source code):\n" + " -D" 1148 + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target " 1149 + "table must exist.\n" + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY 1150 + "=yes to ignore unmatched column families.\n" 1151 + " -loadTable for when directory of files to load has a depth of 3; target table must " 1152 + "exist;\n" + " must be last of the options on command line.\n" 1153 + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for " 1154 + "documentation.\n"); 1155 } 1156 1157 @Override 1158 public int run(String[] args) throws Exception { 1159 if (args.length != 2 && args.length != 3) { 1160 usage(); 1161 return -1; 1162 } 1163 // Re-initialize to apply -D options from the command line parameters 1164 initialize(); 1165 Path dirPath = new Path(args[0]); 1166 TableName tableName = TableName.valueOf(args[1]); 1167 if (args.length == 2) { 1168 return !bulkLoad(tableName, dirPath).isEmpty() ? 0 : -1; 1169 } else { 1170 Map<byte[], List<Path>> family2Files = Maps.newHashMap(); 1171 FileSystem fs = FileSystem.get(getConf()); 1172 for (FileStatus regionDir : fs.listStatus(dirPath)) { 1173 FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> { 1174 Path path = new Path(regionDir.getPath(), new Path(family, hfileName)); 1175 byte[] familyName = Bytes.toBytes(family); 1176 if (family2Files.containsKey(familyName)) { 1177 family2Files.get(familyName).add(path); 1178 } else { 1179 family2Files.put(familyName, Lists.newArrayList(path)); 1180 } 1181 }); 1182 } 1183 return !bulkLoad(tableName, family2Files).isEmpty() ? 0 : -1; 1184 } 1185 } 1186 1187 public static void main(String[] args) throws Exception { 1188 Configuration conf = HBaseConfiguration.create(); 1189 int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args); 1190 System.exit(ret); 1191 } 1192 1193 @Override 1194 public void disableReplication() { 1195 this.replicate = false; 1196 } 1197 1198 @Override 1199 public boolean isReplicationDisabled() { 1200 return !this.replicate; 1201 } 1202}