/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * It is used for replicating HFile entries. It will first copy all the hfiles in parallel to a
 * local staging directory and then use {@link BulkLoadHFiles} to prepare a collection of
 * {@link LoadQueueItem} which will finally be loaded (replicated) into the table of this cluster.
 * Call {@link #close()} when done.
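 * <p>
 * A minimal usage sketch (the directory paths, the table-to-hfiles map and the connection below
 * are illustrative placeholders, not values defined by this class):
 *
 * <pre>
 * try (HFileReplicator replicator = new HFileReplicator(peerConf, "/hbase/data",
 *     "/hbase/archive/data", tableQueueMap, conf, connection, sourceClusterIds)) {
 *   replicator.replicate();
 * }
 * </pre>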
 */
@InterfaceAudience.Private
public class HFileReplicator implements Closeable {
  /** Maximum number of threads to allow in pool to copy hfiles during replication */
  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
    "hbase.replication.bulkload.copy.maxthreads";
  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
  /** Number of hfiles to copy per thread during replication */
  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
    "hbase.replication.bulkload.copy.hfiles.perthread";
  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(HFileReplicator.class);
  private static final String UNDERSCORE = "_";
  private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");

  private Configuration sourceClusterConf;
  private String sourceBaseNamespaceDirPath;
  private String sourceHFileArchiveDirPath;
  private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
  private FileSystem sinkFs;
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private Configuration conf;
  private AsyncClusterConnection connection;
  private Path hbaseStagingDir;
  private ThreadPoolExecutor exec;
  private int maxCopyThreads;
  private int copiesPerThread;
  private List<String> sourceClusterIds;

  public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath, Map<String, List<Pair<byte[], List<String>>>> tableQueueMap,
      Configuration conf, AsyncClusterConnection connection, List<String> sourceClusterIds)
      throws IOException {
    this.sourceClusterConf = sourceClusterConf;
    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
    this.bulkLoadHFileMap = tableQueueMap;
    this.conf = conf;
    this.connection = connection;
    this.sourceClusterIds = sourceClusterIds;

    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    this.hbaseStagingDir =
      new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
    this.maxCopyThreads = this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
      REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
    this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat("HFileReplicationCopier-%1$d-" + this.sourceBaseNamespaceDirPath)
        .build());
    this.copiesPerThread = conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
      REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

    sinkFs = FileSystem.get(conf);
  }

  @Override
  public void close() throws IOException {
    if (this.exec != null) {
      this.exec.shutdown();
    }
  }

  public Void replicate() throws IOException {
    // Copy all the hfiles to the local file system
    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();

    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);

    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
      String tableNameString = tableStagingDir.getKey();
      Path stagingDir = tableStagingDir.getValue();
      TableName tableName = TableName.valueOf(tableNameString);

      // Prepare the queue of hfiles to be loaded (replicated)
      Deque<LoadQueueItem> queue = new LinkedList<>();
      BulkLoadHFilesTool.prepareHFileQueue(conf, connection, tableName, stagingDir, queue, false,
        false);

      if (queue.isEmpty()) {
        LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri());
        return null;
      }
      fsDelegationToken.acquireDelegationToken(sinkFs);
      try {
        doBulkLoad(conf, tableName, stagingDir, queue, maxRetries);
      } finally {
        cleanup(stagingDir);
      }
    }
    return null;
  }

  private void doBulkLoad(Configuration conf, TableName tableName, Path stagingDir,
      Deque<LoadQueueItem> queue, int maxRetries) throws IOException {
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
    // Set the staging directory which will be used by BulkLoadHFilesTool for loading the data
    loader.setBulkToken(stagingDir.toString());
    // Update the list of cluster ids where this bulk load event has already been processed
    loader.setClusterIds(sourceClusterIds);
    for (int count = 0; !queue.isEmpty(); count++) {
      if (count != 0) {
        LOG.warn("Error replicating HFiles; retry={} with {} remaining.", count, queue.size());
      }

      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException("Retry attempted " + count + " times without completing, bailing.");
      }

      // Try bulk load; anything still left in the queue after this call is retried on the next
      // iteration, up to maxRetries
      loader.loadHFileQueue(connection, tableName, queue, false);
    }
  }

  private void cleanup(Path stagingDir) {
    // Release the file system delegation token
    fsDelegationToken.releaseDelegationToken();
    // Delete the staging directory
    if (stagingDir != null) {
      try {
        sinkFs.delete(stagingDir, true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
      }
    }
    // Do not close the file system
  }

  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
    Map<String, Path> mapOfCopiedHFiles = new HashMap<>();
    Pair<byte[], List<String>> familyHFilePathsPair;
    List<String> hfilePaths;
    byte[] family;
    Path familyStagingDir;
    int familyHFilePathsPairsListSize;
    int totalNoOfHFiles;
    List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
    FileSystem sourceFs = null;

    try {
      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
      /*
       * Path#getFileSystem will by default get the FS from the cache. If both the source and sink
       * clusters use the same FS name service then it will return the peer cluster FS.
       * To avoid this we explicitly disable loading the FS from the cache, so that a new FS is
       * created with the source cluster configuration.
       */
      String sourceScheme = sourceClusterPath.toUri().getScheme();
      String disableCacheName =
        String.format("fs.%s.impl.disable.cache", new Object[] { sourceScheme });
      sourceClusterConf.setBoolean(disableCacheName, true);

      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);

      User user = userProvider.getCurrent();
      // For each table name in the map
      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
        .entrySet()) {
        String tableName = tableEntry.getKey();

        // Create staging directory for each table
        Path stagingDir = createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName));

        familyHFilePathsPairsList = tableEntry.getValue();
        familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();

        // For each pair of family and hfile paths in the table
        for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
          familyHFilePathsPair = familyHFilePathsPairsList.get(i);

          family = familyHFilePathsPair.getFirst();
          hfilePaths = familyHFilePathsPair.getSecond();

          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
          totalNoOfHFiles = hfilePaths.size();

          // For each list of hfile paths for the family
          List<Future<Void>> futures = new ArrayList<>();
          Callable<Void> c;
          Future<Void> future;
          int currentCopied = 0;
          // Copy the hfiles in parallel
          while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
            c = new Copier(sourceFs, familyStagingDir,
              hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread));
            future = exec.submit(c);
            futures.add(future);
            currentCopied += this.copiesPerThread;
          }

          int remaining = totalNoOfHFiles - currentCopied;
          if (remaining > 0) {
            c = new Copier(sourceFs, familyStagingDir,
              hfilePaths.subList(currentCopied, currentCopied + remaining));
            future = exec.submit(c);
            futures.add(future);
          }

          for (Future<Void> f : futures) {
            try {
              f.get();
            } catch (InterruptedException e) {
              InterruptedIOException iioe = new InterruptedIOException(
                "Failed to copy HFiles to local file system. This will be retried again "
                  + "by the source cluster.");
              iioe.initCause(e);
              throw iioe;
            } catch (ExecutionException e) {
              throw new IOException("Failed to copy HFiles to local file system. This will "
                + "be retried again by the source cluster.", e);
            }
          }
        }
        // Add the staging directory to this table. The staging directory contains all the hfiles
        // belonging to this table.
        mapOfCopiedHFiles.put(tableName, stagingDir);
      }
      return mapOfCopiedHFiles;
    } finally {
      if (sourceFs != null) {
        sourceFs.close();
      }
      if (exec != null) {
        exec.shutdown();
      }
    }
  }

  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
    int RANDOM_WIDTH = 320;
    int RANDOM_RADIX = 32;
    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
      + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX));
    return createStagingDir(baseDir, user, randomDir);
  }

  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
    Path p = new Path(baseDir, randomDir);
    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
    sinkFs.setPermission(p, PERM_ALL_ACCESS);
    return p;
  }

  /**
   * This class copies the given hfiles from the given source file system to the given local file
   * system staging directory.
   */
  private class Copier implements Callable<Void> {
    private FileSystem sourceFs;
    private Path stagingDir;
    private List<String> hfiles;

    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
        throws IOException {
      this.sourceFs = sourceFs;
      this.stagingDir = stagingDir;
      this.hfiles = hfiles;
    }

    @Override
    public Void call() throws IOException {
      Path sourceHFilePath;
      Path localHFilePath;
      int totalHFiles = hfiles.size();
      for (int i = 0; i < totalHFiles; i++) {
        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
        try {
          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          // Any exception other than FNFE will fail the replication request and the source will
          // retry replicating this data.
        } catch (FileNotFoundException e) {
          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
            + ". Trying to copy from hfile archive directory.", e);
          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));

          try {
            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          } catch (FileNotFoundException e1) {
            // This means the hfile does not exist anywhere in the source cluster FS, so we cannot
            // do anything here; just log and continue.
            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
              + ". Hence ignoring this hfile from replication..", e1);
            continue;
          }
        }
        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
      }
      return null;
    }
  }
}