/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapred;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;

/**
 * TableSnapshotInputFormat allows a MapReduce job to run over a table snapshot. Further
 * documentation is available in {@link org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat}.
 *
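 * <p>
 * A minimal usage sketch; the snapshot name, restore path, and mapper class below are
 * illustrative placeholders, not part of this API:
 * <pre>{@code
 * JobConf job = new JobConf(HBaseConfiguration.create());
 * // The restore directory should be writable by the current user and outside the HBase root dir.
 * TableSnapshotInputFormat.setInput(job, "example-snapshot", new Path("/tmp/snapshot-restore"));
 * job.setInputFormat(TableSnapshotInputFormat.class);
 * job.setMapperClass(ExampleMapper.class); // a user-supplied Mapper over (ImmutableBytesWritable, Result)
 * }</pre>
 *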
 * @see org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class TableSnapshotInputFormat implements InputFormat<ImmutableBytesWritable, Result> {

  public static class TableSnapshotRegionSplit implements InputSplit {
    private TableSnapshotInputFormatImpl.InputSplit delegate;

    // Default constructor, required by the MapReduce framework to deserialize
    // this split as a Writable.
    public TableSnapshotRegionSplit() {
      this.delegate = new TableSnapshotInputFormatImpl.InputSplit();
    }

    public TableSnapshotRegionSplit(TableSnapshotInputFormatImpl.InputSplit delegate) {
      this.delegate = delegate;
    }

    public TableSnapshotRegionSplit(HTableDescriptor htd, HRegionInfo regionInfo,
        List<String> locations, Scan scan, Path restoreDir) {
      this.delegate =
          new TableSnapshotInputFormatImpl.InputSplit(htd, regionInfo, locations, scan, restoreDir);
    }

    @Override
    public long getLength() throws IOException {
      return delegate.getLength();
    }

    @Override
    public String[] getLocations() throws IOException {
      return delegate.getLocations();
    }

    @Override
    public void write(DataOutput out) throws IOException {
      delegate.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      delegate.readFields(in);
    }
  }

  static class TableSnapshotRecordReader
    implements RecordReader<ImmutableBytesWritable, Result> {

    private TableSnapshotInputFormatImpl.RecordReader delegate;

    public TableSnapshotRecordReader(TableSnapshotRegionSplit split, JobConf job)
        throws IOException {
      delegate = new TableSnapshotInputFormatImpl.RecordReader();
      delegate.initialize(split.delegate, job);
    }

    @Override
    public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
      if (!delegate.nextKeyValue()) {
        return false;
      }
      // Copy the delegate's current key/value into the caller-supplied objects, since the
      // old-API RecordReader contract reuses the objects from createKey()/createValue().
      ImmutableBytesWritable currentKey = delegate.getCurrentKey();
      key.set(currentKey.get(), currentKey.getOffset(), currentKey.getLength());
      value.copyFrom(delegate.getCurrentValue());
      return true;
    }

    @Override
    public ImmutableBytesWritable createKey() {
      return new ImmutableBytesWritable();
    }

    @Override
    public Result createValue() {
      return new Result();
    }

    @Override
    public long getPos() throws IOException {
      return delegate.getPos();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public float getProgress() throws IOException {
      return delegate.getProgress();
    }
  }

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    // The numSplits hint is ignored: one split is produced per region in the snapshot.
    List<TableSnapshotInputFormatImpl.InputSplit> splits =
      TableSnapshotInputFormatImpl.getSplits(job);
    InputSplit[] results = new InputSplit[splits.size()];
    for (int i = 0; i < splits.size(); i++) {
      results[i] = new TableSnapshotRegionSplit(splits.get(i));
    }
    return results;
  }

  @Override
  public RecordReader<ImmutableBytesWritable, Result>
  getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
    return new TableSnapshotRecordReader((TableSnapshotRegionSplit) split, job);
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
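   * <p>
   * A sketch of the expected lifecycle; the snapshot name, restore path, and cleanup step
   * are illustrative:
   * <pre>{@code
   * Path restoreDir = new Path("/tmp/snapshot-restore-" + UUID.randomUUID());
   * TableSnapshotInputFormat.setInput(job, "example-snapshot", restoreDir);
   * JobClient.runJob(job);
   * // Once the job has finished, the restore directory can be removed.
   * restoreDir.getFileSystem(job).delete(restoreDir, true);
   * }</pre>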
   * @param job the job to configure
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory into which the snapshot is restored. The current
   * user should have write permission to this directory, and it should not be a subdirectory
   * of the HBase root directory (hbase.rootdir). After the job finishes, restoreDir can be
   * deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(JobConf job, String snapshotName, Path restoreDir)
      throws IOException {
    TableSnapshotInputFormatImpl.setInput(job, snapshotName, restoreDir);
  }
}