001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.mapred;
020
021import org.apache.hadoop.fs.Path;
022import org.apache.hadoop.hbase.HRegionInfo;
023import org.apache.hadoop.hbase.HTableDescriptor;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.hadoop.hbase.client.Result;
026import org.apache.hadoop.hbase.client.Scan;
027import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
028import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
029import org.apache.hadoop.hbase.util.RegionSplitter;
030import org.apache.hadoop.mapred.InputFormat;
031import org.apache.hadoop.mapred.InputSplit;
032import org.apache.hadoop.mapred.JobConf;
033import org.apache.hadoop.mapred.RecordReader;
034import org.apache.hadoop.mapred.Reporter;
035
036import java.io.DataInput;
037import java.io.DataOutput;
038import java.io.IOException;
039import java.util.List;
040
041/**
042 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. Further
043 * documentation available on {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}.
044 *
045 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
046 */
047@InterfaceAudience.Public
048public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
049
050  public static class TableSnapshotRegionSplit implements InputSplit {
051    private TableSnapshotInputFormatImpl.InputSplit delegate;
052
053    // constructor for mapreduce framework / Writable
054    public TableSnapshotRegionSplit() {
055      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
056    }
057
058    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
059      this.delegate = delegate;
060    }
061
062    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
063        List<String> locations, Scan scan, Path restoreDir) {
064      this.delegate =
065          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
066    }
067
068    @Override
069    public long getLength() throws IOException {
070      return delegate.getLength();
071    }
072
073    @Override
074    public String[] getLocations() throws IOException {
075      return delegate.getLocations();
076    }
077
078    @Override
079    public void write(DataOutput out) throws IOException {
080      delegate.write(out);
081    }
082
083    @Override
084    public void readFields(DataInput in) throws IOException {
085      delegate.readFields(in);
086    }
087  }
088
089  static class TableSnapshotRecordReader
090    implements RecordReader<ImmutableBytesWritable, Result> {
091
092    private TableSnapshotInputFormatImpl.RecordReader delegate;
093
094    public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
095        throws IOException {
096      delegate = new TableSnapshotInputFormatImpl.RecordReader();
097      delegate.initialize(split.delegate, job);
098    }
099
100    @Override
101    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
102      if (!delegate.nextKeyValue()) {
103        return false;
104      }
105      ImmutableBytesWritable currentKey = delegate.getCurrentKey();
106      key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
107      value.copyFrom(delegate.getCurrentValue());
108      return true;
109    }
110
111    @Override
112    public ImmutableBytesWritable createKey() {
113      return new ImmutableBytesWritable();
114    }
115
116    @Override
117    public Result createValue() {
118      return new Result();
119    }
120
121    @Override
122    public long getPos() throws IOException {
123      return delegate.getPos();
124    }
125
126    @Override
127    public void close() throws IOException {
128      delegate.close();
129    }
130
131    @Override
132    public float getProgress() throws IOException {
133      return delegate.getProgress();
134    }
135  }
136
137  @Override
138  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
139    List<TableSnapshotInputFormatImpl.InputSplit> splits =
140      TableSnapshotInputFormatImpl.getSplits(job);
141    InputSplit[] results = new InputSplit[splits.size()];
142    for (int i = 0; i < splits.size(); i++) {
143      results[i] = new TableSnapshotRegionSplit(splits.get(i));
144    }
145    return results;
146  }
147
148  @Override
149  public RecordReader<ImmutableBytesWritable, Result>
150  getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
151    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
152  }
153
154  /**
155   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
156   * @param job the job to configure
157   * @param snapshotName the name of the snapshot to read from
158   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
159   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
160   * After the job is finished, restoreDir can be deleted.
161   * @throws IOException if an error occurs
162   */
163  public static void setInput(JobConf job, String snapshotName, Path restoreDir)
164      throws IOException {
165    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
166  }
167
168  /**
169   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
170   * @param job the job to configure
171   * @param snapshotName the name of the snapshot to read from
172   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
173   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
174   * After the job is finished, restoreDir can be deleted.
175   * @param splitAlgo split algorithm to generate splits from region
176   * @param numSplitsPerRegion how many input splits to generate per one region
177   * @throws IOException if an error occurs
178   */
179  public static void setInput(JobConf job, String snapshotName, Path restoreDir,
180                              RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
181    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir, splitAlgo, numSplitsPerRegion);
182  }
183}