001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.HRegionInfo;
027import org.apache.hadoop.hbase.HTableDescriptor;
028import org.apache.hadoop.hbase.client.RegionInfo;
029import org.apache.hadoop.hbase.client.Result;
030import org.apache.hadoop.hbase.client.Scan;
031import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
032import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
033import org.apache.hadoop.hbase.util.RegionSplitter;
034import org.apache.hadoop.io.Writable;
035import org.apache.hadoop.mapreduce.InputFormat;
036import org.apache.hadoop.mapreduce.InputSplit;
037import org.apache.hadoop.mapreduce.Job;
038import org.apache.hadoop.mapreduce.JobContext;
039import org.apache.hadoop.mapreduce.RecordReader;
040import org.apache.hadoop.mapreduce.TaskAttemptContext;
041import org.apache.yetus.audience.InterfaceAudience;
042
043/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job bypasses
 * HBase servers and accesses the underlying files (hfile, recovered edits, wals, etc) directly to
 * provide maximum performance. The snapshot is not required to be restored to the live cluster or
 * cloned. This also allows the mapreduce job to be run from an online or offline hbase cluster.
 * The snapshot files can be exported by using the
 * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster, and this
 * InputFormat can be used to run the mapreduce job directly over the snapshot files. The snapshot
 * should not be deleted while there are jobs reading from snapshot files.
052 * <p>
053 * Usage is similar to TableInputFormat, and
054 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job, boolean, Path)}
055 * can be used to configure the job.
056 *
 * <pre>
 * {@code
 * Job job = new Job(conf);
 * Scan scan = new Scan();
 * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan, MyTableMapper.class,
 *   MyMapKeyOutput.class, MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
066 * <p>
067 * Internally, this input format restores the snapshot into the given tmp directory. By default, and
068 * similar to {@link TableInputFormat} an InputSplit is created per region, but optionally you can
069 * run N mapper tasks per every region, in which case the region key range will be split to N
070 * sub-ranges and an InputSplit will be created per sub-range. The region is opened for reading from
071 * each RecordReader. An internal RegionScanner is used to execute the
072 * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
073 * <p>
074 * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read from
075 * snapshot files and data files. To read from snapshot files directly from the file system, the
076 * user who is running the MR job must have sufficient permissions to access snapshot and reference
077 * files. This means that to run mapreduce over snapshot files, the MR job has to be run as the
078 * HBase user or the user must have group or other privileges in the filesystem (See HBASE-8369).
 * Note that giving other users access to read from snapshot/data files will completely circumvent
 * the access control enforced by HBase.
081 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
082 */
083@InterfaceAudience.Public
084public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
085
086  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
087    private TableSnapshotInputFormatImpl.InputSplit delegate;
088
089    // constructor for mapreduce framework / Writable
090    public TableSnapshotRegionSplit() {
091      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
092    }
093
094    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
095      this.delegate = delegate;
096    }
097
098    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
099      List<String> locations, Scan scan, Path restoreDir) {
100      this.delegate =
101        new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
102    }
103
104    @Override
105    public long getLength() throws IOException, InterruptedException {
106      return delegate.getLength();
107    }
108
109    @Override
110    public String[] getLocations() throws IOException, InterruptedException {
111      return delegate.getLocations();
112    }
113
114    @Override
115    public void write(DataOutput out) throws IOException {
116      delegate.write(out);
117    }
118
119    @Override
120    public void readFields(DataInput in) throws IOException {
121      delegate.readFields(in);
122    }
123
124    /**
125     * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 Use {@link #getRegion()}
126     */
127    @Deprecated
128    public HRegionInfo getRegionInfo() {
129      return delegate.getRegionInfo();
130    }
131
132    public RegionInfo getRegion() {
133      return delegate.getRegionInfo();
134    }
135
136    TableSnapshotInputFormatImpl.InputSplit getDelegate() {
137      return this.delegate;
138    }
139  }
140
141  @InterfaceAudience.Private
142  static class TableSnapshotRegionRecordReader
143    extends RecordReader<ImmutableBytesWritable, Result> {
144    private TableSnapshotInputFormatImpl.RecordReader delegate =
145      new TableSnapshotInputFormatImpl.RecordReader();
146    private TaskAttemptContext context;
147
148    @Override
149    public void initialize(InputSplit split, TaskAttemptContext context)
150      throws IOException, InterruptedException {
151      this.context = context;
152      delegate.initialize(((TableSnapshotRegionSplit) split).delegate, context.getConfiguration());
153    }
154
155    @Override
156    public boolean nextKeyValue() throws IOException, InterruptedException {
157      boolean result = delegate.nextKeyValue();
158      if (result) {
159        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
160        if (scanMetrics != null && context != null) {
161          TableRecordReaderImpl.updateCounters(scanMetrics, 0, context, 0);
162        }
163      }
164      return result;
165    }
166
167    @Override
168    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
169      return delegate.getCurrentKey();
170    }
171
172    @Override
173    public Result getCurrentValue() throws IOException, InterruptedException {
174      return delegate.getCurrentValue();
175    }
176
177    @Override
178    public float getProgress() throws IOException, InterruptedException {
179      return delegate.getProgress();
180    }
181
182    @Override
183    public void close() throws IOException {
184      delegate.close();
185    }
186  }
187
188  @Override
189  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
190    TaskAttemptContext context) throws IOException {
191    return new TableSnapshotRegionRecordReader();
192  }
193
194  @Override
195  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
196    List<InputSplit> results = new ArrayList<>();
197    for (TableSnapshotInputFormatImpl.InputSplit split : TableSnapshotInputFormatImpl
198      .getSplits(job.getConfiguration())) {
199      results.add(new TableSnapshotRegionSplit(split));
200    }
201    return results;
202  }
203
204  /**
205   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
206   * @param job          the job to configure
207   * @param snapshotName the name of the snapshot to read from
208   * @param restoreDir   a temporary directory to restore the snapshot into. Current user should
209   *                     have write permissions to this directory, and this should not be a
210   *                     subdirectory of rootdir. After the job is finished, restoreDir can be
211   *                     deleted.
212   * @throws IOException if an error occurs
213   */
214  public static void setInput(Job job, String snapshotName, Path restoreDir) throws IOException {
215    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
216  }
217
218  /**
219   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
220   * @param job                the job to configure
221   * @param snapshotName       the name of the snapshot to read from
222   * @param restoreDir         a temporary directory to restore the snapshot into. Current user
223   *                           should have write permissions to this directory, and this should not
224   *                           be a subdirectory of rootdir. After the job is finished, restoreDir
225   *                           can be deleted.
226   * @param splitAlgo          split algorithm to generate splits from region
227   * @param numSplitsPerRegion how many input splits to generate per one region
228   * @throws IOException if an error occurs
229   */
230  public static void setInput(Job job, String snapshotName, Path restoreDir,
231    RegionSplitter.SplitAlgorithm splitAlgo, int numSplitsPerRegion) throws IOException {
232    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir,
233      splitAlgo, numSplitsPerRegion);
234  }
235
236  /**
237   * clean restore directory after snapshot scan job
238   * @param job          the snapshot scan job
239   * @param snapshotName the name of the snapshot to read from
240   * @throws IOException if an error occurs
241   */
242  public static void cleanRestoreDir(Job job, String snapshotName) throws IOException {
243    TableSnapshotInputFormatImpl.cleanRestoreDir(job, snapshotName);
244  }
245}