001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.IOException;
022import java.util.AbstractMap;
023import java.util.Collection;
024import java.util.List;
025import java.util.Map;
026import java.util.UUID;
027
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.fs.FileSystem;
030import org.apache.hadoop.fs.Path;
031import org.apache.hadoop.hbase.HRegionInfo;
032import org.apache.hadoop.hbase.client.Scan;
033import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
034import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
035import org.apache.hadoop.hbase.util.ConfigurationUtil;
036import org.apache.hadoop.hbase.util.FSUtils;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.apache.yetus.audience.InterfaceStability;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
043import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
044
045/**
046 * Shared implementation of mapreduce code over multiple table snapshots.
047 * Utilized by both mapreduce ({@link org.apache.hadoop.hbase.mapreduce
048 * .MultiTableSnapshotInputFormat} and mapred
049 * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations.
050 */
051@InterfaceAudience.LimitedPrivate({ "HBase" })
052@InterfaceStability.Evolving
053public class MultiTableSnapshotInputFormatImpl {
054  private static final Logger LOG =
055      LoggerFactory.getLogger(MultiTableSnapshotInputFormatImpl.class);
056
057  public static final String RESTORE_DIRS_KEY =
058      "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
059  public static final String SNAPSHOT_TO_SCANS_KEY =
060      "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
061
062  /**
063   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
064   * restoreDir.
065   * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
066   *
067   * @param conf
068   * @param snapshotScans
069   * @param restoreDir
070   * @throws IOException
071   */
072  public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
073      Path restoreDir) throws IOException {
074    Path rootDir = FSUtils.getRootDir(conf);
075    FileSystem fs = rootDir.getFileSystem(conf);
076
077    setSnapshotToScans(conf, snapshotScans);
078    Map<String, Path> restoreDirs =
079        generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
080    setSnapshotDirs(conf, restoreDirs);
081    restoreSnapshots(conf, restoreDirs, fs);
082  }
083
084  /**
085   * Return the list of splits extracted from the scans/snapshots pushed to conf by
086   * {@link
087   * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
088   *
089   * @param conf Configuration to determine splits from
090   * @return Return the list of splits extracted from the scans/snapshots pushed to conf
091   * @throws IOException
092   */
093  public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
094      throws IOException {
095    Path rootDir = FSUtils.getRootDir(conf);
096    FileSystem fs = rootDir.getFileSystem(conf);
097
098    List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
099
100    Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
101    Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
102    for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
103      String snapshotName = entry.getKey();
104
105      Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
106
107      SnapshotManifest manifest =
108          TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
109      List<HRegionInfo> regionInfos =
110          TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
111
112      for (Scan scan : entry.getValue()) {
113        List<TableSnapshotInputFormatImpl.InputSplit> splits =
114            TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
115        rtn.addAll(splits);
116      }
117    }
118    return rtn;
119  }
120
121  /**
122   * Retrieve the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration by
123   * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
124   *
125   * @param conf Configuration to extract name -&gt; list&lt;scan&gt; mappings from.
126   * @return the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration
127   * @throws IOException
128   */
129  public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
130
131    Map<String, Collection<Scan>> rtn = Maps.newHashMap();
132
133    for (Map.Entry<String, String> entry : ConfigurationUtil
134        .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
135      String snapshotName = entry.getKey();
136      String scan = entry.getValue();
137
138      Collection<Scan> snapshotScans = rtn.get(snapshotName);
139      if (snapshotScans == null) {
140        snapshotScans = Lists.newArrayList();
141        rtn.put(snapshotName, snapshotScans);
142      }
143
144      snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
145    }
146
147    return rtn;
148  }
149
150  /**
151   * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
152   *
153   * @param conf
154   * @param snapshotScans
155   * @throws IOException
156   */
157  public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
158      throws IOException {
159    // flatten out snapshotScans for serialization to the job conf
160    List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
161
162    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
163      String snapshotName = entry.getKey();
164      Collection<Scan> scans = entry.getValue();
165
166      // serialize all scans and map them to the appropriate snapshot
167      for (Scan scan : scans) {
168        snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName,
169            TableMapReduceUtil.convertScanToString(scan)));
170      }
171    }
172
173    ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
174  }
175
176  /**
177   * Retrieve the directories into which snapshots have been restored from
178   * ({@link #RESTORE_DIRS_KEY})
179   *
180   * @param conf Configuration to extract restore directories from
181   * @return the directories into which snapshots have been restored from
182   * @throws IOException
183   */
184  public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
185    List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
186    Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
187
188    for (Map.Entry<String, String> kvp : kvps) {
189      rtn.put(kvp.getKey(), new Path(kvp.getValue()));
190    }
191
192    return rtn;
193  }
194
195  public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
196    Map<String, String> toSet = Maps.newHashMap();
197
198    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
199      toSet.put(entry.getKey(), entry.getValue().toString());
200    }
201
202    ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
203  }
204
205  /**
206   * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
207   * return a map from the snapshot to the restore directory.
208   *
209   * @param snapshots      collection of snapshot names to restore
210   * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
211   * @return a mapping from snapshot name to the directory in which that snapshot has been restored
212   */
213  private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
214      Path baseRestoreDir) {
215    Map<String, Path> rtn = Maps.newHashMap();
216
217    for (String snapshotName : snapshots) {
218      Path restoreSnapshotDir =
219          new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
220      rtn.put(snapshotName, restoreSnapshotDir);
221    }
222
223    return rtn;
224  }
225
226  /**
227   * Restore each (snapshot name, restore directory) pair in snapshotToDir
228   *
229   * @param conf          configuration to restore with
230   * @param snapshotToDir mapping from snapshot names to restore directories
231   * @param fs            filesystem to do snapshot restoration on
232   * @throws IOException
233   */
234  public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
235      throws IOException {
236    // TODO: restore from record readers to parallelize.
237    Path rootDir = FSUtils.getRootDir(conf);
238
239    for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
240      String snapshotName = entry.getKey();
241      Path restoreDir = entry.getValue();
242      LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
243          + " for MultiTableSnapshotInputFormat");
244      restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
245    }
246  }
247
248  void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
249      FileSystem fs) throws IOException {
250    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
251  }
252
253}