001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.snapshot; 020 021import java.io.BufferedInputStream; 022import java.io.DataInput; 023import java.io.DataOutput; 024import java.io.FileNotFoundException; 025import java.io.IOException; 026import java.io.InputStream; 027import java.util.ArrayList; 028import java.util.Collections; 029import java.util.Comparator; 030import java.util.LinkedList; 031import java.util.List; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.fs.FSDataInputStream; 035import org.apache.hadoop.fs.FSDataOutputStream; 036import org.apache.hadoop.fs.FileChecksum; 037import org.apache.hadoop.fs.FileStatus; 038import org.apache.hadoop.fs.FileSystem; 039import org.apache.hadoop.fs.FileUtil; 040import org.apache.hadoop.fs.Path; 041import org.apache.hadoop.fs.permission.FsPermission; 042import org.apache.hadoop.hbase.HBaseConfiguration; 043import org.apache.hadoop.hbase.HConstants; 044import org.apache.hadoop.hbase.TableName; 045import org.apache.hadoop.hbase.client.RegionInfo; 046import org.apache.hadoop.hbase.io.FileLink; 047import org.apache.hadoop.hbase.io.HFileLink; 048import org.apache.hadoop.hbase.io.WALLink; 049import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream; 050import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; 051import org.apache.hadoop.hbase.mob.MobUtils; 052import org.apache.hadoop.hbase.util.AbstractHBaseTool; 053import org.apache.hadoop.hbase.util.FSUtils; 054import org.apache.hadoop.hbase.util.HFileArchiveUtil; 055import org.apache.hadoop.hbase.util.Pair; 056import org.apache.hadoop.io.BytesWritable; 057import org.apache.hadoop.io.IOUtils; 058import org.apache.hadoop.io.NullWritable; 059import org.apache.hadoop.io.Writable; 060import org.apache.hadoop.mapreduce.InputFormat; 061import org.apache.hadoop.mapreduce.InputSplit; 062import org.apache.hadoop.mapreduce.Job; 063import org.apache.hadoop.mapreduce.JobContext; 064import org.apache.hadoop.mapreduce.Mapper; 065import org.apache.hadoop.mapreduce.RecordReader; 066import org.apache.hadoop.mapreduce.TaskAttemptContext; 067import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; 068import org.apache.hadoop.mapreduce.security.TokenCache; 069import org.apache.hadoop.util.StringUtils; 070import org.apache.hadoop.util.Tool; 071import org.apache.yetus.audience.InterfaceAudience; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; 075import org.apache.hbase.thirdparty.org.apache.commons.cli.Option; 076import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest; 079 080/** 081 * Export the specified snapshot to a given FileSystem. 082 * 083 * The .snapshot/name folder is copied to the destination cluster 084 * and then all the hfiles/wals are copied using a Map-Reduce Job in the .archive/ location. 085 * When everything is done, the second cluster can restore the snapshot. 086 */ 087@InterfaceAudience.Public 088public class ExportSnapshot extends AbstractHBaseTool implements Tool { 089 public static final String NAME = "exportsnapshot"; 090 /** Configuration prefix for overrides for the source filesystem */ 091 public static final String CONF_SOURCE_PREFIX = NAME + ".from."; 092 /** Configuration prefix for overrides for the destination filesystem */ 093 public static final String CONF_DEST_PREFIX = NAME + ".to."; 094 095 private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshot.class); 096 097 private static final String MR_NUM_MAPS = "mapreduce.job.maps"; 098 private static final String CONF_NUM_SPLITS = "snapshot.export.format.splits"; 099 private static final String CONF_SNAPSHOT_NAME = "snapshot.export.format.snapshot.name"; 100 private static final String CONF_SNAPSHOT_DIR = "snapshot.export.format.snapshot.dir"; 101 private static final String CONF_FILES_USER = "snapshot.export.files.attributes.user"; 102 private static final String CONF_FILES_GROUP = "snapshot.export.files.attributes.group"; 103 private static final String CONF_FILES_MODE = "snapshot.export.files.attributes.mode"; 104 private static final String CONF_CHECKSUM_VERIFY = "snapshot.export.checksum.verify"; 105 private static final String CONF_OUTPUT_ROOT = "snapshot.export.output.root"; 106 private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; 107 private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; 108 private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; 109 private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb"; 110 private static final String CONF_MR_JOB_NAME = "mapreduce.job.name"; 111 protected static final String CONF_SKIP_TMP = "snapshot.export.skip.tmp"; 112 113 static class Testing { 114 static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; 115 static final String CONF_TEST_FAILURE_COUNT = "test.snapshot.export.failure.count"; 116 int failuresCountToInject = 0; 117 int injectedFailureCount = 0; 118 } 119 120 // Command line options and defaults. 121 static final class Options { 122 static final Option SNAPSHOT = new Option(null, "snapshot", true, "Snapshot to restore."); 123 static final Option TARGET_NAME = new Option(null, "target", true, 124 "Target name for the snapshot."); 125 static final Option COPY_TO = new Option(null, "copy-to", true, "Remote " 126 + "destination hdfs://"); 127 static final Option COPY_FROM = new Option(null, "copy-from", true, 128 "Input folder hdfs:// (default hbase.rootdir)"); 129 static final Option NO_CHECKSUM_VERIFY = new Option(null, "no-checksum-verify", false, 130 "Do not verify checksum, use name+length only."); 131 static final Option NO_TARGET_VERIFY = new Option(null, "no-target-verify", false, 132 "Do not verify the integrity of the exported snapshot."); 133 static final Option OVERWRITE = new Option(null, "overwrite", false, 134 "Rewrite the snapshot manifest if already exists."); 135 static final Option CHUSER = new Option(null, "chuser", true, 136 "Change the owner of the files to the specified one."); 137 static final Option CHGROUP = new Option(null, "chgroup", true, 138 "Change the group of the files to the specified one."); 139 static final Option CHMOD = new Option(null, "chmod", true, 140 "Change the permission of the files to the specified one."); 141 static final Option MAPPERS = new Option(null, "mappers", true, 142 "Number of mappers to use during the copy (mapreduce.job.maps)."); 143 static final Option BANDWIDTH = new Option(null, "bandwidth", true, 144 "Limit bandwidth to this value in MB/second."); 145 } 146 147 // Export Map-Reduce Counters, to keep track of the progress 148 public enum Counter { 149 MISSING_FILES, FILES_COPIED, FILES_SKIPPED, COPY_FAILED, 150 BYTES_EXPECTED, BYTES_SKIPPED, BYTES_COPIED 151 } 152 153 private static class ExportMapper extends Mapper<BytesWritable, NullWritable, 154 NullWritable, NullWritable> { 155 private static final Logger LOG = LoggerFactory.getLogger(ExportMapper.class); 156 final static int REPORT_SIZE = 1 * 1024 * 1024; 157 final static int BUFFER_SIZE = 64 * 1024; 158 159 private boolean verifyChecksum; 160 private String filesGroup; 161 private String filesUser; 162 private short filesMode; 163 private int bufferSize; 164 165 private FileSystem outputFs; 166 private Path outputArchive; 167 private Path outputRoot; 168 169 private FileSystem inputFs; 170 private Path inputArchive; 171 private Path inputRoot; 172 173 private static Testing testing = new Testing(); 174 175 @Override 176 public void setup(Context context) throws IOException { 177 Configuration conf = context.getConfiguration(); 178 179 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 180 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 181 182 verifyChecksum = conf.getBoolean(CONF_CHECKSUM_VERIFY, true); 183 184 filesGroup = conf.get(CONF_FILES_GROUP); 185 filesUser = conf.get(CONF_FILES_USER); 186 filesMode = (short)conf.getInt(CONF_FILES_MODE, 0); 187 outputRoot = new Path(conf.get(CONF_OUTPUT_ROOT)); 188 inputRoot = new Path(conf.get(CONF_INPUT_ROOT)); 189 190 inputArchive = new Path(inputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 191 outputArchive = new Path(outputRoot, HConstants.HFILE_ARCHIVE_DIRECTORY); 192 193 try { 194 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); 195 inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 196 } catch (IOException e) { 197 throw new IOException("Could not get the input FileSystem with root=" + inputRoot, e); 198 } 199 200 try { 201 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); 202 outputFs = FileSystem.get(outputRoot.toUri(), destConf); 203 } catch (IOException e) { 204 throw new IOException("Could not get the output FileSystem with root="+ outputRoot, e); 205 } 206 207 // Use the default block size of the outputFs if bigger 208 int defaultBlockSize = Math.max((int) outputFs.getDefaultBlockSize(outputRoot), BUFFER_SIZE); 209 bufferSize = conf.getInt(CONF_BUFFER_SIZE, defaultBlockSize); 210 LOG.info("Using bufferSize=" + StringUtils.humanReadableInt(bufferSize)); 211 212 for (Counter c : Counter.values()) { 213 context.getCounter(c).increment(0); 214 } 215 if (context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) { 216 testing.failuresCountToInject = conf.getInt(Testing.CONF_TEST_FAILURE_COUNT, 0); 217 // Get number of times we have already injected failure based on attempt number of this 218 // task. 219 testing.injectedFailureCount = context.getTaskAttemptID().getId(); 220 } 221 } 222 223 @Override 224 protected void cleanup(Context context) { 225 IOUtils.closeStream(inputFs); 226 IOUtils.closeStream(outputFs); 227 } 228 229 @Override 230 public void map(BytesWritable key, NullWritable value, Context context) 231 throws InterruptedException, IOException { 232 SnapshotFileInfo inputInfo = SnapshotFileInfo.parseFrom(key.copyBytes()); 233 Path outputPath = getOutputPath(inputInfo); 234 235 copyFile(context, inputInfo, outputPath); 236 } 237 238 /** 239 * Returns the location where the inputPath will be copied. 240 */ 241 private Path getOutputPath(final SnapshotFileInfo inputInfo) throws IOException { 242 Path path = null; 243 switch (inputInfo.getType()) { 244 case HFILE: 245 Path inputPath = new Path(inputInfo.getHfile()); 246 String family = inputPath.getParent().getName(); 247 TableName table =HFileLink.getReferencedTableName(inputPath.getName()); 248 String region = HFileLink.getReferencedRegionName(inputPath.getName()); 249 String hfile = HFileLink.getReferencedHFileName(inputPath.getName()); 250 path = new Path(FSUtils.getTableDir(new Path("./"), table), 251 new Path(region, new Path(family, hfile))); 252 break; 253 case WAL: 254 LOG.warn("snapshot does not keeps WALs: " + inputInfo); 255 break; 256 default: 257 throw new IOException("Invalid File Type: " + inputInfo.getType().toString()); 258 } 259 return new Path(outputArchive, path); 260 } 261 262 @SuppressWarnings("checkstyle:linelength") 263 /** 264 * Used by TestExportSnapshot to test for retries when failures happen. 265 * Failure is injected in {@link #copyFile(Mapper.Context, org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotFileInfo, Path)}. 266 */ 267 private void injectTestFailure(final Context context, final SnapshotFileInfo inputInfo) 268 throws IOException { 269 if (!context.getConfiguration().getBoolean(Testing.CONF_TEST_FAILURE, false)) return; 270 if (testing.injectedFailureCount >= testing.failuresCountToInject) return; 271 testing.injectedFailureCount++; 272 context.getCounter(Counter.COPY_FAILED).increment(1); 273 LOG.debug("Injecting failure. Count: " + testing.injectedFailureCount); 274 throw new IOException(String.format("TEST FAILURE (%d of max %d): Unable to copy input=%s", 275 testing.injectedFailureCount, testing.failuresCountToInject, inputInfo)); 276 } 277 278 private void copyFile(final Context context, final SnapshotFileInfo inputInfo, 279 final Path outputPath) throws IOException { 280 // Get the file information 281 FileStatus inputStat = getSourceFileStatus(context, inputInfo); 282 283 // Verify if the output file exists and is the same that we want to copy 284 if (outputFs.exists(outputPath)) { 285 FileStatus outputStat = outputFs.getFileStatus(outputPath); 286 if (outputStat != null && sameFile(inputStat, outputStat)) { 287 LOG.info("Skip copy " + inputStat.getPath() + " to " + outputPath + ", same file."); 288 context.getCounter(Counter.FILES_SKIPPED).increment(1); 289 context.getCounter(Counter.BYTES_SKIPPED).increment(inputStat.getLen()); 290 return; 291 } 292 } 293 294 InputStream in = openSourceFile(context, inputInfo); 295 int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100); 296 if (Integer.MAX_VALUE != bandwidthMB) { 297 in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024L); 298 } 299 300 try { 301 context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); 302 303 // Ensure that the output folder is there and copy the file 304 createOutputPath(outputPath.getParent()); 305 FSDataOutputStream out = outputFs.create(outputPath, true); 306 try { 307 copyData(context, inputStat.getPath(), in, outputPath, out, inputStat.getLen()); 308 } finally { 309 out.close(); 310 } 311 312 // Try to Preserve attributes 313 if (!preserveAttributes(outputPath, inputStat)) { 314 LOG.warn("You may have to run manually chown on: " + outputPath); 315 } 316 } finally { 317 in.close(); 318 injectTestFailure(context, inputInfo); 319 } 320 } 321 322 /** 323 * Create the output folder and optionally set ownership. 324 */ 325 private void createOutputPath(final Path path) throws IOException { 326 if (filesUser == null && filesGroup == null) { 327 outputFs.mkdirs(path); 328 } else { 329 Path parent = path.getParent(); 330 if (!outputFs.exists(parent) && !parent.isRoot()) { 331 createOutputPath(parent); 332 } 333 outputFs.mkdirs(path); 334 if (filesUser != null || filesGroup != null) { 335 // override the owner when non-null user/group is specified 336 outputFs.setOwner(path, filesUser, filesGroup); 337 } 338 if (filesMode > 0) { 339 outputFs.setPermission(path, new FsPermission(filesMode)); 340 } 341 } 342 } 343 344 /** 345 * Try to Preserve the files attribute selected by the user copying them from the source file 346 * This is only required when you are exporting as a different user than "hbase" or on a system 347 * that doesn't have the "hbase" user. 348 * 349 * This is not considered a blocking failure since the user can force a chmod with the user 350 * that knows is available on the system. 351 */ 352 private boolean preserveAttributes(final Path path, final FileStatus refStat) { 353 FileStatus stat; 354 try { 355 stat = outputFs.getFileStatus(path); 356 } catch (IOException e) { 357 LOG.warn("Unable to get the status for file=" + path); 358 return false; 359 } 360 361 try { 362 if (filesMode > 0 && stat.getPermission().toShort() != filesMode) { 363 outputFs.setPermission(path, new FsPermission(filesMode)); 364 } else if (refStat != null && !stat.getPermission().equals(refStat.getPermission())) { 365 outputFs.setPermission(path, refStat.getPermission()); 366 } 367 } catch (IOException e) { 368 LOG.warn("Unable to set the permission for file="+ stat.getPath() +": "+ e.getMessage()); 369 return false; 370 } 371 372 boolean hasRefStat = (refStat != null); 373 String user = stringIsNotEmpty(filesUser) || !hasRefStat ? filesUser : refStat.getOwner(); 374 String group = stringIsNotEmpty(filesGroup) || !hasRefStat ? filesGroup : refStat.getGroup(); 375 if (stringIsNotEmpty(user) || stringIsNotEmpty(group)) { 376 try { 377 if (!(user.equals(stat.getOwner()) && group.equals(stat.getGroup()))) { 378 outputFs.setOwner(path, user, group); 379 } 380 } catch (IOException e) { 381 LOG.warn("Unable to set the owner/group for file="+ stat.getPath() +": "+ e.getMessage()); 382 LOG.warn("The user/group may not exist on the destination cluster: user=" + 383 user + " group=" + group); 384 return false; 385 } 386 } 387 388 return true; 389 } 390 391 private boolean stringIsNotEmpty(final String str) { 392 return str != null && str.length() > 0; 393 } 394 395 private void copyData(final Context context, 396 final Path inputPath, final InputStream in, 397 final Path outputPath, final FSDataOutputStream out, 398 final long inputFileSize) 399 throws IOException { 400 final String statusMessage = "copied %s/" + StringUtils.humanReadableInt(inputFileSize) + 401 " (%.1f%%)"; 402 403 try { 404 byte[] buffer = new byte[bufferSize]; 405 long totalBytesWritten = 0; 406 int reportBytes = 0; 407 int bytesRead; 408 409 long stime = System.currentTimeMillis(); 410 while ((bytesRead = in.read(buffer)) > 0) { 411 out.write(buffer, 0, bytesRead); 412 totalBytesWritten += bytesRead; 413 reportBytes += bytesRead; 414 415 if (reportBytes >= REPORT_SIZE) { 416 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 417 context.setStatus(String.format(statusMessage, 418 StringUtils.humanReadableInt(totalBytesWritten), 419 (totalBytesWritten/(float)inputFileSize) * 100.0f) + 420 " from " + inputPath + " to " + outputPath); 421 reportBytes = 0; 422 } 423 } 424 long etime = System.currentTimeMillis(); 425 426 context.getCounter(Counter.BYTES_COPIED).increment(reportBytes); 427 context.setStatus(String.format(statusMessage, 428 StringUtils.humanReadableInt(totalBytesWritten), 429 (totalBytesWritten/(float)inputFileSize) * 100.0f) + 430 " from " + inputPath + " to " + outputPath); 431 432 // Verify that the written size match 433 if (totalBytesWritten != inputFileSize) { 434 String msg = "number of bytes copied not matching copied=" + totalBytesWritten + 435 " expected=" + inputFileSize + " for file=" + inputPath; 436 throw new IOException(msg); 437 } 438 439 LOG.info("copy completed for input=" + inputPath + " output=" + outputPath); 440 LOG.info("size=" + totalBytesWritten + 441 " (" + StringUtils.humanReadableInt(totalBytesWritten) + ")" + 442 " time=" + StringUtils.formatTimeDiff(etime, stime) + 443 String.format(" %.3fM/sec", (totalBytesWritten / ((etime - stime)/1000.0))/1048576.0)); 444 context.getCounter(Counter.FILES_COPIED).increment(1); 445 } catch (IOException e) { 446 LOG.error("Error copying " + inputPath + " to " + outputPath, e); 447 context.getCounter(Counter.COPY_FAILED).increment(1); 448 throw e; 449 } 450 } 451 452 /** 453 * Try to open the "source" file. 454 * Throws an IOException if the communication with the inputFs fail or 455 * if the file is not found. 456 */ 457 private FSDataInputStream openSourceFile(Context context, final SnapshotFileInfo fileInfo) 458 throws IOException { 459 try { 460 Configuration conf = context.getConfiguration(); 461 FileLink link = null; 462 switch (fileInfo.getType()) { 463 case HFILE: 464 Path inputPath = new Path(fileInfo.getHfile()); 465 link = getFileLink(inputPath, conf); 466 break; 467 case WAL: 468 String serverName = fileInfo.getWalServer(); 469 String logName = fileInfo.getWalName(); 470 link = new WALLink(inputRoot, serverName, logName); 471 break; 472 default: 473 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 474 } 475 return link.open(inputFs); 476 } catch (IOException e) { 477 context.getCounter(Counter.MISSING_FILES).increment(1); 478 LOG.error("Unable to open source file=" + fileInfo.toString(), e); 479 throw e; 480 } 481 } 482 483 private FileStatus getSourceFileStatus(Context context, final SnapshotFileInfo fileInfo) 484 throws IOException { 485 try { 486 Configuration conf = context.getConfiguration(); 487 FileLink link = null; 488 switch (fileInfo.getType()) { 489 case HFILE: 490 Path inputPath = new Path(fileInfo.getHfile()); 491 link = getFileLink(inputPath, conf); 492 break; 493 case WAL: 494 link = new WALLink(inputRoot, fileInfo.getWalServer(), fileInfo.getWalName()); 495 break; 496 default: 497 throw new IOException("Invalid File Type: " + fileInfo.getType().toString()); 498 } 499 return link.getFileStatus(inputFs); 500 } catch (FileNotFoundException e) { 501 context.getCounter(Counter.MISSING_FILES).increment(1); 502 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 503 throw e; 504 } catch (IOException e) { 505 LOG.error("Unable to get the status for source file=" + fileInfo.toString(), e); 506 throw e; 507 } 508 } 509 510 private FileLink getFileLink(Path path, Configuration conf) throws IOException{ 511 String regionName = HFileLink.getReferencedRegionName(path.getName()); 512 TableName tableName = HFileLink.getReferencedTableName(path.getName()); 513 if(MobUtils.getMobRegionInfo(tableName).getEncodedName().equals(regionName)) { 514 return HFileLink.buildFromHFileLinkPattern(MobUtils.getQualifiedMobRootDir(conf), 515 HFileArchiveUtil.getArchivePath(conf), path); 516 } 517 return HFileLink.buildFromHFileLinkPattern(inputRoot, inputArchive, path); 518 } 519 520 private FileChecksum getFileChecksum(final FileSystem fs, final Path path) { 521 try { 522 return fs.getFileChecksum(path); 523 } catch (IOException e) { 524 LOG.warn("Unable to get checksum for file=" + path, e); 525 return null; 526 } 527 } 528 529 /** 530 * Check if the two files are equal by looking at the file length, 531 * and at the checksum (if user has specified the verifyChecksum flag). 532 */ 533 private boolean sameFile(final FileStatus inputStat, final FileStatus outputStat) { 534 // Not matching length 535 if (inputStat.getLen() != outputStat.getLen()) return false; 536 537 // Mark files as equals, since user asked for no checksum verification 538 if (!verifyChecksum) return true; 539 540 // If checksums are not available, files are not the same. 541 FileChecksum inChecksum = getFileChecksum(inputFs, inputStat.getPath()); 542 if (inChecksum == null) return false; 543 544 FileChecksum outChecksum = getFileChecksum(outputFs, outputStat.getPath()); 545 if (outChecksum == null) return false; 546 547 return inChecksum.equals(outChecksum); 548 } 549 } 550 551 // ========================================================================== 552 // Input Format 553 // ========================================================================== 554 555 /** 556 * Extract the list of files (HFiles/WALs) to copy using Map-Reduce. 557 * @return list of files referenced by the snapshot (pair of path and size) 558 */ 559 private static List<Pair<SnapshotFileInfo, Long>> getSnapshotFiles(final Configuration conf, 560 final FileSystem fs, final Path snapshotDir) throws IOException { 561 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 562 563 final List<Pair<SnapshotFileInfo, Long>> files = new ArrayList<>(); 564 final TableName table = TableName.valueOf(snapshotDesc.getTable()); 565 566 // Get snapshot files 567 LOG.info("Loading Snapshot '" + snapshotDesc.getName() + "' hfile list"); 568 SnapshotReferenceUtil.visitReferencedFiles(conf, fs, snapshotDir, snapshotDesc, 569 new SnapshotReferenceUtil.SnapshotVisitor() { 570 @Override 571 public void storeFile(final RegionInfo regionInfo, final String family, 572 final SnapshotRegionManifest.StoreFile storeFile) throws IOException { 573 // for storeFile.hasReference() case, copied as part of the manifest 574 if (!storeFile.hasReference()) { 575 String region = regionInfo.getEncodedName(); 576 String hfile = storeFile.getName(); 577 Path path = HFileLink.createPath(table, region, family, hfile); 578 579 SnapshotFileInfo fileInfo = SnapshotFileInfo.newBuilder() 580 .setType(SnapshotFileInfo.Type.HFILE) 581 .setHfile(path.toString()) 582 .build(); 583 584 long size; 585 if (storeFile.hasFileSize()) { 586 size = storeFile.getFileSize(); 587 } else { 588 size = HFileLink.buildFromHFileLinkPattern(conf, path).getFileStatus(fs).getLen(); 589 } 590 files.add(new Pair<>(fileInfo, size)); 591 } 592 } 593 }); 594 595 return files; 596 } 597 598 /** 599 * Given a list of file paths and sizes, create around ngroups in as balanced a way as possible. 600 * The groups created will have similar amounts of bytes. 601 * <p> 602 * The algorithm used is pretty straightforward; the file list is sorted by size, 603 * and then each group fetch the bigger file available, iterating through groups 604 * alternating the direction. 605 */ 606 static List<List<Pair<SnapshotFileInfo, Long>>> getBalancedSplits( 607 final List<Pair<SnapshotFileInfo, Long>> files, final int ngroups) { 608 // Sort files by size, from small to big 609 Collections.sort(files, new Comparator<Pair<SnapshotFileInfo, Long>>() { 610 public int compare(Pair<SnapshotFileInfo, Long> a, Pair<SnapshotFileInfo, Long> b) { 611 long r = a.getSecond() - b.getSecond(); 612 return (r < 0) ? -1 : ((r > 0) ? 1 : 0); 613 } 614 }); 615 616 // create balanced groups 617 List<List<Pair<SnapshotFileInfo, Long>>> fileGroups = new LinkedList<>(); 618 long[] sizeGroups = new long[ngroups]; 619 int hi = files.size() - 1; 620 int lo = 0; 621 622 List<Pair<SnapshotFileInfo, Long>> group; 623 int dir = 1; 624 int g = 0; 625 626 while (hi >= lo) { 627 if (g == fileGroups.size()) { 628 group = new LinkedList<>(); 629 fileGroups.add(group); 630 } else { 631 group = fileGroups.get(g); 632 } 633 634 Pair<SnapshotFileInfo, Long> fileInfo = files.get(hi--); 635 636 // add the hi one 637 sizeGroups[g] += fileInfo.getSecond(); 638 group.add(fileInfo); 639 640 // change direction when at the end or the beginning 641 g += dir; 642 if (g == ngroups) { 643 dir = -1; 644 g = ngroups - 1; 645 } else if (g < 0) { 646 dir = 1; 647 g = 0; 648 } 649 } 650 651 if (LOG.isDebugEnabled()) { 652 for (int i = 0; i < sizeGroups.length; ++i) { 653 LOG.debug("export split=" + i + " size=" + StringUtils.humanReadableInt(sizeGroups[i])); 654 } 655 } 656 657 return fileGroups; 658 } 659 660 private static class ExportSnapshotInputFormat extends InputFormat<BytesWritable, NullWritable> { 661 @Override 662 public RecordReader<BytesWritable, NullWritable> createRecordReader(InputSplit split, 663 TaskAttemptContext tac) throws IOException, InterruptedException { 664 return new ExportSnapshotRecordReader(((ExportSnapshotInputSplit)split).getSplitKeys()); 665 } 666 667 @Override 668 public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { 669 Configuration conf = context.getConfiguration(); 670 Path snapshotDir = new Path(conf.get(CONF_SNAPSHOT_DIR)); 671 FileSystem fs = FileSystem.get(snapshotDir.toUri(), conf); 672 673 List<Pair<SnapshotFileInfo, Long>> snapshotFiles = getSnapshotFiles(conf, fs, snapshotDir); 674 int mappers = conf.getInt(CONF_NUM_SPLITS, 0); 675 if (mappers == 0 && snapshotFiles.size() > 0) { 676 mappers = 1 + (snapshotFiles.size() / conf.getInt(CONF_MAP_GROUP, 10)); 677 mappers = Math.min(mappers, snapshotFiles.size()); 678 conf.setInt(CONF_NUM_SPLITS, mappers); 679 conf.setInt(MR_NUM_MAPS, mappers); 680 } 681 682 List<List<Pair<SnapshotFileInfo, Long>>> groups = getBalancedSplits(snapshotFiles, mappers); 683 List<InputSplit> splits = new ArrayList(groups.size()); 684 for (List<Pair<SnapshotFileInfo, Long>> files: groups) { 685 splits.add(new ExportSnapshotInputSplit(files)); 686 } 687 return splits; 688 } 689 690 private static class ExportSnapshotInputSplit extends InputSplit implements Writable { 691 private List<Pair<BytesWritable, Long>> files; 692 private long length; 693 694 public ExportSnapshotInputSplit() { 695 this.files = null; 696 } 697 698 public ExportSnapshotInputSplit(final List<Pair<SnapshotFileInfo, Long>> snapshotFiles) { 699 this.files = new ArrayList(snapshotFiles.size()); 700 for (Pair<SnapshotFileInfo, Long> fileInfo: snapshotFiles) { 701 this.files.add(new Pair<>( 702 new BytesWritable(fileInfo.getFirst().toByteArray()), fileInfo.getSecond())); 703 this.length += fileInfo.getSecond(); 704 } 705 } 706 707 private List<Pair<BytesWritable, Long>> getSplitKeys() { 708 return files; 709 } 710 711 @Override 712 public long getLength() throws IOException, InterruptedException { 713 return length; 714 } 715 716 @Override 717 public String[] getLocations() throws IOException, InterruptedException { 718 return new String[] {}; 719 } 720 721 @Override 722 public void readFields(DataInput in) throws IOException { 723 int count = in.readInt(); 724 files = new ArrayList<>(count); 725 length = 0; 726 for (int i = 0; i < count; ++i) { 727 BytesWritable fileInfo = new BytesWritable(); 728 fileInfo.readFields(in); 729 long size = in.readLong(); 730 files.add(new Pair<>(fileInfo, size)); 731 length += size; 732 } 733 } 734 735 @Override 736 public void write(DataOutput out) throws IOException { 737 out.writeInt(files.size()); 738 for (final Pair<BytesWritable, Long> fileInfo: files) { 739 fileInfo.getFirst().write(out); 740 out.writeLong(fileInfo.getSecond()); 741 } 742 } 743 } 744 745 private static class ExportSnapshotRecordReader 746 extends RecordReader<BytesWritable, NullWritable> { 747 private final List<Pair<BytesWritable, Long>> files; 748 private long totalSize = 0; 749 private long procSize = 0; 750 private int index = -1; 751 752 ExportSnapshotRecordReader(final List<Pair<BytesWritable, Long>> files) { 753 this.files = files; 754 for (Pair<BytesWritable, Long> fileInfo: files) { 755 totalSize += fileInfo.getSecond(); 756 } 757 } 758 759 @Override 760 public void close() { } 761 762 @Override 763 public BytesWritable getCurrentKey() { return files.get(index).getFirst(); } 764 765 @Override 766 public NullWritable getCurrentValue() { return NullWritable.get(); } 767 768 @Override 769 public float getProgress() { return (float)procSize / totalSize; } 770 771 @Override 772 public void initialize(InputSplit split, TaskAttemptContext tac) { } 773 774 @Override 775 public boolean nextKeyValue() { 776 if (index >= 0) { 777 procSize += files.get(index).getSecond(); 778 } 779 return(++index < files.size()); 780 } 781 } 782 } 783 784 // ========================================================================== 785 // Tool 786 // ========================================================================== 787 788 /** 789 * Run Map-Reduce Job to perform the files copy. 790 */ 791 private void runCopyJob(final Path inputRoot, final Path outputRoot, 792 final String snapshotName, final Path snapshotDir, final boolean verifyChecksum, 793 final String filesUser, final String filesGroup, final int filesMode, 794 final int mappers, final int bandwidthMB) 795 throws IOException, InterruptedException, ClassNotFoundException { 796 Configuration conf = getConf(); 797 if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); 798 if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); 799 if (mappers > 0) { 800 conf.setInt(CONF_NUM_SPLITS, mappers); 801 conf.setInt(MR_NUM_MAPS, mappers); 802 } 803 conf.setInt(CONF_FILES_MODE, filesMode); 804 conf.setBoolean(CONF_CHECKSUM_VERIFY, verifyChecksum); 805 conf.set(CONF_OUTPUT_ROOT, outputRoot.toString()); 806 conf.set(CONF_INPUT_ROOT, inputRoot.toString()); 807 conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB); 808 conf.set(CONF_SNAPSHOT_NAME, snapshotName); 809 conf.set(CONF_SNAPSHOT_DIR, snapshotDir.toString()); 810 811 String jobname = conf.get(CONF_MR_JOB_NAME, "ExportSnapshot-" + snapshotName); 812 Job job = new Job(conf); 813 job.setJobName(jobname); 814 job.setJarByClass(ExportSnapshot.class); 815 TableMapReduceUtil.addDependencyJars(job); 816 job.setMapperClass(ExportMapper.class); 817 job.setInputFormatClass(ExportSnapshotInputFormat.class); 818 job.setOutputFormatClass(NullOutputFormat.class); 819 job.setMapSpeculativeExecution(false); 820 job.setNumReduceTasks(0); 821 822 // Acquire the delegation Tokens 823 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 824 TokenCache.obtainTokensForNamenodes(job.getCredentials(), 825 new Path[] { inputRoot }, srcConf); 826 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 827 TokenCache.obtainTokensForNamenodes(job.getCredentials(), 828 new Path[] { outputRoot }, destConf); 829 830 // Run the MR Job 831 if (!job.waitForCompletion(true)) { 832 throw new ExportSnapshotException(job.getStatus().getFailureInfo()); 833 } 834 } 835 836 private void verifySnapshot(final Configuration baseConf, 837 final FileSystem fs, final Path rootDir, final Path snapshotDir) throws IOException { 838 // Update the conf with the current root dir, since may be a different cluster 839 Configuration conf = new Configuration(baseConf); 840 FSUtils.setRootDir(conf, rootDir); 841 FSUtils.setFsDefault(conf, FSUtils.getRootDir(conf)); 842 SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 843 SnapshotReferenceUtil.verifySnapshot(conf, fs, snapshotDir, snapshotDesc); 844 } 845 846 /** 847 * Set path ownership. 848 */ 849 private void setOwner(final FileSystem fs, final Path path, final String user, 850 final String group, final boolean recursive) throws IOException { 851 if (user != null || group != null) { 852 if (recursive && fs.isDirectory(path)) { 853 for (FileStatus child : fs.listStatus(path)) { 854 setOwner(fs, child.getPath(), user, group, recursive); 855 } 856 } 857 fs.setOwner(path, user, group); 858 } 859 } 860 861 /** 862 * Set path permission. 863 */ 864 private void setPermission(final FileSystem fs, final Path path, final short filesMode, 865 final boolean recursive) throws IOException { 866 if (filesMode > 0) { 867 FsPermission perm = new FsPermission(filesMode); 868 if (recursive && fs.isDirectory(path)) { 869 for (FileStatus child : fs.listStatus(path)) { 870 setPermission(fs, child.getPath(), filesMode, recursive); 871 } 872 } 873 fs.setPermission(path, perm); 874 } 875 } 876 877 private boolean verifyTarget = true; 878 private boolean verifyChecksum = true; 879 private String snapshotName = null; 880 private String targetName = null; 881 private boolean overwrite = false; 882 private String filesGroup = null; 883 private String filesUser = null; 884 private Path outputRoot = null; 885 private Path inputRoot = null; 886 private int bandwidthMB = Integer.MAX_VALUE; 887 private int filesMode = 0; 888 private int mappers = 0; 889 890 @Override 891 protected void processOptions(CommandLine cmd) { 892 snapshotName = cmd.getOptionValue(Options.SNAPSHOT.getLongOpt(), snapshotName); 893 targetName = cmd.getOptionValue(Options.TARGET_NAME.getLongOpt(), targetName); 894 if (cmd.hasOption(Options.COPY_TO.getLongOpt())) { 895 outputRoot = new Path(cmd.getOptionValue(Options.COPY_TO.getLongOpt())); 896 } 897 if (cmd.hasOption(Options.COPY_FROM.getLongOpt())) { 898 inputRoot = new Path(cmd.getOptionValue(Options.COPY_FROM.getLongOpt())); 899 } 900 mappers = getOptionAsInt(cmd, Options.MAPPERS.getLongOpt(), mappers); 901 filesUser = cmd.getOptionValue(Options.CHUSER.getLongOpt(), filesUser); 902 filesGroup = cmd.getOptionValue(Options.CHGROUP.getLongOpt(), filesGroup); 903 filesMode = getOptionAsInt(cmd, Options.CHMOD.getLongOpt(), filesMode); 904 bandwidthMB = getOptionAsInt(cmd, Options.BANDWIDTH.getLongOpt(), bandwidthMB); 905 overwrite = cmd.hasOption(Options.OVERWRITE.getLongOpt()); 906 // And verifyChecksum and verifyTarget with values read from old args in processOldArgs(...). 907 verifyChecksum = !cmd.hasOption(Options.NO_CHECKSUM_VERIFY.getLongOpt()); 908 verifyTarget = !cmd.hasOption(Options.NO_TARGET_VERIFY.getLongOpt()); 909 } 910 911 /** 912 * Execute the export snapshot by copying the snapshot metadata, hfiles and wals. 913 * @return 0 on success, and != 0 upon failure. 914 */ 915 @Override 916 public int doWork() throws IOException { 917 Configuration conf = getConf(); 918 919 // Check user options 920 if (snapshotName == null) { 921 System.err.println("Snapshot name not provided."); 922 LOG.error("Use -h or --help for usage instructions."); 923 return 0; 924 } 925 926 if (outputRoot == null) { 927 System.err.println("Destination file-system (--" + Options.COPY_TO.getLongOpt() 928 + ") not provided."); 929 LOG.error("Use -h or --help for usage instructions."); 930 return 0; 931 } 932 933 if (targetName == null) { 934 targetName = snapshotName; 935 } 936 if (inputRoot == null) { 937 inputRoot = FSUtils.getRootDir(conf); 938 } else { 939 FSUtils.setRootDir(conf, inputRoot); 940 } 941 942 Configuration srcConf = HBaseConfiguration.createClusterConf(conf, null, CONF_SOURCE_PREFIX); 943 srcConf.setBoolean("fs." + inputRoot.toUri().getScheme() + ".impl.disable.cache", true); 944 FileSystem inputFs = FileSystem.get(inputRoot.toUri(), srcConf); 945 LOG.debug("inputFs=" + inputFs.getUri().toString() + " inputRoot=" + inputRoot); 946 Configuration destConf = HBaseConfiguration.createClusterConf(conf, null, CONF_DEST_PREFIX); 947 destConf.setBoolean("fs." + outputRoot.toUri().getScheme() + ".impl.disable.cache", true); 948 FileSystem outputFs = FileSystem.get(outputRoot.toUri(), destConf); 949 LOG.debug("outputFs=" + outputFs.getUri().toString() + " outputRoot=" + outputRoot.toString()); 950 951 boolean skipTmp = conf.getBoolean(CONF_SKIP_TMP, false); 952 953 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, inputRoot); 954 Path snapshotTmpDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(targetName, outputRoot); 955 Path outputSnapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(targetName, outputRoot); 956 Path initialOutputSnapshotDir = skipTmp ? outputSnapshotDir : snapshotTmpDir; 957 958 // Find the necessary directory which need to change owner and group 959 Path needSetOwnerDir = SnapshotDescriptionUtils.getSnapshotRootDir(outputRoot); 960 if (outputFs.exists(needSetOwnerDir)) { 961 if (skipTmp) { 962 needSetOwnerDir = outputSnapshotDir; 963 } else { 964 needSetOwnerDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(outputRoot); 965 if (outputFs.exists(needSetOwnerDir)) { 966 needSetOwnerDir = snapshotTmpDir; 967 } 968 } 969 } 970 971 // Check if the snapshot already exists 972 if (outputFs.exists(outputSnapshotDir)) { 973 if (overwrite) { 974 if (!outputFs.delete(outputSnapshotDir, true)) { 975 System.err.println("Unable to remove existing snapshot directory: " + outputSnapshotDir); 976 return 1; 977 } 978 } else { 979 System.err.println("The snapshot '" + targetName + 980 "' already exists in the destination: " + outputSnapshotDir); 981 return 1; 982 } 983 } 984 985 if (!skipTmp) { 986 // Check if the snapshot already in-progress 987 if (outputFs.exists(snapshotTmpDir)) { 988 if (overwrite) { 989 if (!outputFs.delete(snapshotTmpDir, true)) { 990 System.err.println("Unable to remove existing snapshot tmp directory: "+snapshotTmpDir); 991 return 1; 992 } 993 } else { 994 System.err.println("A snapshot with the same name '"+ targetName +"' may be in-progress"); 995 System.err.println("Please check "+snapshotTmpDir+". If the snapshot has completed, "); 996 System.err.println("consider removing "+snapshotTmpDir+" by using the -overwrite option"); 997 return 1; 998 } 999 } 1000 } 1001 1002 // Step 1 - Copy fs1:/.snapshot/<snapshot> to fs2:/.snapshot/.tmp/<snapshot> 1003 // The snapshot references must be copied before the hfiles otherwise the cleaner 1004 // will remove them because they are unreferenced. 1005 try { 1006 LOG.info("Copy Snapshot Manifest"); 1007 FileUtil.copy(inputFs, snapshotDir, outputFs, initialOutputSnapshotDir, false, false, conf); 1008 } catch (IOException e) { 1009 throw new ExportSnapshotException("Failed to copy the snapshot directory: from=" + 1010 snapshotDir + " to=" + initialOutputSnapshotDir, e); 1011 } finally { 1012 if (filesUser != null || filesGroup != null) { 1013 LOG.warn((filesUser == null ? "" : "Change the owner of " + needSetOwnerDir + " to " 1014 + filesUser) 1015 + (filesGroup == null ? "" : ", Change the group of " + needSetOwnerDir + " to " 1016 + filesGroup)); 1017 setOwner(outputFs, needSetOwnerDir, filesUser, filesGroup, true); 1018 } 1019 if (filesMode > 0) { 1020 LOG.warn("Change the permission of " + needSetOwnerDir + " to " + filesMode); 1021 setPermission(outputFs, needSetOwnerDir, (short)filesMode, true); 1022 } 1023 } 1024 1025 // Write a new .snapshotinfo if the target name is different from the source name 1026 if (!targetName.equals(snapshotName)) { 1027 SnapshotDescription snapshotDesc = 1028 SnapshotDescriptionUtils.readSnapshotInfo(inputFs, snapshotDir) 1029 .toBuilder() 1030 .setName(targetName) 1031 .build(); 1032 SnapshotDescriptionUtils.writeSnapshotInfo(snapshotDesc, initialOutputSnapshotDir, outputFs); 1033 if (filesUser != null || filesGroup != null) { 1034 outputFs.setOwner(new Path(initialOutputSnapshotDir, 1035 SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), filesUser, filesGroup); 1036 } 1037 if (filesMode > 0) { 1038 outputFs.setPermission(new Path(initialOutputSnapshotDir, 1039 SnapshotDescriptionUtils.SNAPSHOTINFO_FILE), new FsPermission((short)filesMode)); 1040 } 1041 } 1042 1043 // Step 2 - Start MR Job to copy files 1044 // The snapshot references must be copied before the files otherwise the files gets removed 1045 // by the HFileArchiver, since they have no references. 1046 try { 1047 runCopyJob(inputRoot, outputRoot, snapshotName, snapshotDir, verifyChecksum, 1048 filesUser, filesGroup, filesMode, mappers, bandwidthMB); 1049 1050 LOG.info("Finalize the Snapshot Export"); 1051 if (!skipTmp) { 1052 // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> 1053 if (!outputFs.rename(snapshotTmpDir, outputSnapshotDir)) { 1054 throw new ExportSnapshotException("Unable to rename snapshot directory from=" + 1055 snapshotTmpDir + " to=" + outputSnapshotDir); 1056 } 1057 } 1058 1059 // Step 4 - Verify snapshot integrity 1060 if (verifyTarget) { 1061 LOG.info("Verify snapshot integrity"); 1062 verifySnapshot(destConf, outputFs, outputRoot, outputSnapshotDir); 1063 } 1064 1065 LOG.info("Export Completed: " + targetName); 1066 return 0; 1067 } catch (Exception e) { 1068 LOG.error("Snapshot export failed", e); 1069 if (!skipTmp) { 1070 outputFs.delete(snapshotTmpDir, true); 1071 } 1072 outputFs.delete(outputSnapshotDir, true); 1073 return 1; 1074 } finally { 1075 IOUtils.closeStream(inputFs); 1076 IOUtils.closeStream(outputFs); 1077 } 1078 } 1079 1080 @Override 1081 protected void printUsage() { 1082 super.printUsage(); 1083 System.out.println("\n" 1084 + "Examples:\n" 1085 + " hbase snapshot export \\\n" 1086 + " --snapshot MySnapshot --copy-to hdfs://srv2:8082/hbase \\\n" 1087 + " --chuser MyUser --chgroup MyGroup --chmod 700 --mappers 16\n" 1088 + "\n" 1089 + " hbase snapshot export \\\n" 1090 + " --snapshot MySnapshot --copy-from hdfs://srv2:8082/hbase \\\n" 1091 + " --copy-to hdfs://srv1:50070/hbase"); 1092 } 1093 1094 @Override protected void addOptions() { 1095 addRequiredOption(Options.SNAPSHOT); 1096 addOption(Options.COPY_TO); 1097 addOption(Options.COPY_FROM); 1098 addOption(Options.TARGET_NAME); 1099 addOption(Options.NO_CHECKSUM_VERIFY); 1100 addOption(Options.NO_TARGET_VERIFY); 1101 addOption(Options.OVERWRITE); 1102 addOption(Options.CHUSER); 1103 addOption(Options.CHGROUP); 1104 addOption(Options.CHMOD); 1105 addOption(Options.MAPPERS); 1106 addOption(Options.BANDWIDTH); 1107 } 1108 1109 public static void main(String[] args) { 1110 new ExportSnapshot().doStaticMain(args); 1111 } 1112}