001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.ByteArrayOutputStream;
022import java.io.DataInput;
023import java.io.DataOutput;
024import java.io.IOException;
025import java.lang.reflect.InvocationTargetException;
026import java.util.ArrayList;
027import java.util.List;
028import java.util.UUID;
029
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HDFSBlocksDistribution;
034import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
035import org.apache.hadoop.hbase.HRegionInfo;
036import org.apache.hadoop.hbase.PrivateCellUtil;
037import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
038import org.apache.hadoop.hbase.client.IsolationLevel;
039import org.apache.hadoop.hbase.client.Result;
040import org.apache.hadoop.hbase.client.Scan;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
043import org.apache.hadoop.hbase.regionserver.HRegion;
044import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
045import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
046import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.FSUtils;
049import org.apache.hadoop.hbase.util.RegionSplitter;
050import org.apache.hadoop.io.Writable;
051import org.apache.yetus.audience.InterfaceAudience;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
056import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
057import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
058import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
059import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
060
061/**
062 * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
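 *
 * <p>A minimal usage sketch via the mapreduce-flavored wrapper; the snapshot name, restore
 * directory and mapper class below are placeholders, not part of this class:
 *
 * <pre>{@code
 * Job job = Job.getInstance(conf);
 * // Must be writable by the current user and must not live under hbase.rootdir.
 * Path restoreDir = new Path("/tmp/snapshot-restore");
 * TableMapReduceUtil.initTableSnapshotMapperJob(
 *     "mySnapshot",                  // snapshot to read
 *     new Scan(),                    // scan applied to every region of the snapshot
 *     MyMapper.class,                // a TableMapper subclass
 *     Text.class, IntWritable.class, // map output key/value classes
 *     job, true, restoreDir);
 * }</pre>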
063 */
064@InterfaceAudience.Private
065public class TableSnapshotInputFormatImpl {
  // TODO: Snapshot files are owned in fs by the hbase user. There is no
067  // easy way to delegate access.
068
069  public static final Logger LOG = LoggerFactory.getLogger(TableSnapshotInputFormatImpl.class);
070
071  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
072  // key for specifying the root dir of the restored snapshot
073  protected static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";
074
075  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution, int)} */
076  private static final String LOCALITY_CUTOFF_MULTIPLIER =
077    "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
078  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;
079
080  /**
081   * For MapReduce jobs running multiple mappers per region, determines
082   * what split algorithm we should be using to find split points for scanners.
083   */
084  public static final String SPLIT_ALGO = "hbase.mapreduce.split.algorithm";
085  /**
086   * For MapReduce jobs running multiple mappers per region, determines
087   * number of splits to generate per region.
088   */
089  public static final String NUM_SPLITS_PER_REGION = "hbase.mapreduce.splits.per.region";
090
091  /**
   * Whether to calculate the block location for splits. Defaults to true.
   * If the computing layer runs outside of the HBase cluster, block locality does not matter,
   * and setting this value to false skips the calculation and saves some time.
   *
   * These constants are public so that they can be accessed by the test classes of
   * both org.apache.hadoop.hbase.mapred
   * and org.apache.hadoop.hbase.mapreduce.
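   *
   * <p>For example, a job running off-cluster can skip the lookups (conf being the job
   * configuration):
   * <pre>{@code
   * conf.setBoolean("hbase.TableSnapshotInputFormat.locality.enabled", false);
   * }</pre>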
099   */
100  public static final String  SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY =
101      "hbase.TableSnapshotInputFormat.locality.enabled";
102  public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;
103
104  /**
105   * Implementation class for InputSplit logic common between mapred and mapreduce.
106   */
107  public static class InputSplit implements Writable {
108
109    private TableDescriptor htd;
110    private HRegionInfo regionInfo;
111    private String[] locations;
112    private String scan;
113    private String restoreDir;
114
115    // constructor for mapreduce framework / Writable
116    public InputSplit() {}
117
118    public InputSplit(TableDescriptor htd, HRegionInfo regionInfo, List<String> locations,
119        Scan scan, Path restoreDir) {
120      this.htd = htd;
121      this.regionInfo = regionInfo;
122      if (locations == null || locations.isEmpty()) {
123        this.locations = new String[0];
124      } else {
125        this.locations = locations.toArray(new String[locations.size()]);
126      }
      try {
        this.scan = scan != null ? TableMapReduceUtil.convertScanToString(scan) : "";
      } catch (IOException e) {
        LOG.warn("Failed to convert Scan to String", e);
        this.scan = ""; // fall back to an empty serialized scan so write() does not NPE on null
      }
132
133      this.restoreDir = restoreDir.toString();
134    }
135
136    public TableDescriptor getHtd() {
137      return htd;
138    }
139
140    public String getScan() {
141      return scan;
142    }
143
144    public String getRestoreDir() {
145      return restoreDir;
146    }
147
148    public long getLength() {
149      //TODO: We can obtain the file sizes of the snapshot here.
150      return 0;
151    }
152
153    public String[] getLocations() {
154      return locations;
155    }
156
157    public TableDescriptor getTableDescriptor() {
158      return htd;
159    }
160
161    public HRegionInfo getRegionInfo() {
162      return regionInfo;
163    }
164
165    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
166    // doing this wrapping with Writables.
167    @Override
168    public void write(DataOutput out) throws IOException {
169      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
170          .setTable(ProtobufUtil.toTableSchema(htd))
171          .setRegion(HRegionInfo.convert(regionInfo));
172
173      for (String location : locations) {
174        builder.addLocations(location);
175      }
176
177      TableSnapshotRegionSplit split = builder.build();
178
179      ByteArrayOutputStream baos = new ByteArrayOutputStream();
180      split.writeTo(baos);
181      baos.close();
182      byte[] buf = baos.toByteArray();
183      out.writeInt(buf.length);
184      out.write(buf);
185
186      Bytes.writeByteArray(out, Bytes.toBytes(scan));
187      Bytes.writeByteArray(out, Bytes.toBytes(restoreDir));
188
189    }
190
191    @Override
192    public void readFields(DataInput in) throws IOException {
193      int len = in.readInt();
194      byte[] buf = new byte[len];
195      in.readFully(buf);
196      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
197      this.htd = ProtobufUtil.toTableDescriptor(split.getTable());
198      this.regionInfo = HRegionInfo.convert(split.getRegion());
199      List<String> locationsList = split.getLocationsList();
200      this.locations = locationsList.toArray(new String[locationsList.size()]);
201
202      this.scan = Bytes.toString(Bytes.readByteArray(in));
203      this.restoreDir = Bytes.toString(Bytes.readByteArray(in));
204    }
205  }
206
207  /**
208   * Implementation class for RecordReader logic common between mapred and mapreduce.
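   *
   * <p>Typical read loop (a sketch; error handling omitted, the split and conf come from the
   * enclosing input format):
   * <pre>{@code
   * RecordReader rr = new RecordReader();
   * rr.initialize(split, conf);
   * try {
   *   while (rr.nextKeyValue()) {
   *     ImmutableBytesWritable key = rr.getCurrentKey();
   *     Result value = rr.getCurrentValue();
   *     // process key/value
   *   }
   * } finally {
   *   rr.close();
   * }
   * }</pre>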
209   */
210  public static class RecordReader {
211    private InputSplit split;
212    private Scan scan;
213    private Result result = null;
214    private ImmutableBytesWritable row = null;
215    private ClientSideRegionScanner scanner;
216
217    public ClientSideRegionScanner getScanner() {
218      return scanner;
219    }
220
221    public void initialize(InputSplit split, Configuration conf) throws IOException {
222      this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
223      this.split = split;
224      TableDescriptor htd = split.htd;
225      HRegionInfo hri = this.split.getRegionInfo();
226      FileSystem fs = FSUtils.getCurrentFileSystem(conf);
227
228
229      // region is immutable, this should be fine,
230      // otherwise we have to set the thread read point
231      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
232      // disable caching of data blocks
233      scan.setCacheBlocks(false);
234      scan.setScanMetricsEnabled(true);
235
236      scanner =
237          new ClientSideRegionScanner(conf, fs, new Path(split.restoreDir), htd, hri, scan, null);
238    }
239
240    public boolean nextKeyValue() throws IOException {
241      result = scanner.next();
242      if (result == null) {
243        //we are done
244        return false;
245      }
246
247      if (this.row == null) {
248        this.row = new ImmutableBytesWritable();
249      }
250      this.row.set(result.getRow());
251      return true;
252    }
253
254    public ImmutableBytesWritable getCurrentKey() {
255      return row;
256    }
257
258    public Result getCurrentValue() {
259      return result;
260    }
261
262    public long getPos() {
263      return 0;
264    }
265
266    public float getProgress() {
267      return 0; // TODO: use total bytes to estimate
268    }
269
270    public void close() {
271      if (this.scanner != null) {
272        this.scanner.close();
273      }
274    }
275  }
276
277  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
278    String snapshotName = getSnapshotName(conf);
279
280    Path rootDir = FSUtils.getRootDir(conf);
281    FileSystem fs = rootDir.getFileSystem(conf);
282
283    SnapshotManifest manifest = getSnapshotManifest(conf, snapshotName, rootDir, fs);
284
285    List<HRegionInfo> regionInfos = getRegionInfosFromManifest(manifest);
286
287    // TODO: mapred does not support scan as input API. Work around for now.
288    Scan scan = extractScanFromConf(conf);
289    // the temp dir where the snapshot is restored
290    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
291
292    RegionSplitter.SplitAlgorithm splitAlgo = getSplitAlgo(conf);
293
294    int numSplits = conf.getInt(NUM_SPLITS_PER_REGION, 1);
295
296    return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
297  }
298
  public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
    String splitAlgoClassName = conf.get(SPLIT_ALGO);
    if (splitAlgoClassName == null) {
      return null;
    }
303    try {
304      return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)
305          .getDeclaredConstructor().newInstance();
306    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
307        NoSuchMethodException | InvocationTargetException e) {
308      throw new IOException("SplitAlgo class " + splitAlgoClassName + " is not found", e);
309    }
310  }
311
312
313  public static List<HRegionInfo> getRegionInfosFromManifest(SnapshotManifest manifest) {
314    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
315    if (regionManifests == null) {
316      throw new IllegalArgumentException("Snapshot seems empty");
317    }
318
319    List<HRegionInfo> regionInfos = Lists.newArrayListWithCapacity(regionManifests.size());
320
321    for (SnapshotRegionManifest regionManifest : regionManifests) {
322      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());
323      if (hri.isOffline() && (hri.isSplit() || hri.isSplitParent())) {
324        continue;
325      }
326      regionInfos.add(hri);
327    }
328    return regionInfos;
329  }
330
331  public static SnapshotManifest getSnapshotManifest(Configuration conf, String snapshotName,
332      Path rootDir, FileSystem fs) throws IOException {
333    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
334    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
335    return SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
336  }
337
338  public static Scan extractScanFromConf(Configuration conf) throws IOException {
339    Scan scan = null;
340    if (conf.get(TableInputFormat.SCAN) != null) {
341      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
342    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
343      String[] columns =
344        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
345      scan = new Scan();
346      for (String col : columns) {
347        scan.addFamily(Bytes.toBytes(col));
348      }
349    } else {
350      throw new IllegalArgumentException("Unable to create scan");
351    }
352    return scan;
353  }
354
355  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
356      List<HRegionInfo> regionManifests, Path restoreDir, Configuration conf) throws IOException {
357    return getSplits(scan, manifest, regionManifests, restoreDir, conf, null, 1);
358  }
359
360  public static List<InputSplit> getSplits(Scan scan, SnapshotManifest manifest,
361      List<HRegionInfo> regionManifests, Path restoreDir,
362      Configuration conf, RegionSplitter.SplitAlgorithm sa, int numSplits) throws IOException {
363    // load table descriptor
364    TableDescriptor htd = manifest.getTableDescriptor();
365
366    Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());
367
368    boolean localityEnabled = conf.getBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY,
369                                              SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT);
370
371    List<InputSplit> splits = new ArrayList<>();
372    for (HRegionInfo hri : regionManifests) {
373      // load region descriptor
374
375      if (numSplits > 1) {
376        byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true);
377        for (int i = 0; i < sp.length - 1; i++) {
378          if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i],
379                  sp[i + 1])) {
380            List<String> hosts =
381                calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
382
383            Scan boundedScan = new Scan(scan);
384            if (scan.getStartRow().length == 0) {
385              boundedScan.withStartRow(sp[i]);
386            } else {
387              boundedScan.withStartRow(
388                Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]);
389            }
390
391            if (scan.getStopRow().length == 0) {
392              boundedScan.withStopRow(sp[i + 1]);
393            } else {
394              boundedScan.withStopRow(
395                Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]);
396            }
397
398            splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir));
399          }
400        }
401      } else {
402        if (PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
403            hri.getStartKey(), hri.getEndKey())) {
404          List<String> hosts =
405              calculateLocationsForInputSplit(conf, htd, hri, tableDir, localityEnabled);
406          splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir));
407        }
408      }
409    }
410
411    return splits;
412  }
413
414  /**
   * Compute block locations for the snapshot files (which in turn resolves the locations of the
   * referenced hfiles), but only when localityEnabled is true.
417   */
418  private static List<String> calculateLocationsForInputSplit(Configuration conf,
419      TableDescriptor htd, HRegionInfo hri, Path tableDir, boolean localityEnabled)
420      throws IOException {
421    if (localityEnabled) { // care block locality
422      return getBestLocations(conf,
423                              HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));
424    } else { // do not care block locality
425      return null;
426    }
427  }
428
429  /**
   * This computes the locations to be passed from the InputSplit. MR/Yarn schedulers do not take
   * weights into account, and will treat every location passed from the input split as equal. We
   * do not want to blindly pass all the locations, since we are creating one split per region, and
   * the region's blocks are distributed throughout the cluster unless favored node assignment
   * is used. In the expected stable case, only one location will contain most of the blocks as
   * local.
   * On the other hand, with favored node assignment, 3 nodes will contain highly local blocks.
   * Here we apply a simple heuristic: we pass every host whose block locality weight is at least
   * 80% (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) of the weight of the host
   * with the best locality.
440   * Return at most numTopsAtMost locations if there are more than that.
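   * For example, with hypothetical top-host weights of 100, 85 and 70 and the default cutoff
   * multiplier of 0.8, the filter weight is 100 * 0.8 = 80, so the first two hosts are returned
   * and the third is dropped.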
441   */
442  private static List<String> getBestLocations(Configuration conf,
443      HDFSBlocksDistribution blockDistribution, int numTopsAtMost) {
444    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();
445
446    if (hostAndWeights.length == 0) { // no matter what numTopsAtMost is
447      return null;
448    }
449
450    if (numTopsAtMost < 1) { // invalid if numTopsAtMost < 1, correct it to be 1
451      numTopsAtMost = 1;
452    }
453    int top = Math.min(numTopsAtMost, hostAndWeights.length);
454    List<String> locations = new ArrayList<>(top);
455    HostAndWeight topHost = hostAndWeights[0];
456    locations.add(topHost.getHost());
457
458    if (top == 1) { // only care about the top host
459      return locations;
460    }
461
    // When top >= 2, apply the heuristic: keep only hosts whose weight is at least
    // cutoffMultiplier times the weight of the host with the best locality.
464    double cutoffMultiplier
465            = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);
466
467    double filterWeight = topHost.getWeight() * cutoffMultiplier;
468
469    for (int i = 1; i <= top - 1; i++) {
470      if (hostAndWeights[i].getWeight() >= filterWeight) {
471        locations.add(hostAndWeights[i].getHost());
472      } else {
        // Since hostAndWeights is sorted in descending order of weight,
        // we can stop as soon as we hit a weight below filterWeight.
475        break;
476      }
477    }
478
479    return locations;
480  }
481
482  public static List<String> getBestLocations(Configuration conf,
483      HDFSBlocksDistribution blockDistribution) {
484    // 3 nodes will contain highly local blocks. So default to 3.
485    return getBestLocations(conf, blockDistribution, 3);
486  }
487
488  private static String getSnapshotName(Configuration conf) {
489    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
490    if (snapshotName == null) {
491      throw new IllegalArgumentException("Snapshot name must be provided");
492    }
493    return snapshotName;
494  }
495
496  /**
497   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job configuration
499   * @param snapshotName the name of the snapshot to read from
500   * @param restoreDir a temporary directory to restore the snapshot into. Current user should have
501   *          write permissions to this directory, and this should not be a subdirectory of rootdir.
502   *          After the job is finished, restoreDir can be deleted.
503   * @throws IOException if an error occurs
504   */
505  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
506      throws IOException {
507    setInput(conf, snapshotName, restoreDir, null, 1);
508  }
509
510  /**
511   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
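   *
   * <p>For example (a sketch; the snapshot name and restore path are placeholders):
   * <pre>{@code
   * // Restore "mySnapshot" under a scratch directory and generate 4 splits per region,
   * // cutting each region's key range with a uniform split algorithm.
   * setInput(conf, "mySnapshot", new Path("/tmp/snapshot-restore"),
   *     new RegionSplitter.UniformSplit(), 4);
   * }</pre>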
512   * @param conf the job to configure
513   * @param snapshotName the name of the snapshot to read from
514   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
515   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
516   * After the job is finished, restoreDir can be deleted.
   * @param numSplitsPerRegion how many input splits to generate per region
518   * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
519   * @throws IOException if an error occurs
520   */
521  public static void setInput(Configuration conf, String snapshotName, Path restoreDir,
522                              RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion)
523          throws IOException {
524    conf.set(SNAPSHOT_NAME_KEY, snapshotName);
525    if (numSplitsPerRegion < 1) {
526      throw new IllegalArgumentException("numSplits must be >= 1, " +
527              "illegal numSplits : " + numSplitsPerRegion);
528    }
529    if (splitAlgo == null && numSplitsPerRegion > 1) {
530      throw new IllegalArgumentException("Split algo can't be null when numSplits > 1");
531    }
532    if (splitAlgo != null) {
533      conf.set(SPLIT_ALGO, splitAlgo.getClass().getName());
534    }
535    conf.setInt(NUM_SPLITS_PER_REGION, numSplitsPerRegion);
536    Path rootDir = FSUtils.getRootDir(conf);
537    FileSystem fs = rootDir.getFileSystem(conf);
538
539    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());
540
541    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
542    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
543  }
544}