View Javadoc

1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.mapreduce;
20  
21  import com.google.common.collect.Lists;
22  import com.google.common.collect.Maps;
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.conf.Configuration;
26  import org.apache.hadoop.fs.FileSystem;
27  import org.apache.hadoop.fs.Path;
28  import org.apache.hadoop.hbase.HRegionInfo;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.hbase.client.Scan;
32  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
33  import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
34  import org.apache.hadoop.hbase.util.ConfigurationUtil;
35  import org.apache.hadoop.hbase.util.FSUtils;
36  
37  import java.io.IOException;
38  import java.util.AbstractMap;
39  import java.util.Collection;
40  import java.util.List;
41  import java.util.Map;
42  import java.util.UUID;
43  
44  /**
45   * Shared implementation of mapreduce code over multiple table snapshots.
46   * Utilized by both mapreduce ({@link org.apache.hadoop.hbase.mapreduce
47   * .MultiTableSnapshotInputFormat} and mapred
48   * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations.
49   */
50  @InterfaceAudience.LimitedPrivate({ "HBase" })
51  @InterfaceStability.Evolving
52  public class MultiTableSnapshotInputFormatImpl {
53  
54    private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class);
55  
56    public static final String RESTORE_DIRS_KEY =
57        "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
58    public static final String SNAPSHOT_TO_SCANS_KEY =
59        "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
60  
61    /**
62     * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
63     * restoreDir.
64     * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
65     *
66     * @param conf
67     * @param snapshotScans
68     * @param restoreDir
69     * @throws IOException
70     */
71    public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
72        Path restoreDir) throws IOException {
73      Path rootDir = FSUtils.getRootDir(conf);
74      FileSystem fs = rootDir.getFileSystem(conf);
75  
76      setSnapshotToScans(conf, snapshotScans);
77      Map<String, Path> restoreDirs =
78          generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
79      setSnapshotDirs(conf, restoreDirs);
80      restoreSnapshots(conf, restoreDirs, fs);
81    }
82  
83    /**
84     * Return the list of splits extracted from the scans/snapshots pushed to conf by
85     * {@link
86     * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
87     *
88     * @param conf Configuration to determine splits from
89     * @return Return the list of splits extracted from the scans/snapshots pushed to conf
90     * @throws IOException
91     */
92    public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
93        throws IOException {
94      Path rootDir = FSUtils.getRootDir(conf);
95      FileSystem fs = rootDir.getFileSystem(conf);
96  
97      List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
98  
99      Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
100     Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
101     for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
102       String snapshotName = entry.getKey();
103 
104       Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
105 
106       SnapshotManifest manifest =
107           TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
108       List<HRegionInfo> regionInfos =
109           TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
110 
111       for (Scan scan : entry.getValue()) {
112         List<TableSnapshotInputFormatImpl.InputSplit> splits =
113             TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
114         rtn.addAll(splits);
115       }
116     }
117     return rtn;
118   }
119 
120   /**
121    * Retrieve the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration by
122    * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
123    *
124    * @param conf Configuration to extract name -&gt; list&lt;scan&gt; mappings from.
125    * @return the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration
126    * @throws IOException
127    */
128   public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
129 
130     Map<String, Collection<Scan>> rtn = Maps.newHashMap();
131 
132     for (Map.Entry<String, String> entry : ConfigurationUtil
133         .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
134       String snapshotName = entry.getKey();
135       String scan = entry.getValue();
136 
137       Collection<Scan> snapshotScans = rtn.get(snapshotName);
138       if (snapshotScans == null) {
139         snapshotScans = Lists.newArrayList();
140         rtn.put(snapshotName, snapshotScans);
141       }
142 
143       snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
144     }
145 
146     return rtn;
147   }
148 
149   /**
150    * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
151    *
152    * @param conf
153    * @param snapshotScans
154    * @throws IOException
155    */
156   public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
157       throws IOException {
158     // flatten out snapshotScans for serialization to the job conf
159     List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
160 
161     for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
162       String snapshotName = entry.getKey();
163       Collection<Scan> scans = entry.getValue();
164 
165       // serialize all scans and map them to the appropriate snapshot
166       for (Scan scan : scans) {
167         snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<String, String>(snapshotName,
168             TableMapReduceUtil.convertScanToString(scan)));
169       }
170     }
171 
172     ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
173   }
174 
175   /**
176    * Retrieve the directories into which snapshots have been restored from
177    * ({@link #RESTORE_DIRS_KEY})
178    *
179    * @param conf Configuration to extract restore directories from
180    * @return the directories into which snapshots have been restored from
181    * @throws IOException
182    */
183   public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
184     List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
185     Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
186 
187     for (Map.Entry<String, String> kvp : kvps) {
188       rtn.put(kvp.getKey(), new Path(kvp.getValue()));
189     }
190 
191     return rtn;
192   }
193 
194   public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
195     Map<String, String> toSet = Maps.newHashMap();
196 
197     for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
198       toSet.put(entry.getKey(), entry.getValue().toString());
199     }
200 
201     ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
202   }
203 
204   /**
205    * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
206    * return a map from the snapshot to the restore directory.
207    *
208    * @param snapshots      collection of snapshot names to restore
209    * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
210    * @return a mapping from snapshot name to the directory in which that snapshot has been restored
211    */
212   private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
213       Path baseRestoreDir) {
214     Map<String, Path> rtn = Maps.newHashMap();
215 
216     for (String snapshotName : snapshots) {
217       Path restoreSnapshotDir =
218           new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
219       rtn.put(snapshotName, restoreSnapshotDir);
220     }
221 
222     return rtn;
223   }
224 
225   /**
226    * Restore each (snapshot name, restore directory) pair in snapshotToDir
227    *
228    * @param conf          configuration to restore with
229    * @param snapshotToDir mapping from snapshot names to restore directories
230    * @param fs            filesystem to do snapshot restoration on
231    * @throws IOException
232    */
233   public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
234       throws IOException {
235     // TODO: restore from record readers to parallelize.
236     Path rootDir = FSUtils.getRootDir(conf);
237 
238     for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
239       String snapshotName = entry.getKey();
240       Path restoreDir = entry.getValue();
241       LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
242           + " for MultiTableSnapshotInputFormat");
243       restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
244     }
245   }
246 
247   void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
248       FileSystem fs) throws IOException {
249     RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
250   }
251 
252   // TODO: these probably belong elsewhere/may already be implemented elsewhere.
253 
254 }