View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.lang.reflect.Method;
25  import java.util.ArrayList;
26  import java.util.List;
27  
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.fs.Path;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.HTableDescriptor;
33  import org.apache.hadoop.hbase.client.Result;
34  import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
35  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36  import org.apache.hadoop.io.Writable;
37  import org.apache.hadoop.mapreduce.InputFormat;
38  import org.apache.hadoop.mapreduce.InputSplit;
39  import org.apache.hadoop.mapreduce.Job;
40  import org.apache.hadoop.mapreduce.JobContext;
41  import org.apache.hadoop.mapreduce.RecordReader;
42  import org.apache.hadoop.mapreduce.TaskAttemptContext;
43  
44  import com.google.common.annotations.VisibleForTesting;
45  
46  /**
47   * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. The job
48   * bypasses HBase servers, and directly accesses the underlying files (hfile, recovered edits,
49   * wals, etc) directly to provide maximum performance. The snapshot is not required to be
50   * restored to the live cluster or cloned. This also allows to run the mapreduce job from an
51   * online or offline hbase cluster. The snapshot files can be exported by using the
52   * {@link org.apache.hadoop.hbase.snapshot.ExportSnapshot} tool, to a pure-hdfs cluster, 
53   * and this InputFormat can be used to run the mapreduce job directly over the snapshot files. 
54   * The snapshot should not be deleted while there are jobs reading from snapshot files.
55   * <p>
56   * Usage is similar to TableInputFormat, and
57   * {@link TableMapReduceUtil#initTableSnapshotMapperJob(String, Scan, Class, Class, Class, Job,
58   *   boolean, Path)}
59   * can be used to configure the job.
60   * <pre>{@code
61   * Job job = new Job(conf);
62   * Scan scan = new Scan();
63   * TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
64   *      scan, MyTableMapper.class, MyMapKeyOutput.class,
65   *      MyMapOutputValueWritable.class, job, true);
66   * }
67   * </pre>
68   * <p>
69   * Internally, this input format restores the snapshot into the given tmp directory. Similar to
70   * {@link TableInputFormat} an InputSplit is created per region. The region is opened for reading
71   * from each RecordReader. An internal RegionScanner is used to execute the 
72   * {@link org.apache.hadoop.hbase.CellScanner} obtained from the user.
73   * <p>
74   * HBase owns all the data and snapshot files on the filesystem. Only the 'hbase' user can read from
75   * snapshot files and data files.
76   * To read from snapshot files directly from the file system, the user who is running the MR job
77   * must have sufficient permissions to access snapshot and reference files.
78   * This means that to run mapreduce over snapshot files, the MR job has to be run as the HBase
79   * user or the user must have group or other privileges in the filesystem (See HBASE-8369).
80   * Note that, given other users access to read from snapshot/data files will completely circumvent
81   * the access control enforced by HBase.
82   * @see org.apache.hadoop.hbase.client.TableSnapshotScanner
83   */
84  @InterfaceAudience.Public
85  @InterfaceStability.Evolving
86  public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
87  
88    public static class TableSnapshotRegionSplit extends InputSplit implements Writable {
89      private TableSnapshotInputFormatImpl.InputSplit delegate;
90  
91      // constructor for mapreduce framework / Writable
92      public TableSnapshotRegionSplit() {
93        this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
94      }
95  
96      public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
97        this.delegate = delegate;
98      }
99  
100     public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
101         List<String> locations) {
102       this.delegate = new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations);
103     }
104 
105     @Override
106     public long getLength() throws IOException, InterruptedException {
107       return delegate.getLength();
108     }
109 
110     @Override
111     public String[] getLocations() throws IOException, InterruptedException {
112       return delegate.getLocations();
113     }
114 
115     @Override
116     public void write(DataOutput out) throws IOException {
117       delegate.write(out);
118     }
119 
120     @Override
121     public void readFields(DataInput in) throws IOException {
122       delegate.readFields(in);
123     }
124   }
125 
126   @VisibleForTesting
127   static class TableSnapshotRegionRecordReader extends
128       RecordReader<ImmutableBytesWritable, Result> {
129     private TableSnapshotInputFormatImpl.RecordReader delegate =
130       new TableSnapshotInputFormatImpl.RecordReader();
131     private TaskAttemptContext context;
132     private Method getCounter;
133 
134     @Override
135     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
136         InterruptedException {
137       this.context = context;
138       getCounter = TableRecordReaderImpl.retrieveGetCounterWithStringsParams(context);
139       delegate.initialize(
140         ((TableSnapshotRegionSplit) split).delegate,
141         context.getConfiguration());
142     }
143 
144     @Override
145     public boolean nextKeyValue() throws IOException, InterruptedException {
146       boolean result = delegate.nextKeyValue();
147       if (result) {
148         ScanMetrics scanMetrics = delegate.getScanner().getScanMetrics();
149         if (scanMetrics != null && context != null) {
150           TableRecordReaderImpl.updateCounters(scanMetrics, 0, getCounter, context, 0);
151         }
152       }
153       return result;
154     }
155 
156     @Override
157     public ImmutableBytesWritable getCurrentKey() throws IOException, InterruptedException {
158       return delegate.getCurrentKey();
159     }
160 
161     @Override
162     public Result getCurrentValue() throws IOException, InterruptedException {
163       return delegate.getCurrentValue();
164     }
165 
166     @Override
167     public float getProgress() throws IOException, InterruptedException {
168       return delegate.getProgress();
169     }
170 
171     @Override
172     public void close() throws IOException {
173       delegate.close();
174     }
175   }
176 
177   @Override
178   public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
179       InputSplit split, TaskAttemptContext context) throws IOException {
180     return new TableSnapshotRegionRecordReader();
181   }
182 
183   @Override
184   public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
185     List<InputSplit> results = new ArrayList<InputSplit>();
186     for (TableSnapshotInputFormatImpl.InputSplit split :
187         TableSnapshotInputFormatImpl.getSplits(job.getConfiguration())) {
188       results.add(new TableSnapshotRegionSplit(split));
189     }
190     return results;
191   }
192 
193   /**
194    * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
195    * @param job the job to configure
196    * @param snapshotName the name of the snapshot to read from
197    * @param restoreDir a temporary directory to restore the snapshot into. Current user should
198    * have write permissions to this directory, and this should not be a subdirectory of rootdir.
199    * After the job is finished, restoreDir can be deleted.
200    * @throws IOException if an error occurs
201    */
202   public static void setInput(Job job, String snapshotName, Path restoreDir)
203       throws IOException {
204     TableSnapshotInputFormatImpl.setInput(job.getConfiguration(), snapshotName, restoreDir);
205   }
206 }