/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.tool;

import static java.lang.String.format;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSVisitor;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * The implementation for {@link BulkLoadHFiles}. It can also be executed from the command line as
 * a tool.
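 * <p/>
 * A minimal usage sketch (the table name and output directory below are illustrative):
 *
 * <pre>
 * BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
 * loader.bulkLoad(TableName.valueOf("mytable"), new Path("/path/to/hfileoutputformat-output"));
 * </pre>
 *
 * or, from the shell: {@code bin/hbase completebulkload /path/to/hfileoutputformat-output mytable}.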
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool {

  private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);

  /**
   * Keep locality while generating HFiles for bulkload. See HBASE-12596
   */
  public static final String LOCALITY_SENSITIVE_CONF_KEY =
    "hbase.bulkload.locality.sensitive.enabled";
  private static final boolean DEFAULT_LOCALITY_SENSITIVE = true;

  public static final String NAME = "completebulkload";
  /**
   * Whether to run validation on hfiles before loading.
   */
  private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile";
  /**
   * HBASE-24221 Support bulk loading HFiles by family, to avoid long waits on bulkLoadHFile caused
   * by compactions on the server side.
   */
  public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";

  public static final String FAIL_IF_NEED_SPLIT_HFILE =
    "hbase.loadincremental.fail.if.need.split.hfile";

  // We use a '.' prefix, which is skipped when walking directory trees and is not a valid
  // family name.
  static final String TMP_DIR = ".tmp";

  private int maxFilesPerRegionPerFamily;
  private boolean assignSeqIds;
  private boolean bulkLoadByFamily;

  // Source delegation token
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private int nrThreads;
  private final AtomicInteger numRetries = new AtomicInteger(0);
  private String bulkToken;

  private List<String> clusterIds = new ArrayList<>();
  private boolean replicate = true;
  private boolean failIfNeedSplitHFile = false;

  public BulkLoadHFilesTool(Configuration conf) {
    // make a copy, just to be sure we're not overriding someone else's config
    super(new Configuration(conf));
    initialize();
  }

  public void initialize() {
    Configuration conf = getConf();
    // disable blockcache for tool invocation, see HBASE-10500
    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
    nrThreads =
      conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
    bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
    failIfNeedSplitHFile = conf.getBoolean(FAIL_IF_NEED_SPLIT_HFILE, false);
  }

  // Initialize a thread pool
  private ExecutorService createExecutorService() {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
      new LinkedBlockingQueue<>(),
      new ThreadFactoryBuilder().setNameFormat("BulkLoadHFilesTool-%1$d").setDaemon(true).build());
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  private boolean isCreateTable() {
    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
  }

  private boolean isSilence() {
    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
  }

  private boolean isAlwaysCopyFiles() {
    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
      return false;
    }

    return !HFileInfo.isReservedFileInfoKey(key);
  }

  /**
   * Checks whether there is any invalid family name in the HFiles to be bulk loaded.
   */
  private static void validateFamiliesInHFiles(TableDescriptor tableDesc,
    Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    Set<String> familyNames = Arrays.stream(tableDesc.getColumnFamilies())
      .map(ColumnFamilyDescriptor::getNameAsString).collect(Collectors.toSet());
    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
      .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
    if (unmatchedFamilies.size() > 0) {
      String msg =
        "Unmatched family names found in HFiles to be bulk loaded: " + unmatchedFamilies
          + "; valid family names of table " + tableDesc.getTableName() + " are: " + familyNames;
      LOG.error(msg);
      if (!silence) {
        throw new IOException(msg);
      }
    }
  }

  /**
   * Populate the queue with the given HFiles.
   */
  private static void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
  }

  private interface BulkHFileVisitor<TFamily> {

    TFamily bulkFamily(byte[] familyName) throws IOException;

    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
  }

  /**
   * Iterate over the hfiles under bulkDir, skipping references, HFileLinks and files starting with
   * "_". Invalid hfiles are checked and skipped by default; this validation can be disabled by
   * setting {@link #VALIDATE_HFILES} to false.
   */
  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
    BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
    for (FileStatus familyStat : familyDirStatuses) {
      if (!familyStat.isDirectory()) {
        LOG.warn("Skipping non-directory " + familyStat.getPath());
        continue;
      }
      Path familyDir = familyStat.getPath();
      byte[] familyName = Bytes.toBytes(familyDir.getName());
      // Skip invalid family
      try {
        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
      } catch (IllegalArgumentException e) {
        LOG.warn("Skipping invalid family directory " + familyStat.getPath());
        continue;
      }
      TFamily family = visitor.bulkFamily(familyName);

      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
      for (FileStatus hfileStatus : hfileStatuses) {
        if (!fs.isFile(hfileStatus.getPath())) {
          LOG.warn("Skipping non-file " + hfileStatus);
          continue;
        }

        Path hfile = hfileStatus.getPath();
        // Skip "_", reference, HFileLink
        String fileName = hfile.getName();
        if (fileName.startsWith("_")) {
          continue;
        }
        if (StoreFileInfo.isReference(fileName)) {
          LOG.warn("Skipping reference " + fileName);
          continue;
        }
        if (HFileLink.isHFileLink(fileName)) {
          LOG.warn("Skipping HFileLink " + fileName);
          continue;
        }

        // Validate HFile Format if needed
        if (validateHFile) {
          try {
            if (!HFile.isHFileFormat(fs, hfile)) {
              LOG.warn("The file " + hfile + " does not seem to be an hfile, skipping");
              continue;
            }
          } catch (FileNotFoundException e) {
            LOG.warn("The file " + hfile + " was removed");
            continue;
          }
        }

        visitor.bulkHFile(family, hfileStatus);
      }
    }
  }

  /**
   * Walk the given directory for all HFiles, and return a Queue containing all such files.
   */
  private static void discoverLoadQueue(Configuration conf, Deque<LoadQueueItem> ret, Path hfofDir,
    boolean validateHFile) throws IOException {
    visitBulkHFiles(hfofDir.getFileSystem(conf), hfofDir, new BulkHFileVisitor<byte[]>() {
      @Override
      public byte[] bulkFamily(final byte[] familyName) {
        return familyName;
      }

      @Override
      public void bulkHFile(final byte[] family, final FileStatus hfile) {
        long length = hfile.getLen();
        if (
          length > conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)
        ) {
          LOG.warn("Bulk loading hfile " + hfile.getPath() + " with size " + length
            + " bytes can be problematic as it may lead to oversplitting.");
        }
        ret.add(new LoadQueueItem(family, hfile.getPath()));
      }
    }, validateHFile);
  }

  /**
   * Prepare a collection of {@code LoadQueueItem} from the given map of family to HFile paths and
   * validate that the prepared queue contains only column families that exist in the table.
   * @param map       map of family to List of hfiles
   * @param tableName table to which hfiles should be loaded
   * @param queue     queue which needs to be loaded into the table
   * @param silence   true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public static void prepareHFileQueue(AsyncClusterConnection conn, TableName tableName,
    Map<byte[], List<Path>> map, Deque<LoadQueueItem> queue, boolean silence) throws IOException {
    populateLoadQueue(queue, map);
    validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
      silence);
  }

  /**
   * Prepare a collection of {@code LoadQueueItem} from the source hfiles contained in the passed
   * directory and validate that the prepared queue contains only column families that exist in
   * the table.
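   * <p/>
   * The directory is expected to contain one sub-directory per column family, each holding the
   * hfiles to load; this is the layout produced by HFileOutputFormat.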
   * @param hfilesDir     directory containing the hfiles to be loaded into the table
   * @param queue         queue which needs to be loaded into the table
   * @param validateHFile if true, hfiles will be validated for their format
   * @param silence       true to ignore unmatched column families
   * @throws IOException If any I/O or network error occurred
   */
  public static void prepareHFileQueue(Configuration conf, AsyncClusterConnection conn,
    TableName tableName, Path hfilesDir, Deque<LoadQueueItem> queue, boolean validateHFile,
    boolean silence) throws IOException {
    discoverLoadQueue(conf, queue, hfilesDir, validateHFile);
    validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
      silence);
  }

  /**
   * Used by the replication sink to load the hfiles from the source cluster. It does the
   * following:
   * <ol>
   * <li>{@link #groupOrSplitPhase(AsyncClusterConnection, TableName, ExecutorService, Deque, List)}
   * </li>
   * <li>{@link #bulkLoadPhase(AsyncClusterConnection, TableName, Deque, Multimap, boolean, Map)}
   * </li>
   * </ol>
   * @param conn      Connection to use
   * @param tableName Table to which these hfiles should be loaded
   * @param queue     {@code LoadQueueItem}s with hfiles yet to be loaded
   */
  public void loadHFileQueue(AsyncClusterConnection conn, TableName tableName,
    Deque<LoadQueueItem> queue, boolean copyFiles) throws IOException {
    ExecutorService pool = createExecutorService();
    try {
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(conn, tableName, pool,
        queue, FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys())).getFirst();
      bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
    } finally {
      pool.shutdown();
    }
  }

  /**
   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
   * hfiles that need to be retried. If it is successful it will return an empty list. NOTE: To
   * maintain row atomicity guarantees, the region server side should succeed or fail atomically.
   * @param conn      Connection to use
   * @param tableName Table to which these hfiles should be loaded
   * @param copyFiles whether to copy the hfiles into place instead of moving them when loading
   * @param first     the start key of the region
   * @param lqis      the hfiles to be loaded
   * @return empty list if success, list of items to retry on recoverable failure
   */
  @InterfaceAudience.Private
  protected CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
    final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
    final byte[] first, Collection<LoadQueueItem> lqis) {
    List<Pair<byte[], String>> familyPaths =
      lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
        .collect(Collectors.toList());
    CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
    FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
      (loaded, error) -> {
        if (error != null) {
          LOG.error("Encountered unrecoverable error from region server", error);
          if (
            getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
              && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
                HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)
          ) {
            LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
              + numRetries.incrementAndGet());
            // return lqi's to retry
            future.complete(lqis);
          } else {
            LOG.error(RETRY_ON_IO_EXCEPTION
              + " is disabled or we have reached retry limit. Unable to recover");
            future.completeExceptionally(error);
          }
        } else {
          if (loaded) {
            future.complete(Collections.emptyList());
          } else {
            LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
              + " into table " + tableName + " with files " + lqis
              + " failed.  This is recoverable and they will be retried.");
            // return lqi's to retry
            future.complete(lqis);
          }
        }
      });
    return future;
  }

  /**
   * This takes the LQIs grouped by likely regions and attempts to bulk load them. Any failures
   * are re-queued for another pass with the groupOrSplitPhase.
   * <p/>
   * protected for testing.
   */
  @InterfaceAudience.Private
  protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
    Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFiles,
    Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
    // atomically bulk load the groups.
    List<Future<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>();
    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap()
      .entrySet()) {
      byte[] first = entry.getKey().array();
      final Collection<LoadQueueItem> lqis = entry.getValue();
      if (bulkLoadByFamily) {
        groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures
          .add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, familyQueue)));
      } else {
        loadingFutures.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis));
      }
      if (item2RegionMap != null) {
        for (LoadQueueItem lqi : lqis) {
          item2RegionMap.put(lqi, entry.getKey());
        }
      }
    }

    // get all the results.
    for (Future<Collection<LoadQueueItem>> future : loadingFutures) {
      try {
        Collection<LoadQueueItem> toRetry = future.get();

        if (item2RegionMap != null) {
          for (LoadQueueItem lqi : toRetry) {
            item2RegionMap.remove(lqi);
          }
        }
        // LQIs that are requeued to be regrouped.
        queue.addAll(toRetry);
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          // At this point something unrecoverable has happened.
          // TODO Implement bulk load recovery
          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
        }
        LOG.error("Unexpected execution exception during bulk load", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during bulk load", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
  }

  private Map<byte[], Collection<LoadQueueItem>>
    groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
    Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    itemsInRegion.forEach(item -> families2Queue
      .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
    return families2Queue;
  }

  private boolean
    checkHFilesCountPerRegionPerFamily(final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (LoadQueueItem lqi : e.getValue()) {
        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
        count.increment();
        if (count.intValue() > maxFilesPerRegionPerFamily) {
          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily + " hfiles to family "
            + Bytes.toStringBinary(lqi.getFamily()) + " of region with start key "
            + Bytes.toStringBinary(e.getKey()));
          return false;
        }
      }
    }
    return true;
  }

  /**
   * @param conn         the HBase cluster connection
   * @param tableName    the table name of the table to load into
   * @param pool         the ExecutorService
   * @param queue        the queue for LoadQueueItem
   * @param startEndKeys start and end keys
   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
   */
  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
    AsyncClusterConnection conn, TableName tableName, ExecutorService pool,
    Deque<LoadQueueItem> queue, List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
    // The <region start key, LQI> map needs to be synchronized only within the scope of this
    // phase, because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
      new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();
      final Callable<Pair<List<LoadQueueItem>, String>> call =
        () -> groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized,
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }

  // unique file name for the table
  private String getUniqueName() {
    return UUID.randomUUID().toString().replaceAll("-", "");
  }

  private List<LoadQueueItem> splitStoreFile(AsyncTableRegionLocator loc, LoadQueueItem item,
    TableDescriptor tableDesc, byte[] splitKey) throws IOException {
    Path hfilePath = item.getFilePath();
    byte[] family = item.getFamily();
    Path tmpDir = hfilePath.getParent();
    if (!tmpDir.getName().equals(TMP_DIR)) {
      tmpDir = new Path(tmpDir, TMP_DIR);
    }

    LOG.info("HFile at " + hfilePath + " no longer fits inside a single region. Splitting...");

    String uniqueName = getUniqueName();
    ColumnFamilyDescriptor familyDesc = tableDesc.getColumnFamily(family);

    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
    Path topOut = new Path(tmpDir, uniqueName + ".top");

    splitStoreFile(loc, getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);

    FileSystem fs = tmpDir.getFileSystem(getConf());
    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));

    // Add these back at the *front* of the queue, so there's a lower
    // chance that the region will just split again before we get there.
    List<LoadQueueItem> lqis = new ArrayList<>(2);
    lqis.add(new LoadQueueItem(family, botOut));
    lqis.add(new LoadQueueItem(family, topOut));

    // If the current item is already the result of previous splits,
    // we don't need it anymore. Clean up to save space.
    // It is not part of the original input files.
    try {
      if (tmpDir.getName().equals(TMP_DIR)) {
        fs.delete(hfilePath, false);
      }
    } catch (IOException e) {
      LOG.warn("Unable to delete temporary split file " + hfilePath);
    }
    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
    return lqis;
  }

  /**
   * @param startEndKeys the start/end keys of the regions belonging to this table, in ascending
   *                     order by start key
   * @param key          the key whose containing region should be found
   * @return region index
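   *         <p/>
   *         (An illustrative walk-through of the code below: with regions
   *         {@code [("", "b"), ("b", "d"), ("d", "")]} and key {@code "c"}, the binary search
   *         returns {@code -3}, which is resolved to index {@code -(-3 + 1) - 1 = 1}, i.e. the
   *         region {@code ("b", "d")} that contains the key.)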
   */
  private int getRegionIndex(List<Pair<byte[], byte[]>> startEndKeys, byte[] key) {
    int idx = Collections.binarySearch(startEndKeys, Pair.newPair(key, HConstants.EMPTY_END_ROW),
      (p1, p2) -> Bytes.compareTo(p1.getFirst(), p2.getFirst()));
    if (idx < 0) {
      // not an exact match; binarySearch returns -(insertion point) - 1, so calculate the index
      // of the region the key would fall into.
      idx = -(idx + 1) - 1;
    }
    return idx;
  }

  /**
   * We consider there to be a region hole or overlap when: 1) idx < 0, i.e. the first region info
   * is missing; 2) the end key of a region is not equal to the start key of the next region; 3)
   * the end key of the last region is not empty.
   */
  private void checkRegionIndexValid(int idx, List<Pair<byte[], byte[]>> startEndKeys,
    TableName tableName) throws IOException {
    if (idx < 0) {
      throw new IOException("The first region info for table " + tableName
        + " can't be found in hbase:meta. Please use the hbck tool to fix it first.");
    } else if (
      (idx == startEndKeys.size() - 1)
        && !Bytes.equals(startEndKeys.get(idx).getSecond(), HConstants.EMPTY_BYTE_ARRAY)
    ) {
      throw new IOException("The last region info for table " + tableName
        + " can't be found in hbase:meta. Please use the hbck tool to fix it first.");
    } else if (
      idx + 1 < startEndKeys.size() && !(Bytes.compareTo(startEndKeys.get(idx).getSecond(),
        startEndKeys.get(idx + 1).getFirst()) == 0)
    ) {
      throw new IOException("The endkey of one region for table " + tableName
        + " is not equal to the startkey of the next region in hbase:meta. "
        + "Please use the hbck tool to fix it first.");
    }
  }

  /**
   * Attempt to assign the given load queue item into its target region group. If the hfile
   * boundary no longer fits into a region, physically splits the hfile such that the new bottom
   * half will fit and returns the list of LQIs corresponding to the resultant hfiles.
   * <p/>
   * protected for testing
   * @throws IOException if an IO failure is encountered
   */
  @InterfaceAudience.Private
  protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
    TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
    List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
    Path hfilePath = item.getFilePath();
    Optional<byte[]> first, last;
    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
      CacheConfig.DISABLED, true, getConf())) {
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } catch (FileNotFoundException fnfe) {
      LOG.debug("encountered", fnfe);
      return new Pair<>(null, hfilePath.getName());
    }

    LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary)
      + " last=" + last.map(Bytes::toStringBinary));
    if (!first.isPresent() || !last.isPresent()) {
      assert !first.isPresent() && !last.isPresent();
      // TODO what if this is due to a bad HFile?
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return null;
    }
    if (Bytes.compareTo(first.get(), last.get()) > 0) {
      throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get())
        + " > " + Bytes.toStringBinary(last.get()));
    }
    int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get());
    checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, tableName);
    boolean lastKeyInRange =
      Bytes.compareTo(last.get(), startEndKeys.get(firstKeyRegionIdx).getSecond()) < 0 || Bytes
        .equals(startEndKeys.get(firstKeyRegionIdx).getSecond(), HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      if (failIfNeedSplitHFile) {
        throw new IOException("The key range of hfile=" + hfilePath
          + " does not fit into any single region, and because " + FAIL_IF_NEED_SPLIT_HFILE
          + " is set to true, the hfile will not be split.");
      }
      int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
      int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) / 2;
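      // Splitting at the end key of the region midway between the first and last overlapping
      // regions roughly halves the remaining key span on each pass, rather than peeling off one
      // region at a time.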
      // Make sure the split point is valid in case regions overlap; without this check the split
      // point could be greater than the hfile's end key.
      if (splitIdx != firstKeyRegionIdx) {
        checkRegionIndexValid(splitIdx, startEndKeys, tableName);
      }
      byte[] splitPoint = startEndKeys.get(splitIdx).getSecond();
      List<LoadQueueItem> lqis = splitStoreFile(conn.getRegionLocator(tableName), item,
        FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), splitPoint);

      return new Pair<>(lqis, null);
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.get(firstKeyRegionIdx).getFirst()), item);
    return null;
  }

  /**
   * Split a storefile into a top and bottom half with favored nodes, maintaining the metadata,
   * recreating bloom filters, etc.
   */
  @InterfaceAudience.Private
  static void splitStoreFile(AsyncTableRegionLocator loc, Configuration conf, Path inFile,
    ColumnFamilyDescriptor familyDesc, byte[] splitKey, Path bottomOut, Path topOut)
    throws IOException {
    // The two halves are read through references, with no block cache and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc, loc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc, loc);
  }

  /**
   * Copy half of an HFile into a new HFile with favored nodes.
   */
  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
    Reference reference, ColumnFamilyDescriptor familyDescriptor, AsyncTableRegionLocator loc)
    throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    CacheConfig cacheConf = CacheConfig.DISABLED;
    HalfStoreFileReader halfReader = null;
    StoreFileWriter halfWriter = null;
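    // Stream the cells of the designated half of the source hfile into a new, full hfile,
    // preserving the file info metadata and, when locality is enabled, using the first row's
    // region server as a favored node for the writer.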
    try {
      ReaderContext context = new ReaderContextBuilder().withFileSystemAndPath(fs, inFile).build();
      StoreFileInfo storeFileInfo =
        new StoreFileInfo(conf, fs, fs.getFileStatus(inFile), reference);
      storeFileInfo.initHFileInfo(context);
      halfReader = (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
      storeFileInfo.getHFileInfo().initMetaAndIndex(halfReader.getHFileReader());
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompressionType();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
        .withChecksumType(StoreUtils.getChecksumType(conf))
        .withBytesPerCheckSum(StoreUtils.getBytesPerChecksum(conf)).withBlockSize(blocksize)
        .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
        .withCreateTime(EnvironmentEdgeManager.currentTime()).build();

      HFileScanner scanner = halfReader.getScanner(false, false, false);
      scanner.seekTo();
      do {
        final Cell cell = scanner.getCell();
        if (null != halfWriter) {
          halfWriter.append(cell);
        } else {
          // init halfwriter
          if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
            byte[] rowKey = CellUtil.cloneRow(cell);
            HRegionLocation hRegionLocation = FutureUtils.get(loc.getRegionLocation(rowKey));
            InetSocketAddress[] favoredNodes = null;
            if (null == hRegionLocation) {
              LOG.warn("Failed to get region location for rowkey {}, using writer without favored"
                + " nodes.", Bytes.toString(rowKey));
              halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
                .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
            } else {
              LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
              InetSocketAddress initialIsa =
                new InetSocketAddress(hRegionLocation.getHostname(), hRegionLocation.getPort());
              if (initialIsa.isUnresolved()) {
                LOG.warn("Failed to get location for region {}, using writer without favored"
                  + " nodes.", hRegionLocation);
                halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
                  .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
              } else {
                LOG.debug("Use favored nodes writer: {}", initialIsa.getHostString());
                favoredNodes = new InetSocketAddress[] { initialIsa };
                halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
                  .withBloomType(bloomFilterType).withFileContext(hFileContext)
                  .withFavoredNodes(favoredNodes).build();
              }
            }
          } else {
            halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
              .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
          }
          halfWriter.append(cell);
        }
      } while (scanner.next());

      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfReader != null) {
        try {
          halfReader.close(cacheConf.shouldEvictOnClose());
        } catch (IOException e) {
          LOG.warn("failed to close hfile reader for " + inFile, e);
        }
      }
      if (halfWriter != null) {
        halfWriter.close();
      }
    }
  }

  /**
   * Infers region boundaries for a new table.
   * <p/>
   * Parameter: <br/>
   * bdryMap is a map from keys to an integer belonging to {+1, -1}
   * <ul>
   * <li>If a key is a start key of a file, then it maps to +1</li>
   * <li>If a key is an end key of a file, then it maps to -1</li>
   * </ul>
   * <p>
   * Algo:<br/>
   * <ol>
   * <li>Poll on the keys in order:
   * <ol type="a">
   * <li>Keep adding the mapped values to these keys (runningSum)</li>
   * <li>Each time runningSum reaches 0, add the start key from when the runningSum had started to
   * a boundary list.</li>
   * </ol>
   * </li>
   * <li>Return the boundary list.</li>
   * </ol>
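   * <p>
   * For example (an illustrative walk-through), two files covering [a, b] and [c, d] produce the
   * map {a: +1, b: -1, c: +1, d: -1}. The running sum returns to 0 at b and again at d; only the
   * second of those records a boundary (the start key c of that run), so the inferred split key
   * is c and the new table gets regions ["", c) and [c, "").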
   */
  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
    List<byte[]> keysArray = new ArrayList<>();
    int runningValue = 0;
    byte[] currStartKey = null;
    boolean firstBoundary = true;

    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
      if (runningValue == 0) {
        currStartKey = item.getKey();
      }
      runningValue += item.getValue();
      if (runningValue == 0) {
        if (!firstBoundary) {
          keysArray.add(currStartKey);
        }
        firstBoundary = false;
      }
    }

    return keysArray.toArray(new byte[0][]);
  }

  /**
   * If the table is being created for the first time, then "completebulkload" reads the files
   * twice. More modifications would be necessary to avoid doing so.
   */
  private void createTable(TableName tableName, Path hfofDir, AsyncAdmin admin) throws IOException {
    final FileSystem fs = hfofDir.getFileSystem(getConf());

    // Add column families
    // Build a set of keys
    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
      @Override
      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
        ColumnFamilyDescriptorBuilder builder =
          ColumnFamilyDescriptorBuilder.newBuilder(familyName);
        familyBuilders.add(builder);
        return builder;
      }

      @Override
      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
        throws IOException {
        Path hfile = hfileStatus.getPath();
        try (HFile.Reader reader =
          HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
            builder.setCompressionType(reader.getFileContext().getCompression());
            LOG.info("Setting compression " + reader.getFileContext().getCompression().name()
              + " for family " + builder.getNameAsString());
          }
          byte[] first = reader.getFirstRowKey().get();
          byte[] last = reader.getLastRowKey().get();

          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first="
            + Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));

          // To eventually infer start key-end key boundaries
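          // (+1 at each file's first row key, -1 at each file's last row key; inferBoundaries
          // later records the points where the running sum returns to zero)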
          Integer value = map.getOrDefault(first, 0);
          map.put(first, value + 1);

          value = map.getOrDefault(last, 0);
          map.put(last, value - 1);
        }
      }
    }, true);

    byte[][] keys = inferBoundaries(map);
    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
      .forEachOrdered(tdBuilder::setColumnFamily);
    FutureUtils.get(admin.createTable(tdBuilder.build(), keys));

    LOG.info("Table " + tableName + " is available!");
  }

  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Deque<LoadQueueItem> queue, ExecutorService pool, boolean copyFile)
    throws IOException {
    int count = 0;

    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
    bulkToken = FutureUtils.get(conn.prepareBulkLoad(tableName));
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;

    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
    // Assumes that region splits can happen while this occurs.
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      final List<Pair<byte[], byte[]>> startEndKeys =
        FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys());
      if (count != 0) {
        LOG.info("Split occurred while grouping HFiles, retry attempt " + count + " with "
          + queue.size() + " files remaining to group or split");
      }

      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
      maxRetries = Math.max(maxRetries, startEndKeys.size() + 1);
      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
          "Retry attempted " + count + " times without completing, bailing out");
      }
      count++;

      // Using ByteBuffer for byte[] equality semantics
      pair = groupOrSplitPhase(conn, tableName, pool, queue, startEndKeys);
      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();

      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
        // Error is logged inside checkHFilesCountPerRegionPerFamily.
        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
          + " hfiles to one family of one region");
      }

      bulkLoadPhase(conn, tableName, queue, regionGroups, copyFile, item2RegionMap);

      // NOTE: The next iteration's split / group could happen in parallel to
      // atomic bulkloads assuming that there are splits and no merges, and
      // that we can atomically pull out the groups we want to retry.
    }

    return item2RegionMap;
  }

  private void cleanup(AsyncClusterConnection conn, TableName tableName, Deque<LoadQueueItem> queue,
    ExecutorService pool) throws IOException {
    fsDelegationToken.releaseDelegationToken();
    if (bulkToken != null) {
      conn.cleanupBulkLoad(tableName, bulkToken);
    }
    if (pool != null) {
      pool.shutdown();
    }
    if (!queue.isEmpty()) {
      StringBuilder err = new StringBuilder();
      err.append("-------------------------------------------------\n");
      err.append("Bulk load aborted with some files not yet loaded:\n");
      err.append("-------------------------------------------------\n");
      for (LoadQueueItem q : queue) {
        err.append("  ").append(q.getFilePath()).append('\n');
      }
      LOG.error(err.toString());
    }
  }

  /**
   * Perform a bulk load of the given map of families to hfiles into the given pre-existing table.
   * This method is not threadsafe.
   * @param map       map of family to List of hfiles
   * @param tableName table to load the hfiles
   * @param silence   true to ignore unmatched column families
   * @param copyFile  always copy hfiles if true
   */
  private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Map<byte[], List<Path>> map, boolean silence, boolean copyFile)
    throws IOException {
    tableExists(conn, tableName);
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    try {
      prepareHFileQueue(conn, tableName, map, queue, silence);
      if (queue.isEmpty()) {
        LOG.warn("Bulk load operation did not get any files to load");
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      return performBulkLoad(conn, tableName, queue, pool, copyFile);
    } finally {
      cleanup(conn, tableName, queue, pool);
    }
  }

  /**
   * Perform a bulk load of the given directory into the given pre-existing table. This method is
   * not threadsafe.
   * @param tableName table to load the hfiles
   * @param hfofDir   the directory that was provided as the output path of a job using
   *                  HFileOutputFormat
   * @param silence   true to ignore unmatched column families
   * @param copyFile  always copy hfiles if true
   */
  private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
    TableName tableName, Path hfofDir, boolean silence, boolean copyFile) throws IOException {
    tableExists(conn, tableName);

    /*
     * Checking hfile format is a time-consuming operation, so there is an option to skip this
     * step when bulk loading millions of HFiles. See HBASE-13985.
     */
    boolean validateHFile = getConf().getBoolean(VALIDATE_HFILES, true);
    if (!validateHFile) {
      LOG.warn("You are skipping HFile validation, which may cause data loss if the files "
        + "are not correct. If you fail to read data from your table after using this "
        + "option, consider removing the files and bulk loading again without this option. "
        + "See HBASE-13985");
    }
    // LQI queue does not need to be threadsafe -- all operations on this queue
    // happen in this thread
    Deque<LoadQueueItem> queue = new ArrayDeque<>();
    ExecutorService pool = null;
    try {
      prepareHFileQueue(getConf(), conn, tableName, hfofDir, queue, validateHFile, silence);

      if (queue.isEmpty()) {
        LOG.warn(
          "Bulk load operation did not find any files to load in directory {}. "
            + "Does it contain files in subdirectories that correspond to column family names?",
          (hfofDir != null ? hfofDir.toUri().toString() : ""));
        return Collections.emptyMap();
      }
      pool = createExecutorService();
      return performBulkLoad(conn, tableName, queue, pool, copyFile);
    } finally {
      cleanup(conn, tableName, queue, pool);
    }
  }

  @Override
  public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
    Map<byte[], List<Path>> family2Files) throws IOException {
    try (AsyncClusterConnection conn = ClusterConnectionFactory
      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
      return doBulkLoad(conn, tableName, family2Files, isSilence(), isAlwaysCopyFiles());
    }
  }

  @Override
  public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir) throws IOException {
    try (AsyncClusterConnection conn = ClusterConnectionFactory
      .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
      AsyncAdmin admin = conn.getAdmin();
      if (!FutureUtils.get(admin.tableExists(tableName))) {
        if (isCreateTable()) {
          createTable(tableName, dir, admin);
        } else {
          throwAndLogTableNotFoundException(tableName);
        }
      }
      return doBulkLoad(conn, tableName, dir, isSilence(), isAlwaysCopyFiles());
    }
  }

  /**
   * @throws TableNotFoundException if table does not exist.
   */
  private void tableExists(AsyncClusterConnection conn, TableName tableName) throws IOException {
    if (!FutureUtils.get(conn.getAdmin().tableExists(tableName))) {
      throwAndLogTableNotFoundException(tableName);
    }
  }

  private void throwAndLogTableNotFoundException(TableName tn) throws TableNotFoundException {
    String errorMsg = format("Table '%s' does not exist.", tn);
    LOG.error(errorMsg);
    throw new TableNotFoundException(errorMsg);
  }

  public void setBulkToken(String bulkToken) {
    this.bulkToken = bulkToken;
  }

  public void setClusterIds(List<String> clusterIds) {
    this.clusterIds = clusterIds;
  }

  private void usage() {
    System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
      + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
      + "Loads directory of hfiles -- a region dir or product of HFileOutputFormat -- "
      + "into an hbase table.\n" + "OPTIONS (for other -D options, see source code):\n" + " -D"
      + CREATE_TABLE_CONF_KEY + "=no whether to create table; when 'no', target "
      + "table must exist.\n" + " -D" + IGNORE_UNMATCHED_CF_CONF_KEY
      + "=yes to ignore unmatched column families.\n"
      + " -loadTable for when directory of files to load has a depth of 3; target table must "
      + "exist;\n" + " must be last of the options on command line.\n"
      + "See http://hbase.apache.org/book.html#arch.bulk.load.complete.strays for "
      + "documentation.\n");
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2 && args.length != 3) {
      usage();
      return -1;
    }
    // Re-initialize to apply -D options from the command line parameters
    initialize();
    Path dirPath = new Path(args[0]);
    TableName tableName = TableName.valueOf(args[1]);
    if (args.length == 2) {
      return !bulkLoad(tableName, dirPath).isEmpty() ? 0 : -1;
    } else {
      Map<byte[], List<Path>> family2Files = Maps.newHashMap();
      FileSystem fs = FileSystem.get(getConf());
      for (FileStatus regionDir : fs.listStatus(dirPath)) {
        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
          byte[] familyName = Bytes.toBytes(family);
          if (family2Files.containsKey(familyName)) {
            family2Files.get(familyName).add(path);
          } else {
            family2Files.put(familyName, Lists.newArrayList(path));
          }
        });
      }
      return !bulkLoad(tableName, family2Files).isEmpty() ? 0 : -1;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
    System.exit(ret);
  }

  @Override
  public void disableReplication() {
    this.replicate = false;
  }

  @Override
  public boolean isReplicationDisabled() {
    return !this.replicate;
  }
}