/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Replicates HFile entries. It first copies all the hfiles in parallel to a staging directory on
 * this (sink) cluster's file system, and then uses {@link LoadIncrementalHFiles} to prepare a
 * collection of {@link LoadQueueItem} which are finally loaded (replicated) into the table of this
 * cluster. Call {@link #close()} when done.
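 * <p>
 * A minimal usage sketch; the configurations, directory paths, table-to-hfile map, connection and
 * cluster id list shown here are placeholders supplied by the caller:
 *
 * <pre>
 * try (HFileReplicator replicator = new HFileReplicator(sourceClusterConf, sourceBaseNamespaceDir,
 *     sourceHFileArchiveDir, bulkLoadHFileMap, sinkConf, sinkConnection, sourceClusterIds)) {
 *   replicator.replicate();
 * }
 * </pre>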
 */
@InterfaceAudience.Private
public class HFileReplicator implements Closeable {
  /** Maximum number of threads to allow in pool to copy hfiles during replication */
  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
      "hbase.replication.bulkload.copy.maxthreads";
  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
  /** Number of hfiles to copy per thread during replication */
  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
      "hbase.replication.bulkload.copy.hfiles.perthread";
  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(HFileReplicator.class);
  private static final String UNDERSCORE = "_";
  private static final FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");

  private Configuration sourceClusterConf;
  private String sourceBaseNamespaceDirPath;
  private String sourceHFileArchiveDirPath;
  private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
  private FileSystem sinkFs;
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private Configuration conf;
  private Connection connection;
  private Path hbaseStagingDir;
  private ThreadPoolExecutor exec;
  private int maxCopyThreads;
  private int copiesPerThread;
  private List<String> sourceClusterIds;

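  /**
   * @param sourceClusterConf configuration of the source (peer) cluster, used to open its file
   *          system
   * @param sourceBaseNamespaceDirPath base namespace directory of the source cluster
   * @param sourceHFileArchiveDirPath hfile archive directory of the source cluster, used as a
   *          fallback location when an hfile has already been archived
   * @param tableQueueMap map of table name to the list of (column family, hfile paths) pairs to be
   *          replicated
   * @param conf configuration of this (sink) cluster
   * @param connection connection to this (sink) cluster
   * @param sourceClusterIds ids of the clusters this data has already passed through
   */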
  public HFileReplicator(Configuration sourceClusterConf,
      String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
      Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
      Connection connection, List<String> sourceClusterIds) throws IOException {
    this.sourceClusterConf = sourceClusterConf;
    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
    this.bulkLoadHFileMap = tableQueueMap;
    this.conf = conf;
    this.connection = connection;
    this.sourceClusterIds = sourceClusterIds;

    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    this.hbaseStagingDir =
      new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
    this.maxCopyThreads =
        this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
          REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("HFileReplicationCopier-%1$d-" + this.sourceBaseNamespaceDirPath)
            .build());
    this.copiesPerThread =
        conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
          REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

    sinkFs = FileSystem.get(conf);
  }

  @Override
  public void close() throws IOException {
    if (this.exec != null) {
      this.exec.shutdown();
    }
  }

  public Void replicate() throws IOException {
    // Copy all the hfiles to the staging directory on the sink cluster file system
    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();

    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);

    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
      String tableNameString = tableStagingDir.getKey();
      Path stagingDir = tableStagingDir.getValue();

      LoadIncrementalHFiles loadHFiles = null;
      try {
        loadHFiles = new LoadIncrementalHFiles(conf);
        loadHFiles.setClusterIds(sourceClusterIds);
      } catch (Exception e) {
        LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded data.",
          e);
        throw new IOException(e);
      }
      Configuration newConf = HBaseConfiguration.create(conf);
      newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
      loadHFiles.setConf(newConf);

      TableName tableName = TableName.valueOf(tableNameString);
      Table table = this.connection.getTable(tableName);

      // Prepare the queue of hfiles to be loaded (replicated)
      Deque<LoadQueueItem> queue = new LinkedList<>();
      loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);

      if (queue.isEmpty()) {
        LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri());
        return null;
      }

      try (RegionLocator locator = connection.getRegionLocator(tableName)) {
        fsDelegationToken.acquireDelegationToken(sinkFs);
        // Set the staging directory which will be used by LoadIncrementalHFiles for loading the
        // data
        loadHFiles.setBulkToken(stagingDir.toString());
        doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
      } finally {
        cleanup(stagingDir.toString(), table);
      }
    }
    return null;
  }

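  /**
   * Bulk loads the given queue of hfiles into the table. The region start/end keys are re-fetched
   * on every attempt and the load is retried until the queue is drained or {@code maxRetries} is
   * exhausted (a value of 0 means retry without limit).
   */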
  private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table,
      Deque<LoadQueueItem> queue, RegionLocator locator, int maxRetries) throws IOException {
    int count = 0;
    Pair<byte[][], byte[][]> startEndKeys;
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      startEndKeys = locator.getStartEndKeys();
      if (count != 0) {
        LOG.warn("Error replicating HFiles; retry={} with {} remaining.", count, queue.size());
      }

      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException("Retry attempted " + count + " times without completing, bailing.");
      }
      count++;

      // Try bulk load
      loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
    }
  }

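  /**
   * Releases the file system delegation token, deletes the per-table staging directory from the
   * sink file system and closes the table. The sink file system itself is intentionally left open
   * because it is the shared, cached instance for this cluster.
   */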
  private void cleanup(String stagingDir, Table table) {
    // Release the file system delegation token
    fsDelegationToken.releaseDelegationToken();
    // Delete the staging directory
    if (stagingDir != null) {
      try {
        sinkFs.delete(new Path(stagingDir), true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
      }
    }
    // Do not close the file system

    /*
     * if (sinkFs != null) { try { sinkFs.close(); } catch (IOException e) { LOG.warn(
     * "Failed to close the file system"); } }
     */

    // Close the table
    if (table != null) {
      try {
        table.close();
      } catch (IOException e) {
        LOG.warn("Failed to close the table.", e);
      }
    }
  }

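  /**
   * Copies the hfiles of every table from the source cluster file system into a newly created
   * per-table staging directory on the sink file system, copying in parallel on the copier thread
   * pool.
   * @return a map from table name to the staging directory holding its copied hfiles
   */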
  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
    Map<String, Path> mapOfCopiedHFiles = new HashMap<>();
    Pair<byte[], List<String>> familyHFilePathsPair;
    List<String> hfilePaths;
    byte[] family;
    Path familyStagingDir;
    int familyHFilePathsPairsListSize;
    int totalNoOfHFiles;
    List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
    FileSystem sourceFs = null;

    try {
      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
      /*
       * Path#getFileSystem will by default get the FS from cache. If both the source and sink
       * clusters have the same FS name service then it would return the peer cluster FS. To avoid
       * this we explicitly disable loading the FS from cache, so that a new FS is created with the
       * source cluster configuration.
       */
      String sourceScheme = sourceClusterPath.toUri().getScheme();
      String disableCacheName = String.format("fs.%s.impl.disable.cache", sourceScheme);
      sourceClusterConf.setBoolean(disableCacheName, true);

      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);

      User user = userProvider.getCurrent();
      // For each table name in the map
      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
          .entrySet()) {
        String tableName = tableEntry.getKey();

        // Create staging directory for each table
        Path stagingDir =
            createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName));

        familyHFilePathsPairsList = tableEntry.getValue();
        familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();

        // For each list of family hfile paths pair in the table
        for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
          familyHFilePathsPair = familyHFilePathsPairsList.get(i);

          family = familyHFilePathsPair.getFirst();
          hfilePaths = familyHFilePathsPair.getSecond();

          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
          totalNoOfHFiles = hfilePaths.size();

          // Copy the hfiles of this family in parallel, copiesPerThread hfiles per copier task
          List<Future<Void>> futures = new ArrayList<>();
          Callable<Void> c;
          Future<Void> future;
          int currentCopied = 0;
          while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
            c = new Copier(sourceFs, familyStagingDir,
                hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread));
            future = exec.submit(c);
            futures.add(future);
            currentCopied += this.copiesPerThread;
          }

          int remaining = totalNoOfHFiles - currentCopied;
          if (remaining > 0) {
            c = new Copier(sourceFs, familyStagingDir,
                hfilePaths.subList(currentCopied, currentCopied + remaining));
            future = exec.submit(c);
            futures.add(future);
          }

          for (Future<Void> f : futures) {
            try {
              f.get();
            } catch (InterruptedException e) {
              InterruptedIOException iioe = new InterruptedIOException(
                  "Failed to copy HFiles to local file system. This will be retried by the "
                      + "source cluster.");
              iioe.initCause(e);
              throw iioe;
            } catch (ExecutionException e) {
              throw new IOException("Failed to copy HFiles to local file system. This will be "
                  + "retried by the source cluster.", e);
            }
          }
        }
        // Add the staging directory for this table. The staging directory contains all the hfiles
        // belonging to this table
        mapOfCopiedHFiles.put(tableName, stagingDir);
      }
      return mapOfCopiedHFiles;
    } finally {
      if (sourceFs != null) {
        sourceFs.close();
      }
      if (exec != null) {
        exec.shutdown();
      }
    }
  }

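  /**
   * Creates a uniquely named staging directory of the form {@code user__table__random} under the
   * given base directory, where any namespace separator in the table name is replaced with an
   * underscore and the random suffix is a 320-bit number rendered in radix 32.
   */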
  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
    int RANDOM_WIDTH = 320;
    int RANDOM_RADIX = 32;
    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
        + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX));
    return createStagingDir(baseDir, user, randomDir);
  }

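  /**
   * Creates the staging directory on the sink file system and sets fully open permissions on it.
   */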
  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
    Path p = new Path(baseDir, randomDir);
    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
    sinkFs.setPermission(p, PERM_ALL_ACCESS);
    return p;
  }

  /**
   * This class copies the given hfiles from the given source file system to the given staging
   * directory on the sink file system.
   */
  private class Copier implements Callable<Void> {
    private FileSystem sourceFs;
    private Path stagingDir;
    private List<String> hfiles;

    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
        throws IOException {
      this.sourceFs = sourceFs;
      this.stagingDir = stagingDir;
      this.hfiles = hfiles;
    }

    @Override
    public Void call() throws IOException {
      Path sourceHFilePath;
      Path localHFilePath;
      int totalHFiles = hfiles.size();
      for (int i = 0; i < totalHFiles; i++) {
        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
        try {
          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          // Any exception other than FileNotFoundException will fail the replication request, and
          // the source cluster will retry replicating this data.
        } catch (FileNotFoundException e) {
          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
              + ". Trying to copy from hfile archive directory.",
            e);
          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));

          try {
            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          } catch (FileNotFoundException e1) {
            // This means the hfile no longer exists anywhere in the source cluster FS, so there is
            // nothing we can do here; just log and continue.
            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
                + ". Hence ignoring this hfile from replication.",
              e1);
            continue;
          }
        }
        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
      }
      return null;
    }
  }
}