/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * It is used for replicating HFile entries. It first copies all the hfiles in parallel to a local
 * staging directory and then uses {@link LoadIncrementalHFiles} to prepare a collection of
 * {@link LoadQueueItem} which will finally be loaded (replicated) into the table of this cluster.
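 * <p>
 * A minimal usage sketch (illustrative only; the source cluster configuration, the source base
 * namespace and archive directory paths, and the per-table map of family/hfile paths are assumed
 * to be supplied by the replication sink that receives the bulk load entries):
 *
 * <pre>
 * HFileReplicator replicator = new HFileReplicator(sourceClusterConf, sourceBaseNamespaceDirPath,
 *     sourceHFileArchiveDirPath, tableQueueMap, sinkClusterConf, sinkConnection);
 * replicator.replicate();
 * </pre>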
 */
@InterfaceAudience.Private
public class HFileReplicator {
  /** Maximum number of threads to allow in pool to copy hfiles during replication */
  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
      "hbase.replication.bulkload.copy.maxthreads";
  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
  /** Number of hfiles to copy per thread during replication */
  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
      "hbase.replication.bulkload.copy.hfiles.perthread";
  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(HFileReplicator.class);
  private static final String UNDERSCORE = "_";
  private static final FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");

  private Configuration sourceClusterConf;
  private String sourceBaseNamespaceDirPath;
  private String sourceHFileArchiveDirPath;
  private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
  private FileSystem sinkFs;
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private Configuration conf;
  private Connection connection;
  private Path hbaseStagingDir;
  private ThreadPoolExecutor exec;
  private int maxCopyThreads;
  private int copiesPerThread;

  public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath, Map<String, List<Pair<byte[], List<String>>>> tableQueueMap,
      Configuration conf, Connection connection) throws IOException {
    this.sourceClusterConf = sourceClusterConf;
    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
    this.bulkLoadHFileMap = tableQueueMap;
    this.conf = conf;
    this.connection = connection;

    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    this.hbaseStagingDir = new Path(FSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
    this.maxCopyThreads = this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
        REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("HFileReplicationCallable-%1$d");
    this.exec = new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(), builder.build());
    this.exec.allowCoreThreadTimeOut(true);
    this.copiesPerThread = conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
        REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

    sinkFs = FileSystem.get(conf);
  }

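  /**
   * Copies all the HFiles listed in the bulk load map to per-table staging directories on this
   * (sink) cluster and then bulk loads them into the corresponding tables, retrying each table's
   * load up to the limit configured by {@link HConstants#BULKLOAD_MAX_RETRIES_NUMBER}.
   */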
  public Void replicate() throws IOException {
    // Copy all the hfiles to the local file system
    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();

    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);

    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
      String tableNameString = tableStagingDir.getKey();
      Path stagingDir = tableStagingDir.getValue();

      LoadIncrementalHFiles loadHFiles = null;
      try {
        loadHFiles = new LoadIncrementalHFiles(conf);
      } catch (Exception e) {
        LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded data.",
            e);
        throw new IOException(e);
      }
      Configuration newConf = HBaseConfiguration.create(conf);
      newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
      loadHFiles.setConf(newConf);

      TableName tableName = TableName.valueOf(tableNameString);
      Table table = this.connection.getTable(tableName);

      // Prepare the queue of hfiles to be loaded (replicated)
      Deque<LoadQueueItem> queue = new LinkedList<>();
      loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);

      if (queue.isEmpty()) {
        LOG.warn("Replication process did not find any files to replicate in directory "
            + stagingDir.toUri());
        return null;
      }

      try (RegionLocator locator = connection.getRegionLocator(tableName)) {
        fsDelegationToken.acquireDelegationToken(sinkFs);

        // Set the staging directory which will be used by LoadIncrementalHFiles for loading the
        // data
        loadHFiles.setBulkToken(stagingDir.toString());

        doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
      } finally {
        cleanup(stagingDir.toString(), table);
      }
    }
    return null;
  }

  private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table, Deque<LoadQueueItem> queue,
      RegionLocator locator, int maxRetries) throws IOException {
    int count = 0;
    Pair<byte[][], byte[][]> startEndKeys;
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
      startEndKeys = locator.getStartEndKeys();
      if (count != 0) {
        LOG.warn("Error occurred while replicating HFiles, retry attempt " + count + " with "
            + queue.size() + " files still remaining to replicate.");
      }

      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException(
            "Retry attempted " + count + " times without completing, bailing out.");
      }
      count++;

      // Try bulk load
      loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
    }
  }

  private void cleanup(String stagingDir, Table table) {
    // Release the file system delegation token
    fsDelegationToken.releaseDelegationToken();
    // Delete the staging directory
    if (stagingDir != null) {
      try {
        sinkFs.delete(new Path(stagingDir), true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
      }
    }
    // Do not close the file system
    /*
     * if (sinkFs != null) {
     *   try {
     *     sinkFs.close();
     *   } catch (IOException e) {
     *     LOG.warn("Failed to close the file system");
     *   }
     * }
     */

    // Close the table
    if (table != null) {
      try {
        table.close();
      } catch (IOException e) {
        LOG.warn("Failed to close the table.", e);
      }
    }
  }

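  /**
   * Copies the HFiles of every table in the bulk load map from the source cluster file system to
   * a newly created, per-table staging directory on the sink cluster, copying the files in
   * parallel batches on the copy thread pool.
   * @return a map from table name to the staging directory holding its copied hfiles
   */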
  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
    Map<String, Path> mapOfCopiedHFiles = new HashMap<>();
    Pair<byte[], List<String>> familyHFilePathsPair;
    List<String> hfilePaths;
    byte[] family;
    Path familyStagingDir;
    int familyHFilePathsPairsListSize;
    int totalNoOfHFiles;
    List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
    FileSystem sourceFs = null;

    try {
      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
      /*
       * Path#getFileSystem will by default get the FS from cache. If both source and sink cluster
       * have the same FS name service then it will return the peer cluster FS. To avoid this we
       * explicitly disable the loading of FS from cache, so that a new FS is created with the
       * source cluster configuration.
       */
      String sourceScheme = sourceClusterPath.toUri().getScheme();
      String disableCacheName = String.format("fs.%s.impl.disable.cache", sourceScheme);
      sourceClusterConf.setBoolean(disableCacheName, true);

      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);

      User user = userProvider.getCurrent();
      // For each table name in the map
      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
          .entrySet()) {
        String tableName = tableEntry.getKey();

        // Create staging directory for each table
        Path stagingDir = createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName));

        familyHFilePathsPairsList = tableEntry.getValue();
        familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();

        // For each list of family hfile paths pair in the table
        for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
          familyHFilePathsPair = familyHFilePathsPairsList.get(i);

          family = familyHFilePathsPair.getFirst();
          hfilePaths = familyHFilePathsPair.getSecond();

          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
          totalNoOfHFiles = hfilePaths.size();

          // For each list of hfile paths for the family
          List<Future<Void>> futures = new ArrayList<>();
          Callable<Void> c;
          Future<Void> future;
          int currentCopied = 0;
          // Copy the hfiles in parallel
          while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
            c = new Copier(sourceFs, familyStagingDir,
                hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread));
            future = exec.submit(c);
            futures.add(future);
            currentCopied += this.copiesPerThread;
          }

          int remaining = totalNoOfHFiles - currentCopied;
          if (remaining > 0) {
            c = new Copier(sourceFs, familyStagingDir,
                hfilePaths.subList(currentCopied, currentCopied + remaining));
            future = exec.submit(c);
            futures.add(future);
          }

          for (Future<Void> f : futures) {
            try {
              f.get();
            } catch (InterruptedException e) {
              InterruptedIOException iioe = new InterruptedIOException(
                  "Failed to copy HFiles to local file system. This will be retried by the "
                      + "source cluster.");
              iioe.initCause(e);
              throw iioe;
            } catch (ExecutionException e) {
              throw new IOException("Failed to copy HFiles to local file system. This will be "
                  + "retried by the source cluster.", e);
            }
          }
        }
        // Add the staging directory for this table. The staging directory contains all the
        // hfiles belonging to this table.
        mapOfCopiedHFiles.put(tableName, stagingDir);
      }
      return mapOfCopiedHFiles;
    } finally {
      if (sourceFs != null) {
        sourceFs.close();
      }
      if (exec != null) {
        exec.shutdown();
      }
    }
  }

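  /**
   * Creates a world-accessible staging directory for the given table under the base bulk load
   * staging directory. The directory name combines the user's short name, the table name (with
   * any namespace delimiter replaced by an underscore) and a random suffix to avoid collisions
   * between concurrent replications.
   */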
  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
    int RANDOM_WIDTH = 320;
    int RANDOM_RADIX = 32;
    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
        + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX));
    return createStagingDir(baseDir, user, randomDir);
  }

  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
    Path p = new Path(baseDir, randomDir);
    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
    sinkFs.setPermission(p, PERM_ALL_ACCESS);
    return p;
  }

  /**
   * This class copies the given hfiles from the given source file system to the given staging
   * directory on the local (sink) file system.
   */
  private class Copier implements Callable<Void> {
    private FileSystem sourceFs;
    private Path stagingDir;
    private List<String> hfiles;

    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
        throws IOException {
      this.sourceFs = sourceFs;
      this.stagingDir = stagingDir;
      this.hfiles = hfiles;
    }

    @Override
    public Void call() throws IOException {
      Path sourceHFilePath;
      Path localHFilePath;
      int totalHFiles = hfiles.size();
      for (int i = 0; i < totalHFiles; i++) {
        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
        try {
          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          // Any exception other than FNFE will fail the replication request and the source will
          // retry to replicate this data.
        } catch (FileNotFoundException e) {
          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
              + ". Trying to copy from hfile archive directory.", e);
          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));

          try {
            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          } catch (FileNotFoundException e1) {
            // This means the hfile does not exist anywhere in the source cluster FS, so we cannot
            // do anything here; just log and continue.
            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
                + ". Hence ignoring this hfile from replication.", e1);
            continue;
          }
        }
        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
      }
      return null;
    }
  }
}