001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapred;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.util.List;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.client.RegionInfo;
026import org.apache.hadoop.hbase.client.Result;
027import org.apache.hadoop.hbase.client.Scan;
028import org.apache.hadoop.hbase.client.TableDescriptor;
029import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
030import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
031import org.apache.hadoop.hbase.util.RegionSplitter;
032import org.apache.hadoop.mapred.InputFormat;
033import org.apache.hadoop.mapred.InputSplit;
034import org.apache.hadoop.mapred.JobConf;
035import org.apache.hadoop.mapred.RecordReader;
036import org.apache.hadoop.mapred.Reporter;
037import org.apache.yetus.audience.InterfaceAudience;
038
039/**
040 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. Further
041 * documentation available on {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}.
042 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
043 */
044@InterfaceAudience.Public
045public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
046
047  public static class TableSnapshotRegionSplit implements InputSplit {
048    private TableSnapshotInputFormatImpl.InputSplit delegate;
049
050    // constructor for mapreduce framework / Writable
051    public TableSnapshotRegionSplit() {
052      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
053    }
054
055    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
056      this.delegate = delegate;
057    }
058
059    public TableSnapshotRegionSplit(TableDescriptor htd, RegionInfo regionInfo,
060      List<String> locations, Scan scan, Path restoreDir) {
061      this.delegate =
062        new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
063    }
064
065    @Override
066    public long getLength() throws IOException {
067      return delegate.getLength();
068    }
069
070    @Override
071    public String[] getLocations() throws IOException {
072      return delegate.getLocations();
073    }
074
075    @Override
076    public void write(DataOutput out) throws IOException {
077      delegate.write(out);
078    }
079
080    @Override
081    public void readFields(DataInput in) throws IOException {
082      delegate.readFields(in);
083    }
084  }
085
086  static class TableSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
087
088    private TableSnapshotInputFormatImpl.RecordReader delegate;
089
090    public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
091      throws IOException {
092      delegate = new TableSnapshotInputFormatImpl.RecordReader();
093      delegate.initialize(split.delegate, job);
094    }
095
096    @Override
097    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
098      if (!delegate.nextKeyValue()) {
099        return false;
100      }
101      ImmutableBytesWritable currentKey = delegate.getCurrentKey();
102      key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
103      value.copyFrom(delegate.getCurrentValue());
104      return true;
105    }
106
107    @Override
108    public ImmutableBytesWritable createKey() {
109      return new ImmutableBytesWritable();
110    }
111
112    @Override
113    public Result createValue() {
114      return new Result();
115    }
116
117    @Override
118    public long getPos() throws IOException {
119      return delegate.getPos();
120    }
121
122    @Override
123    public void close() throws IOException {
124      delegate.close();
125    }
126
127    @Override
128    public float getProgress() throws IOException {
129      return delegate.getProgress();
130    }
131  }
132
133  @Override
134  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
135    List<TableSnapshotInputFormatImpl.InputSplit> splits =
136      TableSnapshotInputFormatImpl.getSplits(job);
137    InputSplit[] results = new InputSplit[splits.size()];
138    for (int i = 0; i < splits.size(); i++) {
139      results[i] = new TableSnapshotRegionSplit(splits.get(i));
140    }
141    return results;
142  }
143
144  @Override
145  public RecordReader<ImmutableBytesWritable, Result> getRecordReader(InputSplit split, JobConf job,
146    Reporter reporter) throws IOException {
147    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
148  }
149
150  /**
151   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
152   * @param job          the job to configure
153   * @param snapshotName the name of the snapshot to read from
154   * @param restoreDir   a temporary directory to restore the snapshot into. Current user should
155   *                     have write permissions to this directory, and this should not be a
156   *                     subdirectory of rootdir. After the job is finished, restoreDir can be
157   *                     deleted.
158   * @throws IOException if an error occurs
159   */
160  public static void setInput(JobConf job, String snapshotName, Path restoreDir)
161    throws IOException {
162    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
163  }
164
165  /**
166   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
167   * @param job                the job to configure
168   * @param snapshotName       the name of the snapshot to read from
169   * @param restoreDir         a temporary directory to restore the snapshot into. Current user
170   *                           should have write permissions to this directory, and this should not
171   *                           be a subdirectory of rootdir. After the job is finished, restoreDir
172   *                           can be deleted.
173   * @param splitAlgo          split algorithm to generate splits from region
174   * @param numSplitsPerRegion how many input splits to generate per one region
175   * @throws IOException if an error occurs
176   */
177  public static void setInput(JobConf job, String snapshotName, Path restoreDir,
178    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
179    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir, splitAlgo,
180      numSplitsPerRegion);
181  }
182}