/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The implementation of {@link BulkLoadHFiles}. It can also be executed from the command line as
 * a tool.
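 * <p/>
 * A minimal usage sketch (the table name and paths below are illustrative only):
 *
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * BulkLoadHFiles loader = new BulkLoadHFilesTool(conf);
 * // dir is the output directory of an HFileOutputFormat job, one subdirectory per column family
 * Map<LoadQueueItem, ByteBuffer> loaded =
 *   loader.bulkLoad(TableName.valueOf("test_table"), new Path("/staging/bulk_output"));
 * }</pre>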
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool {

  private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);

  public static final String NAME = "completebulkload";
  /**
   * Whether to run validation on hfiles before loading.
   */
  private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile";
  /**
   * HBASE-24221: support bulk loading HFiles family by family, to avoid bulkLoadHFile calls
   * waiting a long time because of server-side compactions.
   */
  public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";

  public static final String FAIL_IF_NEED_SPLIT_HFILE =
    "hbase.loadincremental.fail.if.need.split.hfile";

  // We use a '.' prefix, which is ignored when walking the directory trees above; it is not a
  // valid family name.
  static final String TMP_DIR = ".tmp";

  private int maxFilesPerRegionPerFamily;
  private boolean assignSeqIds;
  private boolean bulkLoadByFamily;

  // Source delegation token
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private int nrThreads;
  private final AtomicInteger numRetries = new AtomicInteger(0);
  private String bulkToken;

  private List<String> clusterIds = new ArrayList<>();
  private boolean replicate = true;
  private boolean failIfNeedSplitHFile = false;

  public BulkLoadHFilesTool(Configuration conf) {
    // make a copy, just to be sure we're not overriding someone else's config
    super(new Configuration(conf));
    initialize();
  }

  public void initialize() {
    Configuration conf = getConf();
    // disable blockcache for tool invocation, see HBASE-10500
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    nrThreads =
      conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
    bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
    failIfNeedSplitHFile = conf.getBoolean(FAIL_IF_NEED_SPLIT_HFILE, false);
  }

  // Initialize a thread pool
  private ExecutorService createExecutorService() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(),
      new ThreadFactoryBuilder().setNameFormat("BulkLoadHFilesTool-%1$d").setDaemon(true).build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFileInfo.isReservedFileInfoKey(key);
  }

  /**
   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
   */
  private static void validateFamiliesInHFiles(TableDescriptor tableDesc,
    Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    Set<String> familyNames = Arrays.stream(tableDesc.getColumnFamilies())
      .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
      .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (unmatchedFamilies.size() > 0) {
      String msg = "Unmatched family names found in HFiles to be bulk loaded: " + unmatchedFamilies
        + "; valid family names of table " + tableDesc.getTableName() + " are: " + familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the Queue with given HFiles
   */
  private static void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the hfiles under bulkDir, skipping references, HFileLinks and files starting with
   * "_". Non-valid hfiles are checked and skipped by default; this validation can be turned off by
   * setting {@link #VALIDATE_HFILES} to false.
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
    BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("the file " + hfile + " doesn't seem to be an hfile. skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            LOG.warn("the file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue containing all such files.
   */
  private static void discoverLoadQueue(Configuration conf, Deque<LoadQueueItem> ret, Path hfofDir,
    boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(conf), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) {
        long length = hfile.getLen();
        if (
          length > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
        ) {
          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length
            + " bytes; this can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  /**
   * Prepares a collection of {@code LoadQueueItem} from the given map of family to source hfiles
   * and validates whether the prepared queue has all the valid table column families in it.
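   * <p/>
   * A minimal sketch of building the {@code map} argument (family and file names below are
   * illustrative only):
   *
   * <pre>{@code
   * Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
   * family2Files.put(Bytes.toBytes("cf"),
   *   Arrays.asList(new Path("/staging/cf/hfile-1"), new Path("/staging/cf/hfile-2")));
   * }</pre>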
   * @param map       map of family to List of hfiles
   * @param tableName table to which hfiles should be loaded
   * @param queue     queue which needs to be loaded into the table
   * @param silence   true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public static void prepareHFileQueue(AsyncClusterConnection conn, TableName tableName,
    Map<byte[], List<Path>> map, Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
      silence);
  }

  /**
   * Prepares a collection of {@code LoadQueueItem} from the source hfiles contained in the passed
   * directory and validates whether the prepared queue has all the valid table column families in
   * it.
   * @param hfilesDir     directory containing the hfiles to be loaded into the table
   * @param queue         queue which needs to be loaded into the table
   * @param validateHFile if true, hfiles will be validated for their format
   * @param silence       true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public static void prepareHFileQueue(Configuration conf, AsyncClusterConnection conn,
    TableName tableName, Path hfilesDir, Deque<LoadQueueItem> queue, boolean validateHFile,
    boolean silence) throws IOException {
    discoverLoadQueue(conf, queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
      silence);
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the following:
   * <ol>
   * <li>{@link #groupOrSplitPhase(AsyncClusterConnection, TableName, ExecutorService, Deque, List)}
   * </li>
   * <li>{@link #bulkLoadPhase(AsyncClusterConnection, TableName, Deque, Multimap, boolean, Map)}
   * </li>
   * </ol>
   * @param conn      Connection to use
   * @param tableName Table to which these hfiles should be loaded
   * @param queue     {@code LoadQueueItem}s with hfiles yet to be loaded
   */
  public void loadHFileQueue(AsyncClusterConnection conn, TableName tableName,
    Deque<LoadQueueItem> queue, boolean copyFiles) throws IOException {
    ExecutorService pool = createExecutorService();
    try {
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(conn, tableName, pool,
        queue, FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys())).getFirst();
      bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
    } finally {
      pool.shutdown();
    }
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list. NOTE: to
   * maintain row atomicity guarantees, the region server side must succeed or fail atomically.
   * @param conn      Connection to use
   * @param tableName Table to which these hfiles should be loaded
   * @param copyFiles whether to copy the source hfiles into place instead of moving them
   * @param first     the start key of the target region
   * @param lqis      the hfiles to be loaded
   * @return empty list if success, list of items to retry on recoverable failure
   */
  @InterfaceAudience.Private
  protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
    final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
    final byte[] first, Collection<LoadQueueItem> lqis) {
    List<Pair<byte[], String>> familyPaths =
      lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
        .collect(Collectors.toList());
    CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
    FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
      (loaded, error) -> {
        if (error != null) {
          LOG.error("Encountered unrecoverable error from region server", error);
          if (
            getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
              && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
                HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
          ) {
            LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
              + numRetries.incrementAndGet());
            // return lqi's to retry
            future.complete(lqis);
          } else {
            LOG.error(RETRY_ON_IO_EXCEPTION
              + " is disabled or we have reached retry limit. Unable to recover");
            future.completeExceptionally(error);
          }
        } else {
          if (loaded) {
            future.complete(Collections.emptyList());
          } else {
            LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
              + " into table " + tableName + " with files " + lqis
              + " failed.  This is recoverable and they will be retried.");
            // return lqi's to retry
            future.complete(lqis);
          }
        }
      });
    return future;
  }

  /**
   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
   * re-queued for another pass with the groupOrSplitPhase.
   * <p/>
   * protected for testing.
   */
  @InterfaceAudience.Private
  protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
    Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFiles,
    Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    List<Future<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap()
      .entrySet()) {
      byte[] first = entry.getKey().array();
      final Collection<LoadQueueItem> lqis = entry.getValue();
      if (bulkLoadByFamily) {
        groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures
          .add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, familyQueue)));
      } else {
        loadingFutures.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis));
      }
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, entry.getKey());
        }
      }
    }

    // get all the results.
    for (Future<Collection<LoadQueueItem>> future : loadingFutures) {
      try {
        Collection<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  private Map<byte[], Collection<LoadQueueItem>>
    groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
    Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    itemsInRegion.forEach(item -> families2Queue
      .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
    return families2Queue;
  }

  private boolean
    checkHFilesCountPerRegionPerFamily(final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : e.getValue()) {
        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + " hfiles to family "
            + Bytes.toStringBinary(lqi.getFamily()) + " of region with start key "
            + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @param conn         the HBase cluster connection
   * @param tableName    the table name of the table to load into
   * @param pool         the ExecutorService
   * @param queue        the queue for LoadQueueItem
   * @param startEndKeys start and end keys
   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
   */
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
    AsyncClusterConnection conn, TableName tableName, ExecutorService pool,
    Deque<LoadQueueItem> queue, List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
    // <region start key, LQI> needs to be synchronized only within the scope of this
    // phase, because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
      new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<Pair<List<LoadQueueItem>, String>> call =
        () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized,
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  private List<LoadQueueItem> splitStoreFile(LoadQueueItem item, TableDescriptor tableDesc,
    byte[] splitKey) throws IOException {
    Path hfilePath = item.getFilePath();
    byte[] family = item.getFamily();
    Path tmpDir = hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting...");

    String uniqueName = getUniqueName();
    ColumnFamilyDescriptor familyDesc = tableDesc.getColumnFamily(family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");
    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<>(2);
    lqis.add(new LoadQueueItem(family, botOut));
    lqis.add(new LoadQueueItem(family, topOut));

    // If the current item is already the result of previous splits,
    // we don't need it anymore. Clean up to save space.
    // It is not part of the original input files.
    try {
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(hfilePath, false);
      }
    } catch (IOException e) {
      LOG.warn("Unable to delete temporary split file " + hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
   * @param startEndKeys the start/end keys of the regions belonging to this table, in ascending
   *                     order by start key
   * @param key          the key whose containing region we need to find
   * @return the index of the region the key belongs to
   */
  private int getRegionIndex(List<Pair<byte[], byte[]>> startEndKeys, byte[] key) {
    int idx = Collections.binarySearch(startEndKeys, Pair.newPair(key, HConstants.EMPTY_END_ROW),
      (p1, p2) -> Bytes.compareTo(p1.getFirst(), p2.getFirst()));
    if (idx < 0) {
      // not on boundary, returns -(insertion index). Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
    return idx;
  }

  /**
   * We consider there to be a region hole or overlap in the following conditions: 1) if idx < 0,
   * the first region info is missing; 2) the endkey of a region is not equal to the startkey of
   * the next region; 3) the endkey of the last region is not empty.
   */
  private void checkRegionIndexValid(int idx, List<Pair<byte[], byte[]>> startEndKeys,
    TableName tableName) throws IOException {
    if (idx < 0) {
      throw new IOException("The first region info for table " + tableName
        + " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (
      (idx == startEndKeys.size() - 1)
        && !Bytes.equals(startEndKeys.get(idx).getSecond(), HConstants.EMPTY_BYTE_ARRAY)
    ) {
      throw new IOException("The last region info for table " + tableName
        + " can't be found in hbase:meta. Please use hbck tool to fix it first.");
    } else if (
      idx + 1 < startEndKeys.size() && !(Bytes.compareTo(startEndKeys.get(idx).getSecond(),
        startEndKeys.get(idx + 1).getFirst()) == 0)
    ) {
      throw new IOException("The endkey of one region for table " + tableName
        + " is not equal to the startkey of the next region in hbase:meta."
        + " Please use hbck tool to fix it first.");
    }
  }

  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile boundary
   * no longer fits into a region, physically splits the hfile such that the new bottom half will
   * fit and returns the list of LQI's corresponding to the resultant hfiles.
   * <p/>
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */
  @InterfaceAudience.Private
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
    TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
    List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
      CacheConfig.DISABLED, true, getConf())) {
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary)
      + " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get())
        + " > " + Bytes.toStringBinary(last.get()));
    }
    int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get());
    checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, tableName);
    boolean lastKeyInRange =
      Bytes.compareTo(last.get(), startEndKeys.get(firstKeyRegionIdx).getSecond()) < 0 || Bytes
        .equals(startEndKeys.get(firstKeyRegionIdx).getSecond(), HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      if (failIfNeedSplitHFile) {
        throw new IOException("The key range of hfile=" + hfilePath
          + " does not fit inside a single region, and because " + FAIL_IF_NEED_SPLIT_HFILE
          + " is set to true the hfile will not be split, so the load fails here.");
      }
      int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
      int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) / 2;
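      // Pick the end key of the middle region spanned by this hfile as the split point, so each
      // resulting half covers roughly half as many regions as the original file.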
      // Make sure the splitPoint is valid in case a region overlap occurs; without this check the
      // splitPoint could be bigger than the hfile's end key.
      if (splitIdx != firstKeyRegionIdx) {
        checkRegionIndexValid(splitIdx, startEndKeys, tableName);
      }
      byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
      List<LoadQueueItem> lqis =
        splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);
      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.get(firstKeyRegionIdx).getFirst()), item);
    return null;
  }

  /**
   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
   * filters, etc.
   */
  @InterfaceAudience.Private
  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
    byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

  /**
   * Copy half of an HFile into a new HFile.
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
    Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    HalfStoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
    try {
      ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
      StoreFileInfo storeFileInfo =
        new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
      storeFileInfo.initHFileInfo(context);
      halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
      storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
        .withChecksumType(StoreUtils.getChecksumType(conf))
        .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
        .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
        .build();
      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
        .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
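      // Stream every cell from the half reader into the new writer, then carry over the file info
      // entries that are safe to copy so the new half keeps the original metadata.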
      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        halfWriter.append(scanner.getCell());
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }
    }
  }

  /**
   * Infers region boundaries for a new table.
   * <p/>
   * Parameter: <br/>
   * bdryMap is a map from keys to an integer belonging to {+1, -1}
   * <ul>
   * <li>If a key is a start key of a file, then it maps to +1</li>
   * <li>If a key is an end key of a file, then it maps to -1</li>
   * </ul>
   * <p>
   * Algo:<br/>
   * <ol>
   * <li>Poll on the keys in order:
   * <ol type="a">
   * <li>Keep adding the mapped values of these keys (runningSum)</li>
   * <li>Each time runningSum reaches 0, add the start key from when the runningSum had started to a
   * boundary list.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
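   * <p/>
   * A small worked example (keys are illustrative only): for three hfiles covering the key ranges
   * [a, c], [b, d] and [e, f], the sorted map and the running sum are:
   *
   * <pre>
   * bdryMap     : a=+1, b=+1, c=-1, d=-1, e=+1, f=-1
   * running sum : 1, 2, 1, 0, 1, 0
   * </pre>
   *
   * The sum returns to 0 at d and again at f; the start key of the first group is never emitted,
   * so the only inferred boundary is e, which splits the new table into two regions.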
   */
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }

  /**
   * If the table is created for the first time, then "completebulkload" reads the files twice. More
   * modifications would be necessary if we wanted to avoid doing it.
   */
  private void createTable(TableName tableName, Path hfofDir, AsyncAdmin admin) throws IOException {
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
          ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
        throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
          HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name()
              + " for family " + builder.getNameAsString());
          }
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first="
            + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
          Integer value = map.getOrDefault(first, 0);
          map.put(first, value + 1);

          value = map.containsKey(last) ? map.get(last) : 0;
          map.put(last, value - 1);
        }
      }
    }, true);

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
      .forEachOrdered(tdBuilder::setColumnFamily);
    FutureUtils.get(admin.createTable(tdBuilder.build(), keys));

    LOG.info("Table " + tableName + " is available!!");
  }

  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Deque<LoadQueueItem> queue, ExecutorService pool, boolean copyFile)
    throws IOException {
    int count = 0;

    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
    bulkToken = FutureUtils.get(conn.prepareBulkLoad(tableName));
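    // The bulk token obtained here is passed along with every bulk load call below and is
    // released again in cleanup().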
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      final List<Pair<byte[], byte[]>> startEndKeys =
        FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys());
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with "
          + queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      maxRetries = Math.max(maxRetries, startEndKeys.size() + 1);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
          "Retry attempted " + count + " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(conn, tableName, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
          + " hfiles to one family of one region");
      }

      bulkLoadPhase(conn, tableName, queue, regionGroups, copyFile, item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    return item2RegionMap;
  }

  private void cleanup(AsyncClusterConnection conn, TableName tableName, Deque<LoadQueueItem> queue,
    ExecutorService pool) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null) {
      conn.cleanupBulkLoad(tableName, bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }

  /**
   * Perform a bulk load of the given map of families to hfiles into the given pre-existing table.
   * This method is not threadsafe.
   * @param map       map of family to List of hfiles
   * @param tableName table to load the hfiles
   * @param silence   true to ignore unmatched column families
   * @param copyFile  always copy hfiles if true
   */
  private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Map<byte[], List<Path>> map, boolean silence, boolean copyFile)
    throws IOException {
    tableExists(conn, tableName);
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    try {
      prepareHFileQueue(conn, tableName, map, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      return performBulkLoad(conn, tableName, queue, pool, copyFile);
    } finally {
      cleanup(conn, tableName, queue, pool);
    }
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param tableName table to load the hfiles
   * @param hfofDir   the directory that was provided as the output path of a job using
   *                  HFileOutputFormat
   * @param silence   true to ignore unmatched column families
   * @param copyFile  always copy hfiles if true
   */
  private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Path hfofDir, boolean silence, boolean copyFile) throws IOException {
    tableExists(conn, tableName);

    /*
     * Checking hfile format is a time-consuming operation, we should have an option to skip this
     * step when bulkloading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean(VALIDATE_HFILES, true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files "
        + "are not correct. If you fail to read data from your table after using this "
        + "option, consider removing the files and bulkload again without this option. "
        + "See HBASE-13985");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    try {
      prepareHFileQueue(getConf(), conn, tableName, hfofDir, queue, validateHFile, silence);

      if (queue.isEmpty()) {
        LOG.warn(
          "Bulk load operation did not find any files to load in directory {}. "
            + "Does it contain files in subdirectories that correspond to column family names?",
          (hfofDir != null ? hfofDir.toUri().toString() : ""));
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      return performBulkLoad(conn, tableName, queue, pool, copyFile);
    } finally {
      cleanup(conn, tableName, queue, pool);
    }
  }

  @Override
  public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
    Map<byte[], List<Path>> family2Files) throws IOException {
    try (AsyncClusterConnection conn = ClusterConnectionFactory
      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
      return doBulkLoad(conn, tableName, family2Files, isSilence(), isAlwaysCopyFiles());
    }
  }

  @Override
  public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir) throws IOException {
    try (AsyncClusterConnection conn = ClusterConnectionFactory
      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
      AsyncAdmin admin = conn.getAdmin();
      if (!FutureUtils.get(admin.tableExists(tableName))) {
        if (isCreateTable()) {
          createTable(tableName, dir, admin);
        } else {
          throwAndLogTableNotFoundException(tableName);
        }
      }
      return doBulkLoad(conn, tableName, dir, isSilence(), isAlwaysCopyFiles());
    }
  }

  /**
   * @throws TableNotFoundException if table does not exist.
   */
  private void tableExists(AsyncClusterConnection conn, TableName tableName) throws IOException {
    if (!FutureUtils.get(conn.getAdmin().tableExists(tableName))) {
      throwAndLogTableNotFoundException(tableName);
    }
  }

  private void throwAndLogTableNotFoundException(TableName tn) throws TableNotFoundException {
    String errorMsg = format("Table '%s' does not exist.", tn);
    LOG.error(errorMsg);
    throw new TableNotFoundException(errorMsg);
  }

  public void setBulkToken(String bulkToken) {
    this.bulkToken = bulkToken;
  }

  public void setClusterIds(List<String> clusterIds) {
    this.clusterIds = clusterIds;
  }

  private void usage() {
    System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
      + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
      + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- "
      + "into an hbase table.\n" + "OPTIONS (for other -D options, see source code):\n" + " -D"
      + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target "
      + "table must exist.\n" + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY
      + "=yes to ignore unmatched column families.\n"
      + " -loadTable for when directory of files to load has a depth of 3; target table must "
      + "exist;\n" + " must be last of the options on command line.\n"
      + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for "
      + "documentation.\n");
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    // Re-initialize to apply -D options from the command line parameters
    initialize();
    Path dirPath = new Path(args[0]);
    TableName tableName = TableName.valueOf(args[1]);
    if (args.length == 2) {
      return !bulkLoad(tableName, dirPath).isEmpty() ? 0 : -1;
    } else {
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(dirPath)) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !bulkLoad(tableName, family2Files).isEmpty() ? 0 : -1;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
    System.exit(ret);
  }

  @Override
  public void disableReplication() {
    this.replicate = false;
  }

  @Override
  public boolean isReplicationDisabled() {
    return !this.replicate;
  }
}