/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
 * bypasses HBase servers and reads the underlying files (hfiles, recovered edits, WALs, etc.)
 * directly to provide maximum performance. The snapshot does not need to be restored to the
 * live cluster or cloned. This also allows the MapReduce job to be run against an online or
 * offline HBase cluster. The snapshot files can be exported to a pure-HDFS cluster with the
 * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, and this InputFormat can then
 * be used to run the MapReduce job directly over the exported snapshot files.
 * The snapshot should not be deleted while there are jobs reading from its files.
 * <p>
 * Usage is similar to TableInputFormat, and
 * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job,
 *   boolean, Path)}
 * can be used to configure the job.
 * <pre>{@code
 * Job job = new Job(conf);
 * Scan scan = new Scan();
 * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
 *      scan, MyTableMapper.class, MyMapKeyOutput.class,
 *      MyMapOutputValueWritable.class, job, true, restoreDir);
 * }
 * </pre>
 * <p>
 * Internally, this input format restores the snapshot into the given tmp directory. Similar to
 * {@link TableInputFormat}, an InputSplit is created per region, and the region is opened for
 * reading by each RecordReader. An internal RegionScanner is used to execute the
 * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
 * <p>
 * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read
 * from the snapshot and data files.
 * To read the snapshot files directly from the file system, the user running the MR job
 * must have sufficient permissions to access the snapshot and reference files.
 * This means that to run MapReduce over snapshot files, the MR job has to be run as the HBase
 * user or the user must have group or other privileges in the filesystem (see HBASE-8369).
 * Note that granting other users read access to the snapshot/data files completely circumvents
 * the access control enforced by HBase.
 * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {

  public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
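    // The actual split logic lives in TableSnapshotInputFormatImpl; this class only wraps it
    // for the mapreduce API and handles Writable serialization.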
    private TableSnapshotInputFormatImpl.InputSplit delegate;

    // constructor for mapreduce framework / Writable
    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
        List<String> locations, Scan scan, Path restoreDir) {
      this.delegate =
          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
      return delegate.getLength();
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
      return delegate.getLocations();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      delegate.readFields(in);
    }
  }

  @VisibleForTesting
  static class TableSnapshotRegionRecordReader extends
      RecordReader<ImmutableBytesWritable, Result> {
    private TableSnapshotInputFormatImpl.RecordReader delegate =
      new TableSnapshotInputFormatImpl.RecordReader();
    private TaskAttemptContext context;
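    // Handle to the context's getCounter(String, String) method, resolved via reflection by
    // TableRecordReaderImpl so the reader works across Hadoop versions with differing counter APIs.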
    private Method getCounter;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
        InterruptedException {
      this.context = context;
      getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
      delegate.initialize(
        ((TableSnapshotRegionSplit) split).delegate,
        context.getConfiguration());
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean result = delegate.nextKeyValue();
      if (result) {
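        // Propagate the scan metrics collected by the underlying scanner into the
        // MapReduce task counters.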
        ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
        if (scanMetrics != null && context != null) {
          TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
        }
      }
      return result;
    }

    @Override
    public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
      return delegate.getCurrentKey();
    }

    @Override
    public Result getCurrentValue() throws IOException, InterruptedException {
      return delegate.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      return delegate.getProgress();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new TableSnapshotRegionRecordReader();
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
    List<InputSplit> results = new ArrayList<InputSplit>();
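    // Wrap each implementation-level split into a Writable split that the MapReduce framework
    // can serialize and ship to tasks.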
    for (TableSnapshotInputFormatImpl.InputSplit split :
        TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
      results.add(new TableSnapshotRegionSplit(split));
    }
    return results;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
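   * <p>
   * A minimal sketch (the snapshot name and restore path below are placeholders; the mapper,
   * scan and output types still have to be configured separately on the job):
   * <pre>{@code
   * Job job = new Job(conf);
   * job.setInputFormatClass(TableSnapshotInputFormat.class);
   * TableSnapshotInputFormat.setInput(job, "mySnapshot", new Path("/tmp/snapshot-restore"));
   * }
   * </pre>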
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user should
   * have write permissions to this directory, and it should not be a subdirectory of the HBase
   * root directory. After the job is finished, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Job job, String snapshotName, Path restoreDir)
      throws IOException {
    TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
  }
}