/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The implementation for {@link BulkLoadHFiles}. It can also be executed from the command line as
 * a tool.
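 * <p/>
 * A minimal programmatic usage sketch (table name and path are illustrative):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * BulkLoadHFiles loader = new BulkLoadHFilesTool(conf);
 * loader.bulkLoad(TableName.valueOf("mytable"), new Path("/path/to/hfileoutputformat-output"));
 * </pre>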
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool {

  private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596.
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;

  public static final String NAME = "completebulkload";
  /**
   * Whether to run validation on hfiles before loading.
   */
  private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile";
  /**
   * HBASE-24221 Support bulkLoadHFile by family, to avoid a long wait in bulkLoadHFile caused by
   * compaction on the server side.
   */
  public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";

  public static final String FAIL_IF_NEED_SPLIT_HFILE =
    "hbase.loadincremental.fail.if.need.split.hfile";

  // We use a '.' prefix which is ignored when walking directory trees
  // above. It is an invalid family name.
  static final String TMP_DIR = ".tmp";

  private int maxFilesPerRegionPerFamily;
  private boolean assignSeqIds;
  private boolean bulkLoadByFamily;

  // Source delegation token
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private int nrThreads;
  private final AtomicInteger numRetries = new AtomicInteger(0);
  private String bulkToken;

  private List<String> clusterIds = new ArrayList<>();
  private boolean replicate = true;
  private boolean failIfNeedSplitHFile = false;

  public BulkLoadHFilesTool(Configuration conf) {
    // make a copy, just to be sure we're not overriding someone else's config
    super(new Configuration(conf));
    initialize();
  }

  public void initialize() {
    Configuration conf = getConf();
    // disable blockcache for tool invocation, see HBASE-10500
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    nrThreads =
      conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
    bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
    failIfNeedSplitHFile = conf.getBoolean(FAIL_IF_NEED_SPLIT_HFILE, false);
  }

  // Initialize a thread pool
  private ExecutorService createExecutorService() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(),
      new ThreadFactoryBuilder().setNameFormat("BulkLoadHFilesTool-%1$d").setDaemon(true).build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFileInfo.isReservedFileInfoKey(key);
  }

  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   */
  private static void validateFamiliesInHFiles(TableDescriptor tableDesc,
    Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    Set<String> familyNames = Arrays.stream(tableDesc.getColumnFamilies())
      .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
      .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (unmatchedFamilies.size() > 0) {
      String msg = "Unmatched family names found in HFiles to be bulkloaded: " + unmatchedFamilies
        + "; valid family names of table " + tableDesc.getTableName() + " are: " + familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the Queue with given HFiles
   */
  private static void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the bulkDir hfiles. Skips references, HFileLinks, and files starting with "_".
   * Checks and skips invalid hfiles by default; this validation can be turned off by setting
   * {@link #VALIDATE_HFILES} to false.
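   * <p/>
   * The expected layout is one subdirectory of bulkDir per column family, each containing hfiles,
   * e.g. {@code bulkDir/cf1/hfile1} and {@code bulkDir/cf2/hfile2} (names are illustrative).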
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
    BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid family directory " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("The file " + hfile + " doesn't seem to be an hfile. Skipping.");
              continue;
            }
          } catch (FileNotFoundException e) {
            LOG.warn("The file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue containing all such files.
   */
  private static void discoverLoadQueue(Configuration conf, Deque<LoadQueueItem> ret, Path hfofDir,
    boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(conf), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) {
        long length = hfile.getLen();
        if (
          length > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
        ) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size " + length
            + " bytes; this can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  /**
   * Prepares a collection of {@code LoadQueueItem} from the given map of family to source hfiles
   * and validates that the prepared queue contains only valid table column families.
   * @param map map of family to List of hfiles
   * @param tableName table to which hfiles should be loaded
   * @param queue queue which needs to be loaded into the table
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public static void prepareHFileQueue(AsyncClusterConnection conn, TableName tableName,
    Map<byte[], List<Path>> map, Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
      silence);
  }

  /**
   * Prepares a collection of {@code LoadQueueItem} from the source hfiles contained in the passed
   * directory and validates that the prepared queue contains only valid table column families.
   * @param hfilesDir directory containing list of hfiles to be loaded into the table
   * @param queue queue which needs to be loaded into the table
   * @param validateHFile if true, hfiles will be validated for their format
   * @param silence true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public static void prepareHFileQueue(Configuration conf, AsyncClusterConnection conn,
    TableName tableName, Path hfilesDir, Deque<LoadQueueItem> queue, boolean validateHFile,
    boolean silence) throws IOException {
    discoverLoadQueue(conf, queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
      silence);
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>{@link #groupOrSplitPhase(AsyncClusterConnection, TableName, ExecutorService, Deque, List)}
   * </li>
   * <li>{@link #bulkLoadPhase(AsyncClusterConnection, TableName, Deque, Multimap, boolean, Map)}
   * </li>
   * </ol>
   * @param conn Connection to use
   * @param tableName Table to which these hfiles should be loaded
   * @param queue the {@code LoadQueueItem}s with hfiles yet to be loaded
   */
  public void loadHFileQueue(AsyncClusterConnection conn, TableName tableName,
    Deque<LoadQueueItem> queue, boolean copyFiles) throws IOException {
    ExecutorService pool = createExecutorService();
    try {
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(conn, tableName, pool,
        queue, FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys())).getFirst();
      bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
    } finally {
      pool.shutdown();
    }
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list. NOTE: To
   * maintain row atomicity guarantees, the region server side should succeed atomically and fail
   * atomically.
   * @param conn Connection to use
   * @param tableName Table to which these hfiles should be loaded
   * @param copyFiles whether to copy the hfiles instead of moving them
   * @param first the start key of the region
   * @param lqis the hfiles that should be loaded
   * @return empty list if success, list of items to retry on recoverable failure
   */
  @InterfaceAudience.Private
  protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
    final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
    final byte[] first, Collection<LoadQueueItem> lqis) {
    List<Pair<byte[], String>> familyPaths =
      lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
        .collect(Collectors.toList());
    CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
    FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
      (loaded, error) -> {
        if (error != null) {
          LOG.error("Encountered unrecoverable error from region server", error);
          if (
            getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
              && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
                HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
          ) {
            LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
              + numRetries.incrementAndGet());
            // return lqi's to retry
            future.complete(lqis);
          } else {
            LOG.error(RETRY_ON_IO_EXCEPTION
              + " is disabled or we have reached retry limit. Unable to recover");
            future.completeExceptionally(error);
          }
        } else {
          if (loaded) {
            future.complete(Collections.emptyList());
          } else {
            LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
              + " into table " + tableName + " with files " + lqis
              + " failed. This is recoverable and they will be retried.");
            // return lqi's to retry
            future.complete(lqis);
          }
        }
      });
    return future;
  }
  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures
   * are re-queued for another pass with the groupOrSplitPhase.
   * <p/>
   * protected for testing.
   */
  @InterfaceAudience.Private
  protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
    Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFiles,
    Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    List<Future<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap()
      .entrySet()) {
      byte[] first = entry.getKey().array();
      final Collection<LoadQueueItem> lqis = entry.getValue();
      if (bulkLoadByFamily) {
        groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures
          .add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, familyQueue)));
      } else {
        loadingFutures.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis));
      }
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, entry.getKey());
        }
      }
    }

    // get all the results.
    for (Future<Collection<LoadQueueItem>> future : loadingFutures) {
      try {
        Collection<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  private Map<byte[], Collection<LoadQueueItem>>
    groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
    Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    itemsInRegion.forEach(item -> families2Queue
      .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
    return families2Queue;
  }

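  // Rejects the load when any single family of any single region would receive more than
  // maxFilesPerRegionPerFamily hfiles (MAX_FILES_PER_REGION_PER_FAMILY, default 32; see
  // initialize()).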
  private boolean
    checkHFilesCountPerRegionPerFamily(final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : e.getValue()) {
        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + " hfiles to family "
            + Bytes.toStringBinary(lqi.getFamily()) + " of region with start key "
            + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @param conn the HBase cluster connection
   * @param tableName the table name of the table to load into
   * @param pool the ExecutorService
   * @param queue the queue for LoadQueueItem
   * @param startEndKeys start and end keys
   * @return A map that groups LQI by likely bulk load region targets and a Set of missing hfiles.
   */
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
    AsyncClusterConnection conn, TableName tableName, ExecutorService pool,
    Deque<LoadQueueItem> queue, List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
    // The <region start key, LQI> map needs to be synchronized only within the scope of this
    // phase, because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
      new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();
      final Callable<Pair<List<LoadQueueItem>, String>> call =
        () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  private List<LoadQueueItem> splitStoreFile(AsyncTableRegionLocator loc, LoadQueueItem item,
    TableDescriptor tableDesc, byte[] splitKey) throws IOException {
    Path hfilePath = item.getFilePath();
    byte[] family = item.getFamily();
    Path tmpDir = hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single region. Splitting...");

    String uniqueName = getUniqueName();
    ColumnFamilyDescriptor familyDesc = tableDesc.getColumnFamily(family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");

    splitStoreFile(loc, getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<>(2);
    lqis.add(new LoadQueueItem(family, botOut));
    lqis.add(new LoadQueueItem(family, topOut));

    // If the current item is already the result of previous splits,
    // we don't need it anymore. Clean up to save space.
    // It is not part of the original input files.
    try {
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(hfilePath, false);
      }
    } catch (IOException e) {
      LOG.warn("Unable to delete temporary split file " + hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
   * @param startEndKeys the start/end keys of the regions belonging to this table, in ascending
   *          order by start key
   * @param key the key whose containing region we need to find
   * @return region index
   */
  private int getRegionIndex(List<Pair<byte[], byte[]>> startEndKeys, byte[] key) {
    int idx = Collections.binarySearch(startEndKeys, Pair.newPair(key, HConstants.EMPTY_END_ROW),
      (p1, p2) -> Bytes.compareTo(p1.getFirst(), p2.getFirst()));
    if (idx < 0) {
      // not on boundary, returns -(insertion index). Calculate region it
      // would be in.
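      // e.g. for regions [a,b), [b,c) and key "ab", binarySearch returns -2, so idx becomes
      // -(-2 + 1) - 1 = 0, the index of region [a,b), which contains "ab".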
      idx = -(idx + 1) - 1;
    }
    return idx;
  }

  /**
   * We can consider that there is a region hole or overlap under the following conditions: 1) if
   * idx < 0, then the first region info is lost; 2) if the endkey of a region is not equal to the
   * startkey of the next region; 3) if the endkey of the last region is not empty.
   */
  private void checkRegionIndexValid(int idx, List<Pair<byte[], byte[]>> startEndKeys,
    TableName tableName) throws IOException {
    if (idx < 0) {
      throw new IOException("The first region info for table " + tableName
        + " can't be found in hbase:meta. Please use the hbck tool to fix it first.");
    } else if (
      (idx == startEndKeys.size() - 1)
        && !Bytes.equals(startEndKeys.get(idx).getSecond(), HConstants.EMPTY_BYTE_ARRAY)
    ) {
      throw new IOException("The last region info for table " + tableName
        + " can't be found in hbase:meta. Please use the hbck tool to fix it first.");
    } else if (
      idx + 1 < startEndKeys.size() && !(Bytes.compareTo(startEndKeys.get(idx).getSecond(),
        startEndKeys.get(idx + 1).getFirst()) == 0)
    ) {
      throw new IOException("The endkey of one region for table " + tableName
        + " is not equal to the startkey of the next region in hbase:meta. "
        + "Please use the hbck tool to fix it first.");
    }
  }

  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile
   * boundary no longer fits into a region, physically splits the hfile such that the new bottom
   * half will fit, and returns the list of LQI's corresponding to the resultant hfiles.
   * <p/>
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */
  @InterfaceAudience.Private
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
    TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
    List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
      CacheConfig.DISABLED, true, getConf())) {
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary)
      + " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get())
        + " > " + Bytes.toStringBinary(last.get()));
    }
    int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get());
    checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, tableName);
    boolean lastKeyInRange =
      Bytes.compareTo(last.get(), startEndKeys.get(firstKeyRegionIdx).getSecond()) < 0 || Bytes
        .equals(startEndKeys.get(firstKeyRegionIdx).getSecond(), HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      if (failIfNeedSplitHFile) {
        throw new IOException(
          "The key range of hfile=" + hfilePath + " fits into no region. And because "
            + FAIL_IF_NEED_SPLIT_HFILE + " was set to true, we just skip the next steps.");
      }
      int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
      int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) / 2;
      // make sure the splitPoint is valid in case region overlaps occur; without this check the
      // splitPoint may be bigger than hfile.endkey
      if (splitIdx != firstKeyRegionIdx) {
        checkRegionIndexValid(splitIdx, startEndKeys, tableName);
      }
      byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
      List<LoadQueueItem> lqis = splitStoreFile(conn.getRegionLocator(tableName), item,
        FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);

      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.get(firstKeyRegionIdx).getFirst()), item);
    return null;
  }

  /**
   * Split a storefile into a top and bottom half with favored nodes, maintaining the metadata,
   * recreating bloom filters, etc.
   */
  @InterfaceAudience.Private
  static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path inFile,
    ColumnFamilyDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut)
    throws IOException {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc, loc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc);
  }

  private static StoreFileWriter initStoreFileWriter(Configuration conf, Cell cell,
    HFileContext hFileContext, CacheConfig cacheConf, BloomType bloomFilterType, FileSystem fs,
    Path outFile, AsyncTableRegionLocator loc) throws IOException {
    if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
      byte[] rowKey = CellUtil.cloneRow(cell);
      HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
      InetSocketAddress[] favoredNodes = null;
      if (null == hRegionLocation) {
        LOG.warn("Failed to get region location for rowkey {}, using writer without favored nodes.",
          Bytes.toString(rowKey));
        return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
          .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
      } else {
        LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
        InetSocketAddress initialIsa =
          new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
        if (initialIsa.isUnresolved()) {
          LOG.warn("Failed to get location for region {}, using writer without favored nodes.",
            hRegionLocation);
          return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
            .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
        } else {
          LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
          favoredNodes = new InetSocketAddress[] { initialIsa };
          return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
            .withBloomType(bloomFilterType).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build();
        }
      }
    } else {
      return new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
    }
  }

  /**
   * Copy half of an HFile into a new HFile with favored nodes.
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
    Reference reference, ColumnFamilyDescriptor familyDescriptor, AsyncTableRegionLocator loc)
    throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    StoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
    try {
      ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
      StoreFileInfo storeFileInfo =
        new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
      storeFileInfo.initHFileInfo(context);
      halfReader = storeFileInfo.createReader(context, cacheConf);
      storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
        .withChecksumType(StoreUtils.getChecksumType(conf))
        .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
        .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();

      try (StoreFileScanner scanner =
        halfReader.getStoreFileScanner(false, false, false, Long.MAX_VALUE, 0, false)) {
        scanner.seek(KeyValue.LOWESTKEY);
        for (;;) {
          ExtendedCell cell = scanner.next();
          if (cell == null) {
            break;
          }
          if (halfWriter == null) {
            // init halfwriter
            halfWriter = initStoreFileWriter(conf, cell, hFileContext, cacheConf, bloomFilterType,
              fs, outFile, loc);
          }
          halfWriter.append(cell);
        }
      }
      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }
    }
  }

  /**
   * Infers region boundaries for a new table.
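   * For example, if one hfile spans [a, b] and another spans [c, d] (keys illustrative), the
   * boundary map is {a: +1, b: -1, c: +1, d: -1} and the only inferred boundary is c, the start
   * key of the second contiguous run; the first run's start key is always dropped.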
   * <p/>
   * Parameter: <br/>
   * bdryMap is a map from keys to an integer in {+1, -1}
   * <ul>
   * <li>If a key is a start key of a file, then it maps to +1</li>
   * <li>If a key is an end key of a file, then it maps to -1</li>
   * </ul>
   * <p>
   * Algo:<br/>
   * <ol>
   * <li>Poll on the keys in order:
   * <ol type="a">
   * <li>Keep adding the mapped values to these keys (runningSum)</li>
   * <li>Each time runningSum reaches 0, add the start key from when the runningSum had started to
   * a boundary list.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
   */
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }

  /**
   * If the table is created for the first time, then "completebulkload" reads the files twice.
   * More modifications are necessary if we want to avoid doing it.
   */
  private void createTable(TableName tableName, Path hfofDir, AsyncAdmin admin) throws IOException {
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
          ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
        throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
          HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name()
              + " for family " + builder.getNameAsString());
          }
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first="
            + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.getOrDefault(first, 0);
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
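          // start keys were incremented above; end keys decrement the running sum, matching the
          // +1/-1 contract described in inferBoundaries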
          map.put(last, value - 1);
        }
      }
    }, true);

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
      .forEachOrdered(tdBuilder::setColumnFamily);
    FutureUtils.get(admin.createTable(tdBuilder.build(), keys));

    LOG.info("Table " + tableName + " is available!!");
  }

  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Deque<LoadQueueItem> queue, ExecutorService pool, boolean copyFile)
    throws IOException {
    int count = 0;

    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
    bulkToken = FutureUtils.get(conn.prepareBulkLoad(tableName));
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      final List<Pair<byte[], byte[]>> startEndKeys =
        FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys());
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with "
          + queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      maxRetries = Math.max(maxRetries, startEndKeys.size() + 1);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
          "Retry attempted " + count + " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(conn, tableName, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
          + " hfiles to one family of one region");
      }

      bulkLoadPhase(conn, tableName, queue, regionGroups, copyFile, item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    return item2RegionMap;
  }

  private void cleanup(AsyncClusterConnection conn, TableName tableName, Deque<LoadQueueItem> queue,
    ExecutorService pool) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null) {
      conn.cleanupBulkLoad(tableName, bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }

  /**
   * Perform a bulk load of the given map of families to hfiles into the given pre-existing table.
   * This method is not threadsafe.
   * @param map map of family to List of hfiles
   * @param tableName table to load the hfiles
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   */
  private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Map<byte[], List<Path>> map, boolean silence, boolean copyFile)
    throws IOException {
    tableExists(conn, tableName);
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    try {
      prepareHFileQueue(conn, tableName, map, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      return performBulkLoad(conn, tableName, queue, pool, copyFile);
    } finally {
      cleanup(conn, tableName, queue, pool);
    }
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param tableName table to load the hfiles
   * @param hfofDir the directory that was provided as the output path of a job using
   *          HFileOutputFormat
   * @param silence true to ignore unmatched column families
   * @param copyFile always copy hfiles if true
   */
  private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Path hfofDir, boolean silence, boolean copyFile) throws IOException {
    tableExists(conn, tableName);

    /*
     * Checking hfile format is a time-consuming operation, so we should have an option to skip
     * this step when bulkloading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean(VALIDATE_HFILES, true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFiles validation, which might cause some data loss if files "
        + "are not correct. If you fail to read data from your table after using this "
        + "option, consider removing the files and bulkloading again without this option. "
        + "See HBASE-13985");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    try {
      prepareHFileQueue(getConf(), conn, tableName, hfofDir, queue, validateHFile, silence);

      if (queue.isEmpty()) {
        LOG.warn(
          "Bulk load operation did not find any files to load in directory {}. "
            + "Does it contain files in subdirectories that correspond to column family names?",
          (hfofDir != null ? hfofDir.toUri().toString() : ""));
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      return performBulkLoad(conn, tableName, queue, pool, copyFile);
    } finally {
      cleanup(conn, tableName, queue, pool);
    }
  }

  @Override
  public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
    Map<byte[], List<Path>> family2Files) throws IOException {
    try (AsyncClusterConnection conn = ClusterConnectionFactory
      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
      return doBulkLoad(conn, tableName, family2Files, isSilence(), isAlwaysCopyFiles());
    }
  }

  @Override
  public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir) throws IOException {
    try (AsyncClusterConnection conn = ClusterConnectionFactory
      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
      AsyncAdmin admin = conn.getAdmin();
      if (!FutureUtils.get(admin.tableExists(tableName))) {
        if (isCreateTable()) {
          createTable(tableName, dir, admin);
        } else {
          throwAndLogTableNotFoundException(tableName);
        }
      }
      return doBulkLoad(conn, tableName, dir, isSilence(), isAlwaysCopyFiles());
    }
  }

  /**
   * @throws TableNotFoundException if the table does not exist.
   */
  private void tableExists(AsyncClusterConnection conn, TableName tableName) throws IOException {
    if (!FutureUtils.get(conn.getAdmin().tableExists(tableName))) {
      throwAndLogTableNotFoundException(tableName);
    }
  }

  private void throwAndLogTableNotFoundException(TableName tn) throws TableNotFoundException {
    String errorMsg = format("Table '%s' does not exist.", tn);
    LOG.error(errorMsg);
    throw new TableNotFoundException(errorMsg);
  }

  public void setBulkToken(String bulkToken) {
    this.bulkToken = bulkToken;
  }

  public void setClusterIds(List<String> clusterIds) {
    this.clusterIds = clusterIds;
  }

  private void usage() {
    System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
      + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
      + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- "
      + "into an hbase table.\n" + "OPTIONS (for other -D options, see source code):\n" + " -D"
      + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target "
      + "table must exist.\n" + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY
      + "=yes to ignore unmatched column families.\n"
      + " -loadTable for when directory of files to load has a depth of 3; target table must "
      + "exist;\n" + " must be last of the options on command line.\n"
      + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for "
      + "documentation.\n");
  }

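  // Example invocations (paths and table name are illustrative; per usage() the -loadTable flag,
  // when used, must come last):
  //   bin/hbase completebulkload /user/hbase/output mytable
  //   bin/hbase completebulkload /user/hbase/tabledir mytable -loadTable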
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    // Re-initialize to apply -D options from the command line parameters
    initialize();
    Path dirPath = new Path(args[0]);
    TableName tableName = TableName.valueOf(args[1]);
    if (args.length == 2) {
      return !bulkLoad(tableName, dirPath).isEmpty() ? 0 : -1;
    } else {
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(dirPath)) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !bulkLoad(tableName, family2Files).isEmpty() ? 0 : -1;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
    System.exit(ret);
  }

  @Override
  public void disableReplication() {
    this.replicate = false;
  }

  @Override
  public boolean isReplicationDisabled() {
    return !this.replicate;
  }
}