/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
 * for the specific language governing permissions and limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Replicates HFile entries. It first copies all the hfiles in parallel to a staging directory on
 * this (sink) cluster and then uses {@link LoadIncrementalHFiles} to prepare a collection of
 * {@link LoadQueueItem}s which are finally loaded (replicated) into the table of this cluster.
 * Call {@link #close()} when done.
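 * <p>
 * A minimal usage sketch (the argument names below are placeholders for the values that the
 * replication sink supplies from the incoming bulk load entries, not fields of this class):
 *
 * <pre>
 * try (HFileReplicator replicator = new HFileReplicator(sourceClusterConf,
 *     sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, tableQueueMap, sinkConf,
 *     sinkConnection, sourceClusterIds)) {
 *   replicator.replicate();
 * }
 * </pre>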
 */
@InterfaceAudience.Private
public class HFileReplicator implements Closeable {
  /** Maximum number of threads to allow in pool to copy hfiles during replication */
  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
    "hbase.replication.bulkload.copy.maxthreads";
  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
  /** Number of hfiles to copy per thread during replication */
  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
    "hbase.replication.bulkload.copy.hfiles.perthread";
  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;

  private static final Logger LOG = LoggerFactory.getLogger(HFileReplicator.class);
  private static final String UNDERSCORE = "_";
  private static final FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");

  private Configuration sourceClusterConf;
  private String sourceBaseNamespaceDirPath;
  private String sourceHFileArchiveDirPath;
  private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
  private FileSystem sinkFs;
  private FsDelegationToken fsDelegationToken;
  private UserProvider userProvider;
  private Configuration conf;
  private Connection connection;
  private Path hbaseStagingDir;
  private ThreadPoolExecutor exec;
  private int maxCopyThreads;
  private int copiesPerThread;
  private List<String> sourceClusterIds;

  public HFileReplicator(Configuration sourceClusterConf, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath, Map<String, List<Pair<byte[], List<String>>>> tableQueueMap,
      Configuration conf, Connection connection, List<String> sourceClusterIds) throws IOException {
    this.sourceClusterConf = sourceClusterConf;
    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
    this.bulkLoadHFileMap = tableQueueMap;
    this.conf = conf;
    this.connection = connection;
    this.sourceClusterIds = sourceClusterIds;

    userProvider = UserProvider.instantiate(conf);
    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
    this.hbaseStagingDir =
      new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
    this.maxCopyThreads = this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
      REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
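    // Copier tasks run on a bounded cached pool; idle copier threads are reclaimed after 60
    // seconds.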
    this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat("HFileReplicationCopier-%1$d-" + this.sourceBaseNamespaceDirPath).build());
    this.copiesPerThread = conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
      REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);

    sinkFs = FileSystem.get(conf);
  }

  @Override
  public void close() throws IOException {
    if (this.exec != null) {
      this.exec.shutdown();
    }
  }

  public Void replicate() throws IOException {
    // Copy all the hfiles to staging directories on the sink cluster file system
    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();

    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);

    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
      String tableNameString = tableStagingDir.getKey();
      Path stagingDir = tableStagingDir.getValue();

      LoadIncrementalHFiles loadHFiles = null;
      try {
        loadHFiles = new LoadIncrementalHFiles(conf);
        loadHFiles.setClusterIds(sourceClusterIds);
      } catch (Exception e) {
        LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded data.",
          e);
        throw new IOException(e);
      }
      Configuration newConf = HBaseConfiguration.create(conf);
      newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
      loadHFiles.setConf(newConf);

      TableName tableName = TableName.valueOf(tableNameString);
      Table table = this.connection.getTable(tableName);

      // Prepare the queue of hfiles to be loaded (replicated)
      Deque<LoadQueueItem> queue = new LinkedList<>();
      loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);

      if (queue.isEmpty()) {
        LOG.warn("Did not find any files to replicate in directory {}", stagingDir.toUri());
        return null;
      }

      try (RegionLocator locator = connection.getRegionLocator(tableName)) {
        fsDelegationToken.acquireDelegationToken(sinkFs);
        // Set the staging directory which will be used by LoadIncrementalHFiles for loading the
        // data
        loadHFiles.setBulkToken(stagingDir.toString());
        doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
      } finally {
        cleanup(stagingDir.toString(), table);
      }
    }
    return null;
  }

  private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table, Deque<LoadQueueItem> queue,
      RegionLocator locator, int maxRetries) throws IOException {
    int count = 0;
    Pair<byte[][], byte[][]> startEndKeys;
    while (!queue.isEmpty()) {
      // need to reload split keys each iteration.
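      // Regions may have split since the previous attempt, and loadHFileQueue groups the
      // remaining hfiles by the current region boundaries.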
      startEndKeys = locator.getStartEndKeys();
      if (count != 0) {
        LOG.warn("Error replicating HFiles; retry={} with {} remaining.", count, queue.size());
      }

      if (maxRetries != 0 && count >= maxRetries) {
        throw new IOException("Retry attempted " + count + " times without completing, bailing.");
      }
      count++;

      // Try bulk load
      loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
    }
  }

  private void cleanup(String stagingDir, Table table) {
    // Release the file system delegation token
    fsDelegationToken.releaseDelegationToken();
    // Delete the staging directory
    if (stagingDir != null) {
      try {
        sinkFs.delete(new Path(stagingDir), true);
      } catch (IOException e) {
        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
      }
    }
    // Do not close the file system

    /*
     * if (sinkFs != null) { try { sinkFs.close(); } catch (IOException e) { LOG.warn(
     * "Failed to close the file system"); } }
     */

    // Close the table
    if (table != null) {
      try {
        table.close();
      } catch (IOException e) {
        LOG.warn("Failed to close the table.", e);
      }
    }
  }

  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
    Map<String, Path> mapOfCopiedHFiles = new HashMap<>();
    Pair<byte[], List<String>> familyHFilePathsPair;
    List<String> hfilePaths;
    byte[] family;
    Path familyStagingDir;
    int familyHFilePathsPairsListSize;
    int totalNoOfHFiles;
    List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
    FileSystem sourceFs = null;

    try {
      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
      /*
       * Path#getFileSystem will by default get the FS from the cache. If both the source and sink
       * clusters have the same FS name service, it would return the peer cluster's FS. To avoid
       * this we explicitly disable loading the FS from the cache, so that a new FS is created
       * with the source cluster configuration.
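       * For example, for an hdfs:// source path the key set below is fs.hdfs.impl.disable.cache.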
       */
      String sourceScheme = sourceClusterPath.toUri().getScheme();
      String disableCacheName =
        String.format("fs.%s.impl.disable.cache", new Object[] { sourceScheme });
      sourceClusterConf.setBoolean(disableCacheName, true);

      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);

      User user = userProvider.getCurrent();
      // For each table name in the map
      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
        .entrySet()) {
        String tableName = tableEntry.getKey();

        // Create staging directory for each table
        Path stagingDir = createStagingDir(hbaseStagingDir, user, TableName.valueOf(tableName));

        familyHFilePathsPairsList = tableEntry.getValue();
        familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();

        // For each (family, hfile paths) pair in the table
        for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
          familyHFilePathsPair = familyHFilePathsPairsList.get(i);

          family = familyHFilePathsPair.getFirst();
          hfilePaths = familyHFilePathsPair.getSecond();

          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
          totalNoOfHFiles = hfilePaths.size();

          // Submit copier tasks for this family's hfile paths
          List<Future<Void>> futures = new ArrayList<>();
          Callable<Void> c;
          Future<Void> future;
          int currentCopied = 0;
          // Copy the hfiles in parallel, copiesPerThread files per copier task
          while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
            c = new Copier(sourceFs, familyStagingDir,
              hfilePaths.subList(currentCopied, currentCopied + this.copiesPerThread));
            future = exec.submit(c);
            futures.add(future);
            currentCopied += this.copiesPerThread;
          }

          int remaining = totalNoOfHFiles - currentCopied;
          if (remaining > 0) {
            c = new Copier(sourceFs, familyStagingDir,
              hfilePaths.subList(currentCopied, currentCopied + remaining));
            future = exec.submit(c);
            futures.add(future);
          }

          for (Future<Void> f : futures) {
            try {
              f.get();
            } catch (InterruptedException e) {
              InterruptedIOException iioe = new InterruptedIOException(
                "Failed to copy HFiles to local file system. This will be retried by the source "
                  + "cluster.");
              iioe.initCause(e);
              throw iioe;
            } catch (ExecutionException e) {
              throw new IOException("Failed to copy HFiles to local file system. This will be "
                + "retried by the source cluster.", e);
            }
          }
        }
        // Add the staging directory for this table. The staging directory contains all the hfiles
        // belonging to this table.
        mapOfCopiedHFiles.put(tableName, stagingDir);
      }
      return mapOfCopiedHFiles;
    } finally {
      if (sourceFs != null) {
        sourceFs.close();
      }
      if (exec != null) {
        exec.shutdown();
      }
    }
  }

  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
    int RANDOM_WIDTH = 320;
    int RANDOM_RADIX = 32;
    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
      + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX));
    return createStagingDir(baseDir, user, randomDir);
  }

  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
    Path p = new Path(baseDir, randomDir);
    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
    sinkFs.setPermission(p, PERM_ALL_ACCESS);
    return p;
  }

  /**
   * Copies the given hfiles from the given source file system to the given staging directory on
   * the sink file system.
   */
  private class Copier implements Callable<Void> {
    private FileSystem sourceFs;
    private Path stagingDir;
    private List<String> hfiles;

    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
        throws IOException {
      this.sourceFs = sourceFs;
      this.stagingDir = stagingDir;
      this.hfiles = hfiles;
    }

    @Override
    public Void call() throws IOException {
      Path sourceHFilePath;
      Path localHFilePath;
      int totalHFiles = hfiles.size();
      for (int i = 0; i < totalHFiles; i++) {
        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
        try {
          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          // Any exception other than FNFE will fail the replication request, and the source will
          // retry replicating this data.
        } catch (FileNotFoundException e) {
          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
            + ". Trying to copy from hfile archive directory.", e);
          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));

          try {
            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
          } catch (FileNotFoundException e1) {
            // This means the hfile does not exist anywhere in the source cluster FS, so we cannot
            // do anything here; just log and continue.
            LOG.debug("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
              + ". Hence skipping this hfile for replication.", e1);
            continue;
          }
        }
        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
      }
      return null;
    }
  }
}