/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
 */
@InterfaceAudience.Private
public class TableSnapshotInputFormatImpl {
  // TODO: Snapshot files are owned in the fs by the hbase user. There is no
  // easy way to delegate access.

  public static final Logger LOG = LoggerFactory.getLogger(TableSnapshotInputFormatImpl.class);

  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  // key for specifying the root dir of the restored snapshot
  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution, int)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
    "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

  /**
   * For MapReduce jobs running multiple mappers per region, determines which split algorithm to
   * use to find split points for scanners.
   */
  public static final String SPLIT_ALGO = "hbase.mapreduce.split.algorithm";
  /**
   * For MapReduce jobs running multiple mappers per region, determines the number of splits to
   * generate per region.
   */
  public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";

  /**
   * Whether to calculate block locations for splits. Defaults to true. If the computing layer runs
   * outside of the HBase cluster, block locality does not matter; setting this value to false
   * skips the calculation and saves some time. The access modifier is "public" so that these
   * constants can be accessed by test classes of both org.apache.hadoop.hbase.mapred and
   * org.apache.hadoop.hbase.mapreduce.
   */
  public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY =
    "hbase.TableSnapshotInputFormat.locality.enabled";
  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;

  /**
   * Whether to derive the snapshot region's locations from the region locations recorded in meta.
   * This is much faster than computing block locations for the splits.
   */
  public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION =
    "hbase.TableSnapshotInputFormat.locality.by.region.location";

  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT = false;

  /**
   * In some scenarios, scan only a limited number of rows on each InputSplit, e.g. for sampling
   * data extraction.
   */
  public static final String SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT =
    "hbase.TableSnapshotInputFormat.row.limit.per.inputsplit";

  /**
   * Whether to enable scan metrics on the Scan; defaults to true.
   */
  public static final String SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED =
    "hbase.TableSnapshotInputFormat.scan_metrics.enabled";

  public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true;

  /**
   * The {@link ReadType} which should be set on the {@link Scan} used to read the HBase snapshot;
   * defaults to STREAM.
   */
  public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE =
    "hbase.TableSnapshotInputFormat.scanner.readtype";
  public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM;
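
  /*
   * A minimal sketch (not a recommendation) of how a caller might tune some of the keys above on a
   * job's Configuration before submitting a snapshot-based job; the values are illustrative only:
   *
   *   Configuration conf = job.getConfiguration();
   *   conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
   *   conf.setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 1000);
   *   conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, Scan.ReadType.PREAD);
   */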

  /**
   * Implementation class for InputSplit logic common between mapred and mapreduce.
   */
  public static class InputSplit implements Writable {

    private TableDescriptor htd;
    private RegionInfo regionInfo;
    private String[] locations;
    private String scan;
    private String restoreDir;

    // constructor for mapreduce framework / Writable
    public InputSplit() {
    }

    public InputSplit(TableDescriptor htd, RegionInfo regionInfo, List<String> locations, Scan scan,
      Path restoreDir) {
      this.htd = htd;
      this.regionInfo = regionInfo;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
      try {
        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
      } catch (IOException e) {
        LOG.warn("Failed to convert Scan to String", e);
        // fall back to an empty serialized Scan so the split can still be written
        this.scan = "";
      }

      this.restoreDir = restoreDir.toString();
    }

    public TableDescriptor getHtd() {
      return htd;
    }

    public String getScan() {
      return scan;
    }

    public String getRestoreDir() {
      return restoreDir;
    }

    public long getLength() {
      // TODO: We can obtain the file sizes of the snapshot here.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    public TableDescriptor getTableDescriptor() {
      return htd;
    }

    public RegionInfo getRegionInfo() {
      return regionInfo;
    }

    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
    // doing this wrapping with Writables.
    @Override
    public void write(DataOutput out) throws IOException {
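      // Serialized layout: a length-prefixed TableSnapshotRegionSplit protobuf (table schema,
      // region info and locations), followed by the scan string and the restore dir, each written
      // with Bytes.writeByteArray. readFields() below reads the same layout back.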
      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
        .setTable(ProtobufUtil.toTableSchema(htd)).setRegion(ProtobufUtil.toRegionInfo(regionInfo));

      for (String location : locations) {
        builder.addLocations(location);
      }

      TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);

      Bytes.writeByteArray(out, Bytes.toBytes(scan));
      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.parser().parseFrom(buf);
      this.htd = ProtobufUtil.toTableDescriptor(split.getTable());
      this.regionInfo = ProtobufUtil.toRegionInfo(split.getRegion());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);

      this.scan = Bytes.toString(Bytes.readByteArray(in));
      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
    }
  }

  /**
   * Implementation class for RecordReader logic common between mapred and mapreduce.
   */
  public static class RecordReader {
    private InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;
    private int numOfCompleteRows = 0;
    private int rowLimitPerSplit;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
      this.split = split;
      this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
      TableDescriptor htd = split.htd;
      RegionInfo hri = this.split.getRegionInfo();
      FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);

      // the region is immutable, so this should be fine;
      // otherwise we would have to set the thread read point
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // disable caching of data blocks
      scan.setCacheBlocks(false);

      scanner =
        new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
    }

    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        // we are done
        return false;
      }

      if (rowLimitPerSplit > 0 && ++this.numOfCompleteRows > rowLimitPerSplit) {
        return false;
      }
      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      return 0;
    }

    public float getProgress() {
      return 0; // TODO: use total bytes to estimate
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }
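
  /*
   * A minimal sketch of how the mapred/mapreduce wrapper classes are expected to drive the
   * RecordReader above; "split", "conf" and process() are placeholders supplied by the caller:
   *
   *   RecordReader reader = new RecordReader();
   *   reader.initialize(split, conf);
   *   try {
   *     while (reader.nextKeyValue()) {
   *       process(reader.getCurrentKey(), reader.getCurrentValue());
   *     }
   *   } finally {
   *     reader.close();
   *   }
   */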

  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);

    List<RegionInfo> regionInfos = getRegionInfosFromManifest(manifest);

    // TODO: mapred does not support scan as input API. Work around for now.
    Scan scan = extractScanFromConf(conf);
    // the temp dir where the snapshot is restored
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));

    RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);

    int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);

    return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
  }

  public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
    String splitAlgoClassName = conf.get(SPLIT_ALGO);
    if (splitAlgoClassName == null) {
      return null;
    }
    try {
      return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)
        .getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
      | NoSuchMethodException | InvocationTargetException e) {
      throw new IOException(
        "SplitAlgo class " + splitAlgoClassName + " could not be loaded or instantiated", e);
    }
  }

  public static List<RegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
    if (regionManifests == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    List<RegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());

    for (SnapshotRegionManifest regionManifest : regionManifests) {
      RegionInfo hri = ProtobufUtil.toRegionInfo(regionManifest.getRegionInfo());
      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
        continue;
      }
      regionInfos.add(hri);
    }
    return regionInfos;
  }

  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
    Path rootDir, FileSystem fs) throws IOException {
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
  }

  public static Scan extractScanFromConf(Configuration conf) throws IOException {
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
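      // COLUMN_LIST is a space-separated list of column family names, e.g. "cf1 cf2"
      // (hypothetical family names); each entry is added to the Scan as a family below.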
      String[] columns =
        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("Unable to create scan");
    }

    if (scan.getReadType() == ReadType.DEFAULT) {
      LOG.info(
        "Provided Scan has DEFAULT ReadType, updating to STREAM for Snapshot-based InputFormat");
      // Update the "DEFAULT" ReadType to be "STREAM" to try to improve the default case.
      scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
        SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT));
    }

    return scan;
  }

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
    List<RegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
    return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1);
  }

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
    List<RegionInfo> regionManifests, Path restoreDir, Configuration conf,
    RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException {
    // load table descriptor
    TableDescriptor htd = manifest.getTableDescriptor();

    Path tableDir = CommonFSUtils.getTableDir(restoreDir, htd.getTableName());

    boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
      SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    boolean scanMetricsEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED,
      SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT);
    scan.setScanMetricsEnabled(scanMetricsEnabled);

    boolean useRegionLoc = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION,
      SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT);

    Connection connection = null;
    RegionLocator regionLocator = null;
    if (localityEnabled && useRegionLoc) {
      Configuration newConf = new Configuration(conf);
      newConf.setInt("hbase.hconnection.threads.max", 1);
      try {
        connection = ConnectionFactory.createConnection(newConf);
        regionLocator = connection.getRegionLocator(htd.getTableName());

        /* Get all locations for the table and cache it */
        regionLocator.getAllRegionLocations();
      } finally {
        if (connection != null) {
          connection.close();
        }
      }
    }

    List<InputSplit> splits = new ArrayList<>();
    for (RegionInfo hri : regionManifests) {
      // load region descriptor
      List<String> hosts = null;
      if (localityEnabled) {
        if (regionLocator != null) {
          /* Get Location from the local cache */
          HRegionLocation location = regionLocator.getRegionLocation(hri.getStartKey(), false);

          hosts = new ArrayList<>(1);
          hosts.add(location.getHostname());
        } else {
          hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir);
        }
      }

      if (numSplits > 1) {
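        // Ask the split algorithm for the boundary keys that divide the region's key range into
        // numSplits sub-ranges; each adjacent pair sp[i], sp[i + 1] below bounds one sub-split,
        // clamped to the scan's start and stop rows.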
        byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
        for (int i = 0; i < sp.length - 1; i++) {
          if (
            PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], sp[i + 1])
          ) {

            Scan boundedScan = new Scan(scan);
            if (scan.getStartRow().length == 0) {
              boundedScan.withStartRow(sp[i]);
            } else {
              boundedScan.withStartRow(
                Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
            }

            if (scan.getStopRow().length == 0) {
              boundedScan.withStopRow(sp[i + 1]);
            } else {
              boundedScan.withStopRow(
                Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
            }

            splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
          }
        }
      } else {
        if (
          PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(),
            hri.getEndKey())
        ) {

          splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
        }
      }
    }

    return splits;
  }

  /**
   * Compute block locations for snapshot files (which resolves the locations of the referenced
   * hfiles) only when localityEnabled is true.
   */
  private static List<String> calculateLocationsForInputSplit(Configuration conf,
    TableDescriptor htd, RegionInfo hri, Path tableDir) throws IOException {
    return getBestLocations(conf, HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
  }

  /**
   * This computes the locations to be passed from the InputSplit. MR/YARN schedulers do not take
   * weights into account and will treat every location passed from the input split as equal. We
   * do not want to blindly pass all the locations, since we are creating one split per region,
   * and the region's blocks are distributed throughout the cluster unless favored node assignment
   * is used. In the expected stable case, only one location will contain most of the blocks as
   * local. With favored node assignment, on the other hand, 3 nodes will contain highly local
   * blocks. Here we apply a simple heuristic: pass all hosts which have at least 80%
   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) as much block locality as the
   * host with the best locality. Returns at most numTopsAtMost locations if there are more than
   * that.
   */
  private static List<String> getBestLocations(Configuration conf,
    HDFSBlocksDistribution blockDistribution, int numTopsAtMost) {
    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is
      return null;
    }

    if (numTopsAtMost < 1) { // invalid if numTopsAtMost < 1, correct it to be 1
      numTopsAtMost = 1;
    }
    int top = Math.min(numTopsAtMost, hostAndWeights.length);
    List<String> locations = new ArrayList<>(top);
    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    if (top == 1) { // only care about the top host
      return locations;
    }

    // When top >= 2,
    // do the heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
    double cutoffMultiplier =
      conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;
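    // Worked example with hypothetical weights: if the top host holds 100 MB of local block data
    // and the multiplier is the default 0.8, filterWeight is 80 MB, so only hosts holding at
    // least 80 MB of local data are added below.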

    for (int i = 1; i <= top - 1; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        // As hostAndWeights is in descending order,
        // we could break the loop as long as we meet a weight which is less than filterWeight.
        break;
      }
    }

    return locations;
  }

  public static List<String> getBestLocations(Configuration conf,
    HDFSBlocksDistribution blockDistribution) {
    // 3 nodes will contain highly local blocks. So default to 3.
    return getBestLocations(conf, blockDistribution, 3);
  }

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf         the job configuration
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir   a temporary directory to restore the snapshot into. The current user
   *                     should have write permissions to this directory, and it should not be a
   *                     subdirectory of rootdir. After the job is finished, restoreDir can be
   *                     deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
    throws IOException {
    setInput(conf, snapshotName, restoreDir, null, 1);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf               the job configuration
   * @param snapshotName       the name of the snapshot to read from
   * @param restoreDir         a temporary directory to restore the snapshot into. The current user
   *                           should have write permissions to this directory, and it should not
   *                           be a subdirectory of rootdir. After the job is finished, restoreDir
   *                           can be deleted.
   * @param splitAlgo          SplitAlgorithm to be used when generating InputSplits
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir,
    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
    if (numSplitsPerRegion < 1) {
      throw new IllegalArgumentException(
        "numSplitsPerRegion must be >= 1, illegal value: " + numSplitsPerRegion);
    }
    if (splitAlgo == null && numSplitsPerRegion > 1) {
      throw new IllegalArgumentException("Split algo can't be null when numSplits > 1");
    }
    if (splitAlgo != null) {
      conf.set(SPLIT_ALGO, splitAlgo.getClass().getName());
    }
    conf.setInt(NUM_SPLITS_PER_REGION, numSplitsPerRegion);
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
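
  /*
   * A minimal usage sketch, assuming a snapshot named "my_snapshot" already exists, "job" is the
   * caller's MapReduce Job, and the current user can write to the hypothetical /tmp/restore dir:
   *
   *   Configuration conf = job.getConfiguration();
   *   TableSnapshotInputFormatImpl.setInput(conf, "my_snapshot", new Path("/tmp/restore"));
   *   // ... submit and run the job ...
   *   TableSnapshotInputFormatImpl.cleanRestoreDir(job, "my_snapshot");
   */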

  /**
   * Clean the restore directory after a snapshot scan job.
   * @param job          the snapshot scan job
   * @param snapshotName the name of the snapshot to read from
   * @throws IOException if an error occurs
   */
  public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
    Configuration conf = job.getConfiguration();
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
    FileSystem fs = restoreDir.getFileSystem(conf);
    if (!fs.exists(restoreDir)) {
      LOG.warn("{} doesn't exist on file system, maybe it's already been cleaned", restoreDir);
      return;
    }
    if (!fs.delete(restoreDir, true)) {
      LOG.warn("Failed to delete restore dir {} for snapshot {}", restoreDir, snapshotName);
      return;
    }
    LOG.debug("Deleted restore directory {} for snapshot {}", restoreDir, snapshotName);
  }
}