/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
 * bypasses HBase servers and accesses the underlying files (hfiles, recovered edits,
 * WALs, etc.) directly to provide maximum performance. The snapshot does not have to be
 * restored to the live cluster or cloned. This also allows the MapReduce job to run
 * against an online or offline HBase cluster. The snapshot files can be exported to a
 * pure-HDFS cluster with the {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool,
 * and this InputFormat can then run the MapReduce job directly over the snapshot files.
 * The snapshot should not be deleted while there are jobs reading from its files.
 * <p>
 * Usage is similar to TableInputFormat, and
 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
 * can be used to configure the job.
 * <pre>{@code
 * Job job = new Job(conf);
 * Scan scan = new Scan();
 * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
 *      scan, MyTableMapper.class, MyMapKeyOutput.class,
 *      MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 * <p>
 * Internally, this input format restores the snapshot into the given tmp directory. By default,
 * and similar to {@link TableInputFormat}, an InputSplit is created per region, but optionally
 * you can run N mapper tasks per region, in which case the region key range is split into
 * N sub-ranges and an InputSplit is created per sub-range, as in the sketch below. Each
 * RecordReader opens the region for reading. An internal RegionScanner is used to execute the
 * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
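 * <p>
 * As a minimal sketch (the snapshot name, restore directory, and split count below are
 * illustrative assumptions), multiple splits per region can be configured via
 * {@link #setInput(Job, String, Path, RegionSplitter.SplitAlgorithm, int)}:
 * <pre>{@code
 * // Split each region's key range uniformly into 8 sub-ranges, one mapper per sub-range.
 * TableSnapshotInputFormat.setInput(job, "mySnapshot", restoreDir,
 *     new RegionSplitter.UniformSplit(), 8);
 * }
 * </pre>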
 * <p>
 * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read
 * from snapshot files and data files. To read the snapshot files directly from the filesystem,
 * the user running the MR job must have sufficient permissions to access the snapshot and
 * reference files. This means that the MR job either has to be run as the HBase user, or the
 * user must have group or other privileges in the filesystem (see HBASE-8369). Note that
 * giving other users read access to the snapshot/data files completely circumvents the access
 * control enforced by HBase.
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {

  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
    private TableSnapshotInputFormatImpl.InputSplit delegate;

    // constructor for mapreduce framework / Writable
    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    public TableSnapshotRegionSplit(TableDescriptor htd, RegionInfo regionInfo,
        List<String> locations, Scan scan, Path restoreDir) {
      this.delegate =
          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return delegate.getLength();
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return delegate.getLocations();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      delegate.readFields(in);
    }

    public RegionInfo getRegion() {
      return delegate.getRegionInfo();
    }

    TableSnapshotInputFormatImpl.InputSplit getDelegate() {
      return this.delegate;
    }
  }
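
  /*
   * A minimal sketch of how the MapReduce framework round-trips a split through its
   * Writable interface; the no-arg constructor above exists for exactly this path.
   * The stream names here are illustrative, not part of this class:
   *
   *   ByteArrayOutputStream bytes = new ByteArrayOutputStream();
   *   split.write(new DataOutputStream(bytes));
   *   TableSnapshotRegionSplit copy = new TableSnapshotRegionSplit();
   *   copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
   */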

  @VisibleForTesting
  static class TableSnapshotRegionRecordReader extends
      RecordReader<ImmutableBytesWritable, Result> {
    private TableSnapshotInputFormatImpl.RecordReader delegate =
      new TableSnapshotInputFormatImpl.RecordReader();
    private TaskAttemptContext context;
    private Method getCounter;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException {
      this.context = context;
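      // Resolve getCounter(String, String) reflectively; TaskAttemptContext's binary
      // signature differs across Hadoop versions, so a direct call may not link.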
      getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
      delegate.initialize(
        ((TableSnapshotRegionSplit) split).delegate,
        context.getConfiguration());
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean result = delegate.nextKeyValue();
      if (result) {
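        // Fold the scanner's ScanMetrics into the job's MapReduce counters after each row.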
        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
        if (scanMetrics != null && context != null) {
          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
        }
      }
      return result;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
      return delegate.getCurrentKey();
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
      return delegate.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return delegate.getProgress();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new TableSnapshotRegionRecordReader();
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
    List<InputSplit> results = new ArrayList<>();
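    // Wrap each impl-level split in the Writable, framework-facing split type.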
    for (TableSnapshotInputFormatImpl.InputSplit split :
        TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
      results.add(new TableSnapshotRegionSplit(split));
    }
    return results;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user
   * should have write permission to this directory, and it should not be a subdirectory of
   * the HBase root directory. After the job finishes, restoreDir can be deleted.
   * @throws IOException if an error occurs
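   * <p>
   * A minimal usage sketch (the job name, snapshot name, and restore path here are
   * illustrative assumptions):
   * <pre>{@code
   * Job job = Job.getInstance(conf, "snapshot-read");
   * TableSnapshotInputFormat.setInput(job, "mySnapshot", new Path("/tmp/snapshot_restore"));
   * job.setInputFormatClass(TableSnapshotInputFormat.class);
   * }
   * </pre>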
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir)
      throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user
   * should have write permission to this directory, and it should not be a subdirectory of
   * the HBase root directory. After the job finishes, restoreDir can be deleted.
   * @param splitAlgo the split algorithm used to subdivide each region's key range
   * @param numSplitsPerRegion how many input splits to generate per region
   * @throws IOException if an error occurs
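   * <p>
   * A minimal usage sketch (snapshot name, restore path, and split count are illustrative
   * assumptions; UniformSplit assumes roughly uniform binary row keys):
   * <pre>{@code
   * TableSnapshotInputFormat.setInput(job, "mySnapshot", new Path("/tmp/snapshot_restore"),
   *     new RegionSplitter.UniformSplit(), 4);
   * }
   * </pre>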
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir,
      RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
        splitAlgo, numSplitsPerRegion);
  }
}