/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.io.Writable;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/**
 * Hadoop MR API-agnostic implementation for mapreduce over table snapshots.
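 * <p>
 * A rough usage sketch (the snapshot name and restore directory below are placeholder values;
 * normally this class is driven through the mapred or mapreduce {@code TableSnapshotInputFormat}
 * wrappers rather than called directly):
 * <pre>{@code
 *   Configuration conf = HBaseConfiguration.create();
 *   conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
 *   TableSnapshotInputFormatImpl.setInput(conf, "my_snapshot", new Path("/tmp/snapshot_restore"));
 *   List<TableSnapshotInputFormatImpl.InputSplit> splits =
 *       TableSnapshotInputFormatImpl.getSplits(conf);
 * }</pre>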
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TableSnapshotInputFormatImpl {
  // TODO: Snapshot files are owned in the filesystem by the hbase user. There is no
  // easy way to delegate access.

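  // key for specifying the name of the snapshot to read from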
  private static final String SNAPSHOT_NAME_KEY = "hbase.TableSnapshotInputFormat.snapshot.name";
  // key for specifying the root dir of the restored snapshot
  private static final String RESTORE_DIR_KEY = "hbase.TableSnapshotInputFormat.restore.dir";

  /** See {@link #getBestLocations(Configuration, HDFSBlocksDistribution)} */
  private static final String LOCALITY_CUTOFF_MULTIPLIER =
    "hbase.tablesnapshotinputformat.locality.cutoff.multiplier";
  private static final float DEFAULT_LOCALITY_CUTOFF_MULTIPLIER = 0.8f;

  /**
   * Implementation class for InputSplit logic common between mapred and mapreduce.
   */
  public static class InputSplit implements Writable {
    private HTableDescriptor htd;
    private HRegionInfo regionInfo;
    private String[] locations;

    // constructor for mapreduce framework / Writable
    public InputSplit() {}

    public InputSplit(HTableDescriptor htd, HRegionInfo regionInfo, List<String> locations) {
      this.htd = htd;
      this.regionInfo = regionInfo;
      if (locations == null || locations.isEmpty()) {
        this.locations = new String[0];
      } else {
        this.locations = locations.toArray(new String[locations.size()]);
      }
    }

    public long getLength() {
      // TODO: We can obtain the file sizes of the snapshot here.
      return 0;
    }

    public String[] getLocations() {
      return locations;
    }

    public HTableDescriptor getTableDescriptor() {
      return htd;
    }

    public HRegionInfo getRegionInfo() {
      return regionInfo;
    }

    // TODO: We should have ProtobufSerialization in Hadoop, and directly use PB objects instead of
    // doing this wrapping with Writables.
    @Override
    public void write(DataOutput out) throws IOException {
      TableSnapshotRegionSplit.Builder builder = TableSnapshotRegionSplit.newBuilder()
          .setTable(htd.convert())
          .setRegion(HRegionInfo.convert(regionInfo));

      for (String location : locations) {
        builder.addLocations(location);
      }

      TableSnapshotRegionSplit split = builder.build();

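      // length-prefix the serialized protobuf so readFields() can read it back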
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      split.writeTo(baos);
      baos.close();
      byte[] buf = baos.toByteArray();
      out.writeInt(buf.length);
      out.write(buf);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
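      // read back the length-prefixed protobuf written by write()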
      int len = in.readInt();
      byte[] buf = new byte[len];
      in.readFully(buf);
      TableSnapshotRegionSplit split = TableSnapshotRegionSplit.PARSER.parseFrom(buf);
      this.htd = HTableDescriptor.convert(split.getTable());
      this.regionInfo = HRegionInfo.convert(split.getRegion());
      List<String> locationsList = split.getLocationsList();
      this.locations = locationsList.toArray(new String[locationsList.size()]);
    }
  }

  /**
   * Implementation class for RecordReader logic common between mapred and mapreduce.
   */
  public static class RecordReader {
    private InputSplit split;
    private Scan scan;
    private Result result = null;
    private ImmutableBytesWritable row = null;
    private ClientSideRegionScanner scanner;

    public ClientSideRegionScanner getScanner() {
      return scanner;
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
      this.split = split;
      HTableDescriptor htd = split.htd;
      HRegionInfo hri = this.split.getRegionInfo();
      FileSystem fs = FSUtils.getCurrentFileSystem(conf);

      // this is the user-specified root directory where the snapshot was restored
      Path tmpRootDir = new Path(conf.get(RESTORE_DIR_KEY));

      // create the scan
      // TODO: the mapred API does not accept a Scan as input; work around that for now.
      if (conf.get(TableInputFormat.SCAN) != null) {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
      } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
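        // COLUMN_LIST (old mapred API) is space-separated; each entry is added as a column family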
        String[] columns =
          conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
        scan = new Scan();
        for (String col : columns) {
          scan.addFamily(Bytes.toBytes(col));
        }
      } else {
        throw new IllegalArgumentException("A Scan is not configured for this job");
      }

      // the restored region is immutable, so this is fine;
      // otherwise we would have to set the thread read point
      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
      // disable caching of data blocks
      scan.setCacheBlocks(false);

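      // scan the region directly from the restored snapshot files; no region server is involved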
      scanner = new ClientSideRegionScanner(conf, fs, tmpRootDir, htd, hri, scan, null);
    }

    public boolean nextKeyValue() throws IOException {
      result = scanner.next();
      if (result == null) {
        // we are done
        return false;
      }

      if (this.row == null) {
        this.row = new ImmutableBytesWritable();
      }
      this.row.set(result.getRow());
      return true;
    }

    public ImmutableBytesWritable getCurrentKey() {
      return row;
    }

    public Result getCurrentValue() {
      return result;
    }

    public long getPos() {
      return 0;
    }

    public float getProgress() {
      return 0; // TODO: use total bytes to estimate
    }

    public void close() {
      if (this.scanner != null) {
        this.scanner.close();
      }
    }
  }

  public static List<InputSplit> getSplits(Configuration conf) throws IOException {
    String snapshotName = getSnapshotName(conf);

    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
    SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc);
    List<SnapshotRegionManifest> regionManifests = manifest.getRegionManifests();
    if (regionManifests == null) {
      throw new IllegalArgumentException("Snapshot seems empty");
    }

    // load the table descriptor
    HTableDescriptor htd = manifest.getTableDescriptor();

    // TODO: the mapred API does not accept a Scan as input; work around that for now.
    Scan scan = null;
    if (conf.get(TableInputFormat.SCAN) != null) {
      scan = TableMapReduceUtil.convertStringToScan(conf.get(TableInputFormat.SCAN));
    } else if (conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST) != null) {
      String[] columns =
        conf.get(org.apache.hadoop.hbase.mapred.TableInputFormat.COLUMN_LIST).split(" ");
      scan = new Scan();
      for (String col : columns) {
        scan.addFamily(Bytes.toBytes(col));
      }
    } else {
      throw new IllegalArgumentException("Unable to create scan");
    }
    // the temp dir where the snapshot is restored
    Path restoreDir = new Path(conf.get(RESTORE_DIR_KEY));
    Path tableDir = FSUtils.getTableDir(restoreDir, htd.getTableName());

    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (SnapshotRegionManifest regionManifest : regionManifests) {
      // load region descriptor
      HRegionInfo hri = HRegionInfo.convert(regionManifest.getRegionInfo());

      if (CellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(),
        hri.getStartKey(), hri.getEndKey())) {
        // compute HDFS locations from snapshot files (which will get the locations for
        // referred hfiles)
        List<String> hosts = getBestLocations(conf,
          HRegion.computeHDFSBlocksDistribution(conf, htd, hri, tableDir));

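        // pass along at most 3 locations per split (roughly the default HDFS replication factor)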
        int len = Math.min(3, hosts.size());
        hosts = hosts.subList(0, len);
        splits.add(new InputSplit(htd, hri, hosts));
      }
    }

    return splits;
  }

  /**
   * This computes the locations to be passed from the InputSplit. MR/YARN schedulers do not take
   * weights into account, so they treat every location passed from the input split as equal. We
   * do not want to blindly pass all the locations, because we create one split per region, and
   * the region's blocks are distributed throughout the cluster unless favored node assignment
   * is used. In the expected stable case, only one location will hold most of the blocks
   * locally.
   * With favored node assignment, on the other hand, three nodes will hold highly local blocks.
   * Here we apply a simple heuristic: pass along every host whose block locality is at least
   * 80% (hbase.tablesnapshotinputformat.locality.cutoff.multiplier) of that of the host with
   * the best locality.
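   * For example, with hypothetical host weights of 100, 85, and 50 and the default multiplier
   * of 0.8, only the first two hosts are returned (both weights are at least 80).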
   */
  public static List<String> getBestLocations(
      Configuration conf, HDFSBlocksDistribution blockDistribution) {
    List<String> locations = new ArrayList<String>(3);

    HostAndWeight[] hostAndWeights = blockDistribution.getTopHostsWithWeights();

    if (hostAndWeights.length == 0) {
      return locations;
    }

    HostAndWeight topHost = hostAndWeights[0];
    locations.add(topHost.getHost());

    // Heuristic: keep every host whose weight is at least cutoffMultiplier times
    // the weight of the host with the best locality
    double cutoffMultiplier
      = conf.getFloat(LOCALITY_CUTOFF_MULTIPLIER, DEFAULT_LOCALITY_CUTOFF_MULTIPLIER);

    double filterWeight = topHost.getWeight() * cutoffMultiplier;

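    // getTopHostsWithWeights() returns hosts sorted by weight in descending order,
    // so we can stop at the first host that falls below the cutoff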
    for (int i = 1; i < hostAndWeights.length; i++) {
      if (hostAndWeights[i].getWeight() >= filterWeight) {
        locations.add(hostAndWeights[i].getHost());
      } else {
        break;
      }
    }

    return locations;
  }

  private static String getSnapshotName(Configuration conf) {
    String snapshotName = conf.get(SNAPSHOT_NAME_KEY);
    if (snapshotName == null) {
      throw new IllegalArgumentException("Snapshot name must be provided");
    }
    return snapshotName;
  }

  /**
   * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
   * @param conf the job configuration
   * @param snapshotName the name of the snapshot to read from
   * @param restoreDir a temporary directory to restore the snapshot into. The current user should
   * have write permissions to this directory, and it should not be a subdirectory of the HBase
   * root dir. After the job finishes, restoreDir can be deleted.
   * @throws IOException if an error occurs
   */
  public static void setInput(Configuration conf, String snapshotName, Path restoreDir)
      throws IOException {
    conf.set(SNAPSHOT_NAME_KEY, snapshotName);

    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

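    // restore into a uniquely named subdirectory so repeated or concurrent jobs
    // sharing the same parent restore directory do not collide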
    restoreDir = new Path(restoreDir, UUID.randomUUID().toString());

    // TODO: restore from record readers to parallelize.
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);

    conf.set(RESTORE_DIR_KEY, restoreDir.toString());
  }
}