/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Used for replicating bulk loaded HFile entries. It first copies all the hfiles in parallel to a
 * staging directory local to this (sink) cluster, and then uses {@link LoadIncrementalHFiles} to
 * prepare a collection of {@link LoadQueueItem}s which are finally loaded (replicated) into the
 * corresponding table of this cluster. Call {@link #close()} when done.
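 * <p>
 * A minimal usage sketch; the variable names below are illustrative and not taken from any real
 * caller:
 *
 * <pre>
 * try (HFileReplicator replicator = new HFileReplicator(peerConf, peerBaseNamespaceDir,
 *   peerHFileArchiveDir, bulkLoadHFileMap, sinkConf, sinkConnection, sourceClusterIds)) {
 *   replicator.replicate();
 * }
 * </pre>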
 */
@InterfaceAudience.Private
public class HFileReplicator implements Closeable {
  /** Maximum number of threads to allow in pool to copy hfiles during replication */
  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
    "hbase.replication.bulkload.copy.maxthreads";
  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
  /** Number of hfiles to copy per thread during replication */
  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
    "hbase.replication.bulkload.copy.hfiles.perthread";
  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(HFileReplicator.class);
  private static final String UNDERSCORE = "_";
  private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");

  private Configuration sourceClusterConf;
  private String sourceBaseNamespaceDirPath;
  private String sourceHFileArchiveDirPath;
  private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
  private FileSystem sinkFs;
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private Configuration conf;
  private Connection connection;
  private Path hbaseStagingDir;
  private ThreadPoolExecutor exec;
  private int maxCopyThreads;
  private int copiesPerThread;
  private List<String> sourceClusterIds;

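  /**
   * @param sourceClusterConf          configuration used to access the source (peer) cluster file
   *                                   system
   * @param sourceBaseNamespaceDirPath base namespace directory of the source cluster
   * @param sourceHFileArchiveDirPath  hfile archive directory of the source cluster
   * @param tableQueueMap              map of table name to the list of (family, hfile paths)
   *                                   pairs to be replicated
   * @param conf                       configuration of this (sink) cluster
   * @param connection                 connection to this (sink) cluster
   * @param sourceClusterIds           cluster ids to be set on {@link LoadIncrementalHFiles} for
   *                                   the bulk load
   */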
  public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath, Map<String, List<Pair<byte[], List<String>>>> tableQueueMap,
    Configuration conf, Connection connection, List<String> sourceClusterIds) throws IOException {
    this.sourceClusterConf = sourceClusterConf;
    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
    this.bulkLoadHFileMap = tableQueueMap;
    this.conf = conf;
    this.connection = connection;
    this.sourceClusterIds = sourceClusterIds;

    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    this.hbaseStagingDir =
      new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
    this.maxCopyThreads = this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
      REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat("HFileReplicationCopier-%1$d-" + this.sourceBaseNamespaceDirPath).build());
    this.copiesPerThread = conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
      REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

    sinkFs = FileSystem.get(conf);
  }

  @Override
  public void close() throws IOException {
    if (this.exec != null) {
      this.exec.shutdown();
    }
  }

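  /**
   * Copies the hfiles described by the bulk load map to a per-table staging directory on this
   * cluster and then bulk loads (replicates) them into the corresponding tables.
   * @return always {@code null}
   * @throws IOException if copying the hfiles or bulk loading them fails
   */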
  public Void replicate() throws IOException {
    // Copy all the hfiles to the local file system
    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();

    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);

    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
      String tableNameString = tableStagingDir.getKey();
      Path stagingDir = tableStagingDir.getValue();

      LoadIncrementalHFiles loadHFiles = null;
      try {
        loadHFiles = new LoadIncrementalHFiles(conf);
        loadHFiles.setClusterIds(sourceClusterIds);
      } catch (Exception e) {
        LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded data.",
          e);
        throw new IOException(e);
      }
      Configuration newConf = HBaseConfiguration.create(conf);
      newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
      loadHFiles.setConf(newConf);

      TableName tableName = TableName.valueOf(tableNameString);
      Table table = this.connection.getTable(tableName);

      // Prepare the queue of hfiles to be loaded (replicated)
      Deque<LoadQueueItem> queue = new LinkedList<>();
      loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);

      if (queue.isEmpty()) {
        LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri());
        return null;
      }

      try (RegionLocator locator = connection.getRegionLocator(tableName)) {
        fsDelegationToken.acquireDelegationToken(sinkFs);
        // Set the staging directory to be used by LoadIncrementalHFiles when loading the data
        loadHFiles.setBulkToken(stagingDir.toString());
        doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
      } finally {
        cleanup(stagingDir.toString(), table);
      }
    }
    return null;
  }

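  /**
   * Drains the given queue by repeatedly calling
   * {@link LoadIncrementalHFiles#loadHFileQueue}, re-reading the table's start/end keys on each
   * attempt since regions may have split in the meantime. Gives up with an IOException after
   * {@code maxRetries} attempts (0 means retry indefinitely).
   */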
  private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table, Deque<LoadQueueItem> queue,
    RegionLocator locator, int maxRetries) throws IOException {
    int count = 0;
    Pair<byte[][], byte[][]> startEndKeys;
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      startEndKeys = locator.getStartEndKeys();
      if (count != 0) {
        LOG.warn("Error replicating HFiles; retry={} with {} remaining.", count, queue.size());
      }

      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException("Retry attempted " + count + " times without completing, bailing.");
      }
      count++;

      // Try bulk load
      loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
    }
  }

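  /**
   * Releases the file system delegation token, deletes the staging directory (the sink file
   * system itself is intentionally not closed) and closes the table.
   */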
  private void cleanup(String stagingDir, Table table) {
    // Release the file system delegation token
    fsDelegationToken.releaseDelegationToken();
    // Delete the staging directory
    if (stagingDir != null) {
      try {
        sinkFs.delete(new Path(stagingDir), true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
      }
    }
    // Do not close the file system

    /*
     * if (sinkFs != null) { try { sinkFs.close(); } catch (IOException e) { LOG.warn(
     * "Failed to close the file system"); } }
     */

    // Close the table
    if (table != null) {
      try {
        table.close();
      } catch (IOException e) {
        LOG.warn("Failed to close the table.", e);
      }
    }
  }

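  /**
   * Copies all the hfiles from the source cluster to per-table staging directories on this
   * cluster, splitting the work for each column family across {@link Copier} tasks of at most
   * {@code copiesPerThread} files each.
   * @return a map from table name to the staging directory holding that table's copied hfiles
   */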
  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
    Map<String, Path> mapOfCopiedHFiles = new HashMap<>();
    Pair<byte[], List<String>> familyHFilePathsPair;
    List<String> hfilePaths;
    byte[] family;
    Path familyStagingDir;
    int familyHFilePathsPairsListSize;
    int totalNoOfHFiles;
    List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
    FileSystem sourceFs = null;

    try {
      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
      /*
       * Path#getFileSystem will by default get the FS from cache. If both the source and sink
       * clusters use the same FS name service, the cached FS (created with this cluster's
       * configuration) would be returned instead of one for the source cluster. To avoid this we
       * explicitly disable loading the FS from the cache, so that a new FS is created with the
       * source cluster configuration.
       */
      String sourceScheme = sourceClusterPath.toUri().getScheme();
      String disableCacheName =
        String.format("fs.%s.impl.disable.cache", new Object[] { sourceScheme });
      sourceClusterConf.setBoolean(disableCacheName, true);

      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);

      User user = userProvider.getCurrent();
      // For each table name in the map
      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
        .entrySet()) {
        String tableName = tableEntry.getKey();

        // Create staging directory for each table
        Path stagingDir = createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName));

        familyHFilePathsPairsList = tableEntry.getValue();
        familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();

        // For each (family, list of hfile paths) pair in the table
        for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
          familyHFilePathsPair = familyHFilePathsPairsList.get(i);

          family = familyHFilePathsPair.getFirst();
          hfilePaths = familyHFilePathsPair.getSecond();

          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
          totalNoOfHFiles = hfilePaths.size();

          // For each list of hfile paths for the family
          List<Future<Void>> futures = new ArrayList<>();
          Callable<Void> c;
          Future<Void> future;
          int currentCopied = 0;
          // Copy the hfiles in parallel
          while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
            c = new Copier(sourceFs, familyStagingDir,
              hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread));
            future = exec.submit(c);
            futures.add(future);
            currentCopied += this.copiesPerThread;
          }

          int remaining = totalNoOfHFiles - currentCopied;
          if (remaining > 0) {
            c = new Copier(sourceFs, familyStagingDir,
              hfilePaths.subList(currentCopied, currentCopied + remaining));
            future = exec.submit(c);
            futures.add(future);
          }

          for (Future<Void> f : futures) {
            try {
              f.get();
            } catch (InterruptedException e) {
              InterruptedIOException iioe = new InterruptedIOException(
                "Failed to copy HFiles to local file system. This will be retried again "
                  + "by the source cluster.");
              iioe.initCause(e);
              throw iioe;
            } catch (ExecutionException e) {
              throw new IOException("Failed to copy HFiles to local file system. This will "
                + "be retried again by the source cluster.", e);
            }
          }
        }
        // Record the staging directory for this table; it contains all the hfiles belonging to
        // this table.
        mapOfCopiedHFiles.put(tableName, stagingDir);
      }
      return mapOfCopiedHFiles;
    } finally {
      if (sourceFs != null) {
        sourceFs.close();
      }
      if (exec != null) {
        exec.shutdown();
      }
    }
  }

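  /**
   * Builds a random staging sub-directory name of the form
   * {@code <user>__<table>__<random 320-bit number in base 32>} and creates it under the base
   * staging directory.
   */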
  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
    int RANDOM_WIDTH = 320;
    int RANDOM_RADIX = 32;
    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
      + (new BigInteger(RANDOM_WIDTH, ThreadLocalRandom.current()).toString(RANDOM_RADIX));
    return createStagingDir(baseDir, user, randomDir);
  }

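  /**
   * Creates {@code randomDir} under {@code baseDir} on the sink file system with full
   * (rwxrwxrwx) permissions.
   */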
  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
    Path p = new Path(baseDir, randomDir);
    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
    sinkFs.setPermission(p, PERM_ALL_ACCESS);
    return p;
  }

  /**
   * Copies the given hfiles from the given source file system to the staging directory on this
   * cluster's file system.
   */
  private class Copier implements Callable<Void> {
    private FileSystem sourceFs;
    private Path stagingDir;
    private List<String> hfiles;

    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
      throws IOException {
      this.sourceFs = sourceFs;
      this.stagingDir = stagingDir;
      this.hfiles = hfiles;
    }

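    /**
     * Copies each hfile to the staging directory on the sink file system. If a file is not found
     * under the source base namespace directory, the copy is retried from the source hfile
     * archive directory; files found in neither location are logged and skipped.
     */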
    @Override
    public Void call() throws IOException {
      Path sourceHFilePath;
      Path localHFilePath;
      int totalHFiles = hfiles.size();
      for (int i = 0; i < totalHFiles; i++) {
        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
        try {
          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          // If any exception other than FNFE is thrown, we fail the replication request and the
          // source cluster will retry to replicate this data.
        } catch (FileNotFoundException e) {
          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
            + ". Trying to copy from hfile archive directory.", e);
          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));

          try {
            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          } catch (FileNotFoundException e1) {
            // This means the hfile does not exist anywhere in the source cluster FS, so there is
            // nothing more we can do here; just log and continue.
            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
              + ". Hence ignoring this hfile from replication.", e1);
            continue;
          }
        }
        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
      }
      return null;
    }
  }
}