/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;

/**
 * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
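 *
 * <p>A rough end-to-end sketch; the snapshot name and restore directory below are illustrative,
 * and the restore directory can be any writable path outside the HBase root directory:</p>
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * // Restore the snapshot into a scratch directory and record it under RESTORE_DIR_KEY.
 * TableSnapshotInputFormatImpl.setInput(conf, "my_snapshot", new Path("/tmp/snapshot-restore"));
 * // The scan to run over the snapshot, serialized the same way TableInputFormat expects it.
 * conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
 * // One InputSplit per region (more if NUM_SPLITS_PER_REGION is configured).
 * List&lt;TableSnapshotInputFormatImpl.InputSplit&gt; splits = TableSnapshotInputFormatImpl.getSplits(conf);
 * </pre>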
 */
@InterfaceAudience.Private
public class TableSnapshotInputFormatImpl {
  // TODO: Snapshot files are owned in the filesystem by the hbase user. There is no
  // easy way to delegate access.

  public static final Logger LOG = LoggerFactory.getLogger(TableSnapshotInputFormatImpl.class);

  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  // key for specifying the root dir of the restored snapshot
  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution, int)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
    "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

  /**
   * For MapReduce jobs running multiple mappers per region, determines
   * which split algorithm to use when finding split points for scanners.
   */
  public static final String SPLIT_ALGO = "hbase.mapreduce.split.algorithm";
  /**
   * For MapReduce jobs running multiple mappers per region, determines
   * the number of splits to generate per region.
   */
  public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";

  /**
   * Whether to calculate the block locations for splits. Defaults to true.
   * If the computing layer runs outside of the HBase cluster, block locality does not matter,
   * and setting this value to false skips the calculation and saves some time.
   *
   * The access modifier is "public" so that these can be accessed by test classes in
   * both org.apache.hadoop.hbase.mapred and org.apache.hadoop.hbase.mapreduce.
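   *
   * <p>For example, to skip the locality computation when the job runs outside the HBase
   * cluster:</p>
   * <pre>
   * conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY, false);
   * </pre>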
   */
  public static final String SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY =
      "hbase.TableSnapshotInputFormat.locality.enabled";
  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;

  /**
   * In some scenarios, scan only a limited number of rows from each InputSplit, e.g. for sampling
   * data extraction.
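   *
   * <p>For example, to read at most 1000 rows from each InputSplit (the limit is illustrative):</p>
   * <pre>
   * conf.setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 1000);
   * </pre>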
   */
  public static final String SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT =
      "hbase.TableSnapshotInputFormat.row.limit.per.inputsplit";

  /**
   * Implementation class for InputSplit logic common between mapred and mapreduce.
   */
  public static class InputSplit implements Writable {

    private TableDescriptor htd;
    private HRegionInfo regionInfo;
    private String[] locations;
    private String scan;
    private String restoreDir;

    // constructor for mapreduce framework / Writable
    public InputSplit() {}

    public InputSplit(TableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
        Scan scan, Path restoreDir) {
      this.htd = htd;
      this.regionInfo = regionInfo;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
      try {
        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
      } catch (IOException e) {
        LOG.warn("Failed to convert Scan to String", e);
        // fall back to an empty scan string so that write()/readFields() do not hit a null
        this.scan = "";
      }

      this.restoreDir = restoreDir.toString();
    }

    public TableDescriptor getHtd() {
      return htd;
    }

    public String getScan() {
      return scan;
    }

    public String getRestoreDir() {
      return restoreDir;
    }

    public long getLength() {
      //TODO: We can obtain the file sizes of the snapshot here.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    public TableDescriptor getTableDescriptor() {
      return htd;
    }

    public HRegionInfo getRegionInfo() {
      return regionInfo;
    }

    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
    // doing this wrapping with Writables.
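    // Wire format: a length-prefixed TableSnapshotRegionSplit protobuf, followed by the scan
    // string and the restore directory, each written with Bytes.writeByteArray.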
    @Override
    public void write(DataOutput out) throws IOException {
      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
          .setTable(ProtobufUtil.toTableSchema(htd))
          .setRegion(HRegionInfo.convert(regionInfo));

      for (String location : locations) {
        builder.addLocations(location);
      }

      TableSnapshotRegionSplit split = builder.build();

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);

      Bytes.writeByteArray(out, Bytes.toBytes(scan));
      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.htd = ProtobufUtil.toTableDescriptor(split.getTable());
      this.regionInfo = HRegionInfo.convert(split.getRegion());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);

      this.scan = Bytes.toString(Bytes.readByteArray(in));
      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
    }
  }

  /**
   * Implementation class for RecordReader logic common between mapred and mapreduce.
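   *
   * <p>A rough read loop, assuming an InputSplit and a Configuration are already at hand:</p>
   * <pre>
   * RecordReader reader = new RecordReader();
   * reader.initialize(split, conf);
   * try {
   *   while (reader.nextKeyValue()) {
   *     ImmutableBytesWritable key = reader.getCurrentKey();
   *     Result value = reader.getCurrentValue();
   *     // process the row
   *   }
   * } finally {
   *   reader.close();
   * }
   * </pre>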
   */
  public static class RecordReader {
    private InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;
    private int numOfCompleteRows = 0;
    private int rowLimitPerSplit;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
      this.split = split;
      this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
      TableDescriptor htd = split.htd;
      HRegionInfo hri = this.split.getRegionInfo();
      FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);

      // region is immutable, this should be fine,
      // otherwise we have to set the thread read point
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // disable caching of data blocks
      scan.setCacheBlocks(false);
      scan.setScanMetricsEnabled(true);

      scanner =
          new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
    }

    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        //we are done
        return false;
      }

      if (rowLimitPerSplit > 0 && ++this.numOfCompleteRows > rowLimitPerSplit) {
        return false;
      }
      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      return 0;
    }

    public float getProgress() {
      return 0; // TODO: use total bytes to estimate
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }

  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);

    List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);

    // TODO: mapred does not support scan as input API. Work around for now.
    Scan scan = extractScanFromConf(conf);
    // the temp dir where the snapshot is restored
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));

    RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);

    int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);

    return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
  }

  public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
    String splitAlgoClassName = conf.get(SPLIT_ALGO);
    if (splitAlgoClassName == null) {
      return null;
    }
    try {
      return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)
          .getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
        NoSuchMethodException | InvocationTargetException e) {
321      throw new IOException("SplitAlgo class " + splitAlgoClassName + " is not found", e);
    }
  }

  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
    if (regionManifests == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());

    for (SnapshotRegionManifest regionManifest : regionManifests) {
      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
        continue;
      }
      regionInfos.add(hri);
    }
    return regionInfos;
  }

  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
      Path rootDir, FileSystem fs) throws IOException {
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
  }

  public static Scan extractScanFromConf(Configuration conf) throws IOException {
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
363      throw new IllegalArgumentException("Unable to create scan");
    }
    return scan;
  }

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
    return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1);
  }

  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
      List<HRegionInfo> regionManifests, Path restoreDir,
      Configuration conf, RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException {
    // load table descriptor
    TableDescriptor htd = manifest.getTableDescriptor();

    Path tableDir = CommonFSUtils.getTableDir(restoreDir, htd.getTableName());

    boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
                                              SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);

    List<InputSplit> splits = new ArrayList<>();
    for (HRegionInfo hri : regionManifests) {
      if (numSplits > 1) {
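        // Divide the region's key range into numSplits sub-ranges and emit one InputSplit per
        // sub-range that overlaps the scan, clamping the scan's start/stop rows to the sub-range.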
        byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
        for (int i = 0; i < sp.length - 1; i++) {
          if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
                  sp[i + 1])) {
            List<String> hosts =
                calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);

            Scan boundedScan = new Scan(scan);
            if (scan.getStartRow().length == 0) {
              boundedScan.withStartRow(sp[i]);
            } else {
              boundedScan.withStartRow(
                Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
            }

            if (scan.getStopRow().length == 0) {
              boundedScan.withStopRow(sp[i + 1]);
            } else {
              boundedScan.withStopRow(
                Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
            }

            splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
          }
        }
      } else {
        if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
            hri.getStartKey(), hri.getEndKey())) {
          List<String> hosts =
              calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
          splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
        }
      }
    }

    return splits;
  }

  /**
   * Compute block locations for the snapshot files (which resolves the locations of the referenced
   * hfiles) only when localityEnabled is true.
   */
  private static List<String> calculateLocationsForInputSplit(Configuration conf,
      TableDescriptor htd, HRegionInfo hri, Path tableDir, boolean localityEnabled)
      throws IOException {
    if (localityEnabled) { // block locality matters
      return getBestLocations(conf,
                              HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
    } else { // block locality does not matter
      return null;
    }
  }

  /**
   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
   * weights into account, and thus treat every location passed from the input split as equal. We
   * do not want to blindly pass all the locations, since we are creating one split per region, and
   * the region's blocks are distributed throughout the cluster unless favored node assignment
   * is used. In the expected stable case, only one location will contain most of the blocks as
   * local.
   * With favored node assignment, on the other hand, 3 nodes will contain highly local blocks. Here
   * we use a simple heuristic: pass all hosts which have at least 80%
   * (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) of the block locality of the top
   * host, i.e. the host with the best locality.
   * Returns at most numTopsAtMost locations if there are more than that.
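   *
   * <p>Worked example (weights are illustrative): with host weights {h1=100, h2=85, h3=70} and the
   * default cutoff multiplier of 0.8, the threshold is 100 * 0.8 = 80, so only h1 and h2 are
   * returned (subject to the numTopsAtMost cap).</p>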
   */
  private static List<String> getBestLocations(Configuration conf,
      HDFSBlocksDistribution blockDistribution, int numTopsAtMost) {
    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is
      return null;
    }

    if (numTopsAtMost < 1) { // invalid if numTopsAtMost < 1, correct it to be 1
      numTopsAtMost = 1;
    }
    int top = Math.min(numTopsAtMost, hostAndWeights.length);
    List<String> locations = new ArrayList<>(top);
    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    if (top == 1) { // only care about the top host
      return locations;
    }

    // When top >= 2,
    // do the heuristic: filter all hosts which have at least cutoffMultiplier % of block locality
    double cutoffMultiplier
            = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

    for (int i = 1; i <= top - 1; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        // As hostAndWeights is in descending order,
        // we could break the loop as long as we meet a weight which is less than filterWeight.
        break;
      }
    }

    return locations;
  }

  public static List<String> getBestLocations(Configuration conf,
      HDFSBlocksDistribution blockDistribution) {
    // 3 nodes will contain highly local blocks. So default to 3.
    return getBestLocations(conf, blockDistribution, 3);
  }

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job configuration to modify
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user should
   *          have write permissions to this directory, and it should not be a subdirectory of
   *          rootdir. After the job finishes, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
      throws IOException {
    setInput(conf, snapshotName, restoreDir, null, 1);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job configuration to modify
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user should
   *          have write permissions to this directory, and it should not be a subdirectory of
   *          rootdir. After the job finishes, restoreDir can be deleted.
   * @param splitAlgo the SplitAlgorithm to use when generating InputSplits
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException if an error occurs
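   *
   * <p>A rough sketch, assuming the snapshot "my_snapshot" exists and a uniform split of the key
   * space fits the table; the snapshot name and restore directory are illustrative:</p>
   * <pre>
   * TableSnapshotInputFormatImpl.setInput(conf, "my_snapshot",
   *     new Path("/tmp/snapshot-restore"),      // scratch dir; can be deleted after the job
   *     new RegionSplitter.UniformSplit(),      // split algorithm for intra-region split points
   *     2);                                     // two input splits per region
   * </pre>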
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir,
                              RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion)
          throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
    if (numSplitsPerRegion < 1) {
539      throw new IllegalArgumentException("numSplits must be >= 1, " +
540              "illegal numSplits : " + numSplitsPerRegion);
    }
    if (splitAlgo == null && numSplitsPerRegion > 1) {
      throw new IllegalArgumentException("Split algo can't be null when numSplits > 1");
    }
    if (splitAlgo != null) {
      conf.set(SPLIT_ALGO, splitAlgo.getClass().getName());
    }
    conf.setInt(NUM_SPLITS_PER_REGION, numSplitsPerRegion);
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
}