/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Replicates HFile entries. It first copies all the hfiles in parallel to a local staging
 * directory and then uses {@link BulkLoadHFiles} to prepare a collection of
 * {@link LoadQueueItem} which is finally loaded (replicated) into the table of this cluster.
 * Call {@link #close()} when done.
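 * <p>
 * A minimal usage sketch. The values {@code peerConf}, {@code baseNamespaceDir},
 * {@code hfileArchiveDir}, {@code tableQueueMap}, {@code localConf}, {@code connection} and
 * {@code sourceClusterIds} are hypothetical placeholders; in HBase this class is normally driven
 * by the replication sink rather than called directly:
 *
 * <pre>{@code
 * try (HFileReplicator replicator = new HFileReplicator(peerConf, baseNamespaceDir,
 *     hfileArchiveDir, tableQueueMap, localConf, connection, sourceClusterIds)) {
 *   replicator.replicate();
 * }
 * }</pre>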
 */
@InterfaceAudience.Private
public class HFileReplicator implements Closeable {
  /** Maximum number of threads to allow in pool to copy hfiles during replication */
  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
      "hbase.replication.bulkload.copy.maxthreads";
  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
  /** Number of hfiles to copy per thread during replication */
  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
      "hbase.replication.bulkload.copy.hfiles.perthread";
  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(HFileReplicator.class);
  private static final String UNDERSCORE = "_";
  private static final FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");

  private Configuration sourceClusterConf;
  private String sourceBaseNamespaceDirPath;
  private String sourceHFileArchiveDirPath;
  private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
  private FileSystem sinkFs;
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private Configuration conf;
  private AsyncClusterConnection connection;
  private Path hbaseStagingDir;
  private ThreadPoolExecutor exec;
  private int maxCopyThreads;
  private int copiesPerThread;
  private List<String> sourceClusterIds;

  public HFileReplicator(Configuration sourceClusterConf,
      String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
      Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
      AsyncClusterConnection connection, List<String> sourceClusterIds) throws IOException {
    this.sourceClusterConf = sourceClusterConf;
    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
    this.bulkLoadHFileMap = tableQueueMap;
    this.conf = conf;
    this.connection = connection;
    this.sourceClusterIds = sourceClusterIds;

    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    this.hbaseStagingDir =
      new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
    this.maxCopyThreads = this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
      REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat("HFileReplicationCopier-%1$d-" + this.sourceBaseNamespaceDirPath)
        .build());
    this.copiesPerThread = conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
      REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

    sinkFs = FileSystem.get(conf);
  }

  @Override
  public void close() throws IOException {
    if (this.exec != null) {
      this.exec.shutdown();
    }
  }

  public Void replicate() throws IOException {
    // Copy all the hfiles to the local file system
    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();

    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);

    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
      String tableNameString = tableStagingDir.getKey();
      Path stagingDir = tableStagingDir.getValue();
      TableName tableName = TableName.valueOf(tableNameString);

      // Prepare the queue of hfiles to be loaded (replicated)
      Deque<LoadQueueItem> queue = new LinkedList<>();
      BulkLoadHFilesTool.prepareHFileQueue(conf, connection, tableName, stagingDir, queue, false,
        false);

      if (queue.isEmpty()) {
        LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri());
        return null;
      }
      fsDelegationToken.acquireDelegationToken(sinkFs);
      try {
        doBulkLoad(conf, tableName, stagingDir, queue, maxRetries);
      } finally {
        cleanup(stagingDir);
      }
    }
    return null;
  }

  private void doBulkLoad(Configuration conf, TableName tableName, Path stagingDir,
      Deque<LoadQueueItem> queue, int maxRetries) throws IOException {
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
    // Set the staging directory which will be used by BulkLoadHFilesTool for loading the data
    loader.setBulkToken(stagingDir.toString());
    // Update the list of cluster ids where this bulk load event has already been processed
    loader.setClusterIds(sourceClusterIds);
    for (int count = 0; !queue.isEmpty(); count++) {
      if (count != 0) {
        LOG.warn("Error replicating HFiles; retry={} with {} remaining.", count, queue.size());
      }

      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException("Retried " + count + " times without completing, bailing out.");
      }

      // Try bulk load
      loader.loadHFileQueue(connection, tableName, queue, false);
    }
  }

  private void cleanup(Path stagingDir) {
    // Release the file system delegation token
    fsDelegationToken.releaseDelegationToken();
    // Delete the staging directory
    if (stagingDir != null) {
      try {
        sinkFs.delete(stagingDir, true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
      }
    }
    // Do not close the file system
  }

  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
    Map<String, Path> mapOfCopiedHFiles = new HashMap<>();
    Pair<byte[], List<String>> familyHFilePathsPair;
    List<String> hfilePaths;
    byte[] family;
    Path familyStagingDir;
    int familyHFilePathsPairsListSize;
    int totalNoOfHFiles;
    List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
    FileSystem sourceFs = null;

    try {
      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
      /*
       * Path#getFileSystem will by default get the FS from cache. If both the source and sink
       * clusters use the same FS name service, the cache would hand back an FS created with this
       * cluster's configuration rather than the source cluster's. To avoid this we explicitly
       * disable loading the FS from cache, so that a new FS is created with the source cluster
       * configuration.
       */
      String sourceScheme = sourceClusterPath.toUri().getScheme();
      String disableCacheName = String.format("fs.%s.impl.disable.cache", sourceScheme);
      sourceClusterConf.setBoolean(disableCacheName, true);

      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);

      User user = userProvider.getCurrent();
      // For each table name in the map
      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
          .entrySet()) {
        String tableName = tableEntry.getKey();

        // Create a staging directory for each table
        Path stagingDir = createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName));

        familyHFilePathsPairsList = tableEntry.getValue();
        familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();

        // For each list of family hfile paths pair in the table
        for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
          familyHFilePathsPair = familyHFilePathsPairsList.get(i);

          family = familyHFilePathsPair.getFirst();
          hfilePaths = familyHFilePathsPair.getSecond();

          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
          totalNoOfHFiles = hfilePaths.size();

          // For each list of hfile paths for the family
          List<Future<Void>> futures = new ArrayList<>();
          Callable<Void> c;
          Future<Void> future;
          int currentCopied = 0;
          // Copy the hfiles in parallel
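          // Submit full chunks of copiesPerThread paths to the pool; any remainder is submitted
          // as one final, smaller chunk below.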
          while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
            c = new Copier(sourceFs, familyStagingDir,
              hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread));
            future = exec.submit(c);
            futures.add(future);
            currentCopied += this.copiesPerThread;
          }

          int remaining = totalNoOfHFiles - currentCopied;
          if (remaining > 0) {
            c = new Copier(sourceFs, familyStagingDir,
              hfilePaths.subList(currentCopied, currentCopied + remaining));
            future = exec.submit(c);
            futures.add(future);
          }

          for (Future<Void> f : futures) {
            try {
              f.get();
            } catch (InterruptedException e) {
              InterruptedIOException iioe = new InterruptedIOException(
                "Failed to copy HFiles to local file system. This will be retried again "
                  + "by the source cluster.");
              iioe.initCause(e);
              throw iioe;
            } catch (ExecutionException e) {
              throw new IOException("Failed to copy HFiles to local file system. This will "
                + "be retried again by the source cluster.", e);
            }
          }
        }
        // Add the staging directory to this table. Staging directory contains all the hfiles
        // belonging to this table
        mapOfCopiedHFiles.put(tableName, stagingDir);
      }
      return mapOfCopiedHFiles;
    } finally {
      if (sourceFs != null) {
        sourceFs.close();
      }
      if (exec != null) {
        exec.shutdown();
      }
    }
  }

  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
    int RANDOM_WIDTH = 320;
    int RANDOM_RADIX = 32;
    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
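    // Build a unique directory name of the form <user>__<table>__<random 320-bit, base-32 token>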
    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
        + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX));
    return createStagingDir(baseDir, user, randomDir);
  }

  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
    Path p = new Path(baseDir, randomDir);
    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
    sinkFs.setPermission(p, PERM_ALL_ACCESS);
    return p;
  }

  /**
   * Copies the given hfiles from the given source file system to the staging directory on the
   * sink file system.
   */
  private class Copier implements Callable<Void> {
    private FileSystem sourceFs;
    private Path stagingDir;
    private List<String> hfiles;

    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
        throws IOException {
      this.sourceFs = sourceFs;
      this.stagingDir = stagingDir;
      this.hfiles = hfiles;
    }

    @Override
    public Void call() throws IOException {
      Path sourceHFilePath;
      Path localHFilePath;
      int totalHFiles = hfiles.size();
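      // Try the base namespace location first; if the hfile is no longer there, fall back to the
      // source cluster's hfile archive directory.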
      for (int i = 0; i < totalHFiles; i++) {
        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
        try {
          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          // Any exception other than FNFE fails the replication request, and the source will
          // retry replicating this data.
        } catch (FileNotFoundException e) {
          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
              + ". Trying to copy from hfile archive directory.",
            e);
          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));

          try {
            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          } catch (FileNotFoundException e1) {
            // This means the hfile does not exist anywhere in the source cluster FS, so there is
            // nothing to do here but log and continue.
            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
                + ". Hence skipping this hfile for replication.",
              e1);
            continue;
          }
        }
        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
      }
      return null;
    }
  }
}