/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
 */
@InterfaceAudience.Private
public class TableSnapshotInputFormatImpl {
  // TODO: Snapshots files are owned in fs by the hbase user. There is no
  // easy way to delegate access.
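
  // A minimal end-to-end sketch of how a job might drive this class directly (the snapshot name
  // and restore directory below are illustrative placeholders, and most users go through
  // TableSnapshotInputFormat / TableMapReduceUtil instead):
  //
  //   Configuration conf = job.getConfiguration();
  //   // getSplits(conf) also requires a Scan in conf; see extractScanFromConf below
  //   TableSnapshotInputFormatImpl.setInput(conf, "exampleSnapshot",
  //     new Path("/tmp/snapshot-restore"));
  //   List<InputSplit> splits = TableSnapshotInputFormatImpl.getSplits(conf);
  //   // ... submit the job, then clean up the restored files:
  //   TableSnapshotInputFormatImpl.cleanRestoreDir(job, "exampleSnapshot");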

  public static final Logger LOG = LoggerFactory.getLogger(TableSnapshotInputFormatImpl.class);

  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  // key for specifying the root dir of the restored snapshot
  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution, int)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
    "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

  /**
   * For MapReduce jobs running multiple mappers per region, determines what split algorithm we
   * should be using to find split points for scanners.
   */
  public static final String SPLIT_ALGO = "hbase.mapreduce.split.algorithm";
  /**
   * For MapReduce jobs running multiple mappers per region, determines the number of splits to
   * generate per region.
   */
  public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";

  /**
   * Whether to calculate the block locations for splits. Defaults to true. If the computing layer
   * runs outside of the HBase cluster, block locality does not matter; setting this value to false
   * skips the calculation and saves some time. Access modifier is "public" so that these can be
   * accessed by test classes of both org.apache.hadoop.hbase.mapred and
   * org.apache.hadoop.hbase.mapreduce.
   */
  public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY =
    "hbase.TableSnapshotInputFormat.locality.enabled";
  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;

  /**
   * Whether to calculate the snapshot region location from the region location in meta. It is
   * much faster than computing block locations for splits.
   */
  public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION =
    "hbase.TableSnapshotInputFormat.locality.by.region.location";

  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT = false;

  /**
   * In some scenarios, scan only a limited number of rows on each InputSplit, e.g. for sampling
   * data extraction.
   */
  public static final String SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT =
    "hbase.TableSnapshotInputFormat.row.limit.per.inputsplit";

  /**
   * Whether to enable scan metrics on the Scan, defaults to true.
   */
  public static final String SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED =
    "hbase.TableSnapshotInputFormat.scan_metrics.enabled";

  public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true;

  /**
   * The {@link ReadType} which should be set on the {@link Scan} to read the HBase snapshot,
   * default STREAM.
   */
  public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE =
    "hbase.TableSnapshotInputFormat.scanner.readtype";
  public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM;
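
  // A sketch of how these keys might be tuned on a job's Configuration (the values shown are
  // illustrative, not recommendations):
  //
  //   Configuration conf = job.getConfiguration();
  //   // skip block-location computation when mappers run outside the HBase cluster
  //   conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
  //   // sample at most 1000 rows from each split
  //   conf.setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 1000);
  //   // read HFiles with PREAD instead of the default STREAM
  //   conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);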

  /**
   * Implementation class for InputSplit logic common between mapred and mapreduce.
   */
  public static class InputSplit implements Writable {

    private TableDescriptor htd;
    private HRegionInfo regionInfo;
    private String[] locations;
    private String scan;
    private String restoreDir;

    // constructor for mapreduce framework / Writable
    public InputSplit() {
    }

    public InputSplit(TableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
      Scan scan, Path restoreDir) {
      this.htd = htd;
      this.regionInfo = regionInfo;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
      try {
        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
      } catch (IOException e) {
        LOG.warn("Failed to convert Scan to String", e);
      }

      this.restoreDir = restoreDir.toString();
    }

    public TableDescriptor getHtd() {
      return htd;
    }

    public String getScan() {
      return scan;
    }

    public String getRestoreDir() {
      return restoreDir;
    }

    public long getLength() {
      // TODO: We can obtain the file sizes of the snapshot here.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    public TableDescriptor getTableDescriptor() {
      return htd;
    }

    public HRegionInfo getRegionInfo() {
      return regionInfo;
    }

    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
    // doing this wrapping with Writables.
    @Override
    public void write(DataOutput out) throws IOException {
      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
        .setTable(ProtobufUtil.toTableSchema(htd)).setRegion(HRegionInfo.convert(regionInfo));

      for (String location : locations) {
        builder.addLocations(location);
      }

      TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);

      Bytes.writeByteArray(out, Bytes.toBytes(scan));
      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.htd = ProtobufUtil.toTableDescriptor(split.getTable());
      this.regionInfo = HRegionInfo.convert(split.getRegion());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);

      this.scan = Bytes.toString(Bytes.readByteArray(in));
      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
    }
  }
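
  // The Writable round trip above can be exercised with Hadoop's in-memory buffers; a small
  // sketch, assuming `split` is an already-constructed InputSplit (buffer classes come from
  // org.apache.hadoop.io, nothing HBase-specific):
  //
  //   org.apache.hadoop.io.DataOutputBuffer out = new org.apache.hadoop.io.DataOutputBuffer();
  //   split.write(out);
  //   org.apache.hadoop.io.DataInputBuffer in = new org.apache.hadoop.io.DataInputBuffer();
  //   in.reset(out.getData(), out.getLength());
  //   InputSplit copy = new InputSplit();
  //   copy.readFields(in);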

  /**
   * Implementation class for RecordReader logic common between mapred and mapreduce.
   */
  public static class RecordReader {
    private InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;
    private int numOfCompleteRows = 0;
    private int rowLimitPerSplit;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
      this.split = split;
      this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
      TableDescriptor htd = split.htd;
      HRegionInfo hri = this.split.getRegionInfo();
      FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);

      // region is immutable, this should be fine,
      // otherwise we have to set the thread read point
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // disable caching of data blocks
      scan.setCacheBlocks(false);

      scanner =
        new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
    }

    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        // we are done
        return false;
      }

      if (rowLimitPerSplit > 0 && ++this.numOfCompleteRows > rowLimitPerSplit) {
        return false;
      }
      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      return 0;
    }

    public float getProgress() {
      return 0; // TODO: use total bytes to estimate
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }
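
  // How the mapred/mapreduce wrappers typically drive the RecordReader above; a sketch assuming
  // `split` and `conf` are handed in by the framework:
  //
  //   RecordReader rr = new RecordReader();
  //   rr.initialize(split, conf);
  //   try {
  //     while (rr.nextKeyValue()) {
  //       ImmutableBytesWritable key = rr.getCurrentKey();
  //       Result value = rr.getCurrentValue();
  //       // hand key/value to the mapper
  //     }
  //   } finally {
  //     rr.close();
  //   }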

  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);

    List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);

    // TODO: mapred does not support scan as input API. Work around for now.
    Scan scan = extractScanFromConf(conf);
    // the temp dir where the snapshot is restored
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));

    RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);

    int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);

    return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
  }

  public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
    String splitAlgoClassName = conf.get(SPLIT_ALGO);
    if (splitAlgoClassName == null) {
      return null;
    }
    try {
      return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)
        .getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
      | NoSuchMethodException | InvocationTargetException e) {
      throw new IOException("Cannot load or instantiate SplitAlgo class " + splitAlgoClassName, e);
    }
  }
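
  // A sketch of requesting multiple mappers per region via the setInput overload further below
  // (RegionSplitter.UniformSplit is an existing SplitAlgorithm; the snapshot name and restore
  // directory are placeholders):
  //
  //   TableSnapshotInputFormatImpl.setInput(conf, "exampleSnapshot",
  //     new Path("/tmp/snapshot-restore"), new RegionSplitter.UniformSplit(), 4);
  //
  // getSplitAlgo(conf) then reflectively instantiates UniformSplit when the splits are computed.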

  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
    if (regionManifests == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());

    for (SnapshotRegionManifest regionManifest : regionManifests) {
      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
        continue;
      }
      regionInfos.add(hri);
    }
    return regionInfos;
  }

  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
    Path rootDir, FileSystem fs) throws IOException {
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
  }

  public static Scan extractScanFromConf(Configuration conf) throws IOException {
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("Unable to create scan");
    }

    if (scan.getReadType() == ReadType.DEFAULT) {
      LOG.info(
        "Provided Scan has DEFAULT ReadType, updating STREAM for Snapshot-based InputFormat");
      // Update the "DEFAULT" ReadType to be "STREAM" to try to improve the default case.
      scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
        SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT));
    }

    return scan;
  }
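
  // extractScanFromConf resolves the Scan from one of two keys; an illustrative setup for the
  // mapreduce path (the column family "cf" is a placeholder):
  //
  //   Scan scan = new Scan().addFamily(Bytes.toBytes("cf"));
  //   conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
  //
  // The mapred path instead sets a space-separated family list under
  // org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST.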

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
    List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
    return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1);
  }

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
    List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf,
    RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException {
    // load table descriptor
    TableDescriptor htd = manifest.getTableDescriptor();

    Path tableDir = CommonFSUtils.getTableDir(restoreDir, htd.getTableName());

    boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
      SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    boolean scanMetricsEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED,
      SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT);
    scan.setScanMetricsEnabled(scanMetricsEnabled);

    boolean useRegionLoc = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
      SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);

    Connection connection = null;
    RegionLocator regionLocator = null;
    if (localityEnabled && useRegionLoc) {
      Configuration newConf = new Configuration(conf);
      newConf.setInt("hbase.hconnection.threads.max", 1);
      try {
        connection = ConnectionFactory.createConnection(newConf);
        regionLocator = connection.getRegionLocator(htd.getTableName());

        /* Get all locations for the table and cache it */
        regionLocator.getAllRegionLocations();
      } finally {
        if (connection != null) {
          connection.close();
        }
      }
    }

    List<InputSplit> splits = new ArrayList<>();
    for (HRegionInfo hri : regionManifests) {
      // load region descriptor
      List<String> hosts = null;
      if (localityEnabled) {
        if (regionLocator != null) {
          /* Get Location from the local cache */
          HRegionLocation location = regionLocator.getRegionLocation(hri.getStartKey(), false);

          hosts = new ArrayList<>(1);
          hosts.add(location.getHostname());
        } else {
          hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir);
        }
      }

      if (numSplits > 1) {
        byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
        for (int i = 0; i < sp.length - 1; i++) {
          if (
            PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], sp[i + 1])
          ) {

            Scan boundedScan = new Scan(scan);
            if (scan.getStartRow().length == 0) {
              boundedScan.withStartRow(sp[i]);
            } else {
              boundedScan.withStartRow(
                Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
            }

            if (scan.getStopRow().length == 0) {
              boundedScan.withStopRow(sp[i + 1]);
            } else {
              boundedScan.withStopRow(
                Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
            }

            splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
          }
        }
      } else {
        if (
          PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
            hri.getEndKey())
        ) {

          splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
        }
      }
    }

    return splits;
  }

  /**
   * Compute block locations for snapshot files (which will get the locations for referred hfiles)
   * only when localityEnabled is true.
   */
  private static List<String> calculateLocationsForInputSplit(Configuration conf,
    TableDescriptor htd, HRegionInfo hri, Path tableDir) throws IOException {
    return getBestLocations(conf, HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
  }

  /**
   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
   * weights into account, and thus treat every location passed from the input split as equal. We
   * do not want to blindly pass all the locations, since we are creating one split per region, and
   * the region's blocks are distributed throughout the cluster unless favored node assignment is
   * used. In the expected stable case, only one location will contain most of the blocks as local.
   * On the other hand, with favored node assignment, 3 nodes will contain highly local blocks.
   * Here we use a simple heuristic: pass all hosts which have at least 80%
   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the top
   * host with the best locality. Return at most numTopsAtMost locations if there are more than
   * that.
   */
  private static List<String> getBestLocations(Configuration conf,
    HDFSBlocksDistribution blockDistribution, int numTopsAtMost) {
    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is
      return null;
    }

    if (numTopsAtMost < 1) { // invalid if numTopsAtMost < 1, correct it to be 1
      numTopsAtMost = 1;
    }
    int top = Math.min(numTopsAtMost, hostAndWeights.length);
    List<String> locations = new ArrayList<>(top);
    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    if (top == 1) { // only care about the top host
      return locations;
    }

    // When top >= 2,
    // do the heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
    double cutoffMultiplier =
      conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

    for (int i = 1; i <= top - 1; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        // As hostAndWeights is in descending order,
        // we can break the loop as soon as we meet a weight which is less than filterWeight.
        break;
      }
    }

    return locations;
  }
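
  // A worked example of the cutoff heuristic above (numbers are illustrative): with the default
  // cutoff multiplier of 0.8 and top host weights [1000, 900, 700, 650], the filter weight is
  // 1000 * 0.8 = 800, so only the first two hosts (1000 and 900) are returned; the loop stops at
  // 700 because the weights are sorted in descending order.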

  public static List<String> getBestLocations(Configuration conf,
    HDFSBlocksDistribution blockDistribution) {
    // 3 nodes will contain highly local blocks. So default to 3.
    return getBestLocations(conf, blockDistribution, 3);
  }

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job configuration
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
   *          have write permissions to this directory, and this should not be a subdirectory of
   *          rootdir. After the job is finished, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
    throws IOException {
    setInput(conf, snapshotName, restoreDir, null, 1);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job configuration
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
   *          have write permissions to this directory, and this should not be a subdirectory of
   *          rootdir. After the job is finished, restoreDir can be deleted.
   * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir,
    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
    if (numSplitsPerRegion < 1) {
      throw new IllegalArgumentException(
        "numSplits must be >= 1, illegal numSplits : " + numSplitsPerRegion);
    }
    if (splitAlgo == null && numSplitsPerRegion > 1) {
      throw new IllegalArgumentException("Split algo can't be null when numSplits > 1");
    }
    if (splitAlgo != null) {
      conf.set(SPLIT_ALGO, splitAlgo.getClass().getName());
    }
    conf.setInt(NUM_SPLITS_PER_REGION, numSplitsPerRegion);
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }

  /**
   * Clean up the restore directory after a snapshot scan job.
   * @param job the snapshot scan job
   * @param snapshotName the name of the snapshot the job read from
   * @throws IOException if an error occurs
   */
  public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
    Configuration conf = job.getConfiguration();
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
    FileSystem fs = restoreDir.getFileSystem(conf);
    if (!fs.exists(restoreDir)) {
      LOG.warn("{} doesn't exist on file system, maybe it's already been cleaned", restoreDir);
      return;
    }
    if (!fs.delete(restoreDir, true)) {
      LOG.warn("Failed to clean restore dir {} for snapshot {}", restoreDir, snapshotName);
    }
    LOG.debug("Cleaned restore directory {} for {}", restoreDir, snapshotName);
  }
}