/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Replicates HFile entries. It first copies all the hfiles in parallel to a local staging
 * directory and then uses {@link LoadIncrementalHFiles} to prepare a collection of
 * {@link LoadQueueItem} which are finally loaded (replicated) into the table of this cluster.
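 * <p>
 * A minimal usage sketch; the caller (typically the replication sink) is assumed to have already
 * built the peer configuration {@code peerConf}, the per-table queue map {@code tableQueueMap},
 * the sink cluster {@code conf} and {@code connection}, and the {@code sourceClusterIds} list. The
 * variable names are illustrative only:
 * </p>
 *
 * <pre>{@code
 * HFileReplicator replicator = new HFileReplicator(peerConf, sourceBaseNamespaceDirPath,
 *     sourceHFileArchiveDirPath, tableQueueMap, conf, connection, sourceClusterIds);
 * replicator.replicate();
 * }</pre>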
 */
@InterfaceAudience.Private
public class HFileReplicator {
  /** Maximum number of threads to allow in pool to copy hfiles during replication */
  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
      "hbase.replication.bulkload.copy.maxthreads";
  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
  /** Number of hfiles to copy per thread during replication */
  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
      "hbase.replication.bulkload.copy.hfiles.perthread";
  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(HFileReplicator.class);
  private static final String UNDERSCORE = "_";
  private static final FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");

  private Configuration sourceClusterConf;
  private String sourceBaseNamespaceDirPath;
  private String sourceHFileArchiveDirPath;
  private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
  private FileSystem sinkFs;
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private Configuration conf;
  private Connection connection;
  private Path hbaseStagingDir;
  private ThreadPoolExecutor exec;
  private int maxCopyThreads;
  private int copiesPerThread;
  private List<String> sourceClusterIds;

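  /**
   * @param sourceClusterConf configuration of the source (peer) cluster, used to open the source
   *          file system
   * @param sourceBaseNamespaceDirPath base namespace directory path on the source cluster
   * @param sourceHFileArchiveDirPath hfile archive directory path on the source cluster, used as a
   *          fallback location when an hfile is no longer found at its original path
   * @param tableQueueMap map of table name to the list of (column family, hfile paths) pairs that
   *          need to be replicated
   * @param conf configuration of this (sink) cluster
   * @param connection connection to this (sink) cluster
   * @param sourceClusterIds source cluster ids handed on to {@link LoadIncrementalHFiles} for the
   *          bulk load
   * @throws IOException if the sink file system or the cluster root directory cannot be obtained
   */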
  public HFileReplicator(Configuration sourceClusterConf,
      String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
      Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
      Connection connection, List<String> sourceClusterIds) throws IOException {
    this.sourceClusterConf = sourceClusterConf;
    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
    this.bulkLoadHFileMap = tableQueueMap;
    this.conf = conf;
    this.connection = connection;
    this.sourceClusterIds = sourceClusterIds;

    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    this.hbaseStagingDir = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
    this.maxCopyThreads =
        this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
          REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("HFileReplicationCallable-%1$d").build());
    this.copiesPerThread =
        conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
          REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

    sinkFs = FileSystem.get(conf);
  }

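  /**
   * Copies the hfiles of every queued table to a staging directory on this cluster and then bulk
   * loads them into the corresponding table.
   * @return always {@code null}
   * @throws IOException if copying or bulk loading fails
   */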
  public Void replicate() throws IOException {
    // Copy all the hfiles to staging directories on this cluster's file system
    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();

    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);

    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
      String tableNameString = tableStagingDir.getKey();
      Path stagingDir = tableStagingDir.getValue();

      LoadIncrementalHFiles loadHFiles = null;
      try {
        loadHFiles = new LoadIncrementalHFiles(conf);
        loadHFiles.setClusterIds(sourceClusterIds);
      } catch (Exception e) {
        LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
            + " data.", e);
        throw new IOException(e);
      }
      Configuration newConf = HBaseConfiguration.create(conf);
      newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
      loadHFiles.setConf(newConf);

      TableName tableName = TableName.valueOf(tableNameString);
      Table table = this.connection.getTable(tableName);

      // Prepare the queue of hfiles to be loaded (replicated)
      Deque<LoadQueueItem> queue = new LinkedList<>();
      loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);

      if (queue.isEmpty()) {
        LOG.warn("Replication process did not find any files to replicate in directory "
            + stagingDir.toUri());
        return null;
      }

      try (RegionLocator locator = connection.getRegionLocator(tableName)) {

        fsDelegationToken.acquireDelegationToken(sinkFs);

        // Set the staging directory which will be used by LoadIncrementalHFiles for loading the
        // data
        loadHFiles.setBulkToken(stagingDir.toString());

        doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
      } finally {
        cleanup(stagingDir.toString(), table);
      }
    }
    return null;
  }

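  /**
   * Bulk loads the queued hfiles into the table, retrying until the queue is drained or
   * {@code maxRetries} attempts have been made. The region start/end keys are re-fetched on every
   * attempt since regions may have split between attempts.
   */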
  private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table,
      Deque<LoadQueueItem> queue, RegionLocator locator, int maxRetries) throws IOException {
    int count = 0;
    Pair<byte[][], byte[][]> startEndKeys;
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      startEndKeys = locator.getStartEndKeys();
      if (count != 0) {
        LOG.warn("Error occurred while replicating HFiles, retry attempt " + count + " with "
            + queue.size() + " files still remaining to replicate.");
      }

      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException("Retry attempted " + count
            + " times without completing, bailing out.");
      }
      count++;

      // Try bulk load
      loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
    }
  }

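  /**
   * Releases the file system delegation token, deletes the staging directory of the table and
   * closes the table. The sink file system is intentionally left open.
   */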
  private void cleanup(String stagingDir, Table table) {
    // Release the file system delegation token
    fsDelegationToken.releaseDelegationToken();
    // Delete the staging directory
    if (stagingDir != null) {
      try {
        sinkFs.delete(new Path(stagingDir), true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
      }
    }
    // Do not close the sink file system; it comes from the shared FileSystem cache

    // Close the table
    if (table != null) {
      try {
        table.close();
      } catch (IOException e) {
        LOG.warn("Failed to close the table.", e);
      }
    }
  }

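  /**
   * Copies all the hfiles listed in {@code bulkLoadHFileMap} from the source cluster file system
   * into a newly created, per-table staging directory on the sink file system, using the copy
   * thread pool.
   * @return map of table name to the staging directory holding the copied hfiles of that table
   */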
  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
    Map<String, Path> mapOfCopiedHFiles = new HashMap<>();
    Pair<byte[], List<String>> familyHFilePathsPair;
    List<String> hfilePaths;
    byte[] family;
    Path familyStagingDir;
    int familyHFilePathsPairsListSize;
    int totalNoOfHFiles;
    List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
    FileSystem sourceFs = null;

    try {
      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
      /*
       * Path#getFileSystem gets the FS from the cache by default. If the source and sink clusters
       * have the same FS name service, the cached FS would be returned instead of one built from
       * the source cluster configuration. To avoid this we explicitly disable loading the FS from
       * the cache, so that a new FS is created with the source cluster configuration.
       */
      String sourceScheme = sourceClusterPath.toUri().getScheme();
      String disableCacheName = String.format("fs.%s.impl.disable.cache", sourceScheme);
      sourceClusterConf.setBoolean(disableCacheName, true);

      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);

      User user = userProvider.getCurrent();
      // For each table name in the map
      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
          .entrySet()) {
        String tableName = tableEntry.getKey();

        // Create staging directory for each table
        Path stagingDir =
            createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName));

        familyHFilePathsPairsList = tableEntry.getValue();
        familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();

        // For each (family, list of hfile paths) pair of the table
        for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
          familyHFilePathsPair = familyHFilePathsPairsList.get(i);

          family = familyHFilePathsPair.getFirst();
          hfilePaths = familyHFilePathsPair.getSecond();

          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
          totalNoOfHFiles = hfilePaths.size();

          // Submit parallel copy tasks for the hfiles of this family
          List<Future<Void>> futures = new ArrayList<>();
          Callable<Void> c;
          Future<Void> future;
          int currentCopied = 0;
          // Copy the hfiles in parallel, copiesPerThread hfiles per task
          while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
            c =
                new Copier(sourceFs, familyStagingDir, hfilePaths.subList(currentCopied,
                  currentCopied + this.copiesPerThread));
            future = exec.submit(c);
            futures.add(future);
            currentCopied += this.copiesPerThread;
          }

          int remaining = totalNoOfHFiles - currentCopied;
          if (remaining > 0) {
            c =
                new Copier(sourceFs, familyStagingDir, hfilePaths.subList(currentCopied,
                  currentCopied + remaining));
            future = exec.submit(c);
            futures.add(future);
          }

          for (Future<Void> f : futures) {
            try {
              f.get();
            } catch (InterruptedException e) {
              InterruptedIOException iioe =
                  new InterruptedIOException(
                      "Failed to copy HFiles to local file system. This will be retried by "
                          + "the source cluster.");
              iioe.initCause(e);
              throw iioe;
            } catch (ExecutionException e) {
              throw new IOException("Failed to copy HFiles to local file system. This will "
                  + "be retried by the source cluster.", e);
            }
          }
        }
        // Map the table to its staging directory, which contains all the hfiles belonging to
        // this table
        mapOfCopiedHFiles.put(tableName, stagingDir);
      }
      return mapOfCopiedHFiles;
    } finally {
      if (sourceFs != null) {
        sourceFs.close();
      }
      if (exec != null) {
        exec.shutdown();
      }
    }
  }

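  /**
   * Creates a staging directory for the table under the base bulk load staging directory. The
   * directory name has the form {@code user__tableName__randomSuffix}, where the random suffix
   * keeps the directory unique.
   */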
  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
    int RANDOM_WIDTH = 320;
    int RANDOM_RADIX = 32;
    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
        + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX));
    return createStagingDir(baseDir, user, randomDir);
  }

  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
    Path p = new Path(baseDir, randomDir);
    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
    sinkFs.setPermission(p, PERM_ALL_ACCESS);
    return p;
  }

  /**
   * Copies the given hfiles from the given source file system to the given staging directory on
   * the sink file system.
   */
  private class Copier implements Callable<Void> {
    private FileSystem sourceFs;
    private Path stagingDir;
    private List<String> hfiles;

    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
        throws IOException {
      this.sourceFs = sourceFs;
      this.stagingDir = stagingDir;
      this.hfiles = hfiles;
    }

    @Override
    public Void call() throws IOException {
      Path sourceHFilePath;
      Path localHFilePath;
      int totalHFiles = hfiles.size();
      for (int i = 0; i < totalHFiles; i++) {
        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
        try {
          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          // Any exception other than FNFE fails the replication request, and the source will
          // retry to replicate this data.
        } catch (FileNotFoundException e) {
          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
              + ". Trying to copy from hfile archive directory.",
            e);
          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));

          try {
            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          } catch (FileNotFoundException e1) {
            // This means the hfile does not exist anywhere in the source cluster FS, so we
            // cannot do anything here; just log and continue.
            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
                + ". Hence ignoring this hfile from replication.",
              e1);
            continue;
          }
        }
        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
      }
      return null;
    }
  }
}