001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.IOException;
022import java.util.AbstractMap;
023import java.util.Collection;
024import java.util.List;
025import java.util.Map;
026import java.util.UUID;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.fs.FileSystem;
029import org.apache.hadoop.fs.Path;
030import org.apache.hadoop.hbase.HRegionInfo;
031import org.apache.hadoop.hbase.client.Scan;
032import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
033import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
034import org.apache.hadoop.hbase.util.CommonFSUtils;
035import org.apache.hadoop.hbase.util.ConfigurationUtil;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.yetus.audience.InterfaceStability;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
042import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
043
044/**
045 * Shared implementation of mapreduce code over multiple table snapshots.
046 * Utilized by both mapreduce
047 * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat} and mapred
048 * {@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations.
049 */
050@InterfaceAudience.LimitedPrivate({ "HBase" })
051@InterfaceStability.Evolving
052public class MultiTableSnapshotInputFormatImpl {
053  private static final Logger LOG =
054      LoggerFactory.getLogger(MultiTableSnapshotInputFormatImpl.class);
055
056  public static final String RESTORE_DIRS_KEY =
057      "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
058  public static final String SNAPSHOT_TO_SCANS_KEY =
059      "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
060
061  /**
062   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
063   * restoreDir.
064   * <p/>
065   * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
066   */
067  public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
068      Path restoreDir) throws IOException {
069    Path rootDir = CommonFSUtils.getRootDir(conf);
070    FileSystem fs = rootDir.getFileSystem(conf);
071
072    setSnapshotToScans(conf, snapshotScans);
073    Map<String, Path> restoreDirs =
074        generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
075    setSnapshotDirs(conf, restoreDirs);
076    restoreSnapshots(conf, restoreDirs, fs);
077  }
078
079  /**
080   * Return the list of splits extracted from the scans/snapshots pushed to conf by
081   * {@link
082   * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
083   *
084   * @param conf Configuration to determine splits from
085   * @return Return the list of splits extracted from the scans/snapshots pushed to conf
086   * @throws IOException
087   */
088  public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
089      throws IOException {
090    Path rootDir = CommonFSUtils.getRootDir(conf);
091    FileSystem fs = rootDir.getFileSystem(conf);
092
093    List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
094
095    Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
096    Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
097    for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
098      String snapshotName = entry.getKey();
099
100      Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
101
102      SnapshotManifest manifest =
103          TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
104      List<HRegionInfo> regionInfos =
105          TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
106
107      for (Scan scan : entry.getValue()) {
108        List<TableSnapshotInputFormatImpl.InputSplit> splits =
109            TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
110        rtn.addAll(splits);
111      }
112    }
113    return rtn;
114  }
115
116  /**
117   * Retrieve the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration by
118   * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
119   *
120   * @param conf Configuration to extract name -&gt; list&lt;scan&gt; mappings from.
121   * @return the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration
122   * @throws IOException
123   */
124  public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
125
126    Map<String, Collection<Scan>> rtn = Maps.newHashMap();
127
128    for (Map.Entry<String, String> entry : ConfigurationUtil
129        .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
130      String snapshotName = entry.getKey();
131      String scan = entry.getValue();
132
133      Collection<Scan> snapshotScans = rtn.get(snapshotName);
134      if (snapshotScans == null) {
135        snapshotScans = Lists.newArrayList();
136        rtn.put(snapshotName, snapshotScans);
137      }
138
139      snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
140    }
141
142    return rtn;
143  }
144
145  /**
146   * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
147   *
148   * @param conf
149   * @param snapshotScans
150   * @throws IOException
151   */
152  public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
153      throws IOException {
154    // flatten out snapshotScans for serialization to the job conf
155    List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
156
157    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
158      String snapshotName = entry.getKey();
159      Collection<Scan> scans = entry.getValue();
160
161      // serialize all scans and map them to the appropriate snapshot
162      for (Scan scan : scans) {
163        snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName,
164            TableMapReduceUtil.convertScanToString(scan)));
165      }
166    }
167
168    ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
169  }
170
171  /**
172   * Retrieve the directories into which snapshots have been restored from
173   * ({@link #RESTORE_DIRS_KEY})
174   *
175   * @param conf Configuration to extract restore directories from
176   * @return the directories into which snapshots have been restored from
177   * @throws IOException
178   */
179  public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
180    List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
181    Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
182
183    for (Map.Entry<String, String> kvp : kvps) {
184      rtn.put(kvp.getKey(), new Path(kvp.getValue()));
185    }
186
187    return rtn;
188  }
189
190  public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
191    Map<String, String> toSet = Maps.newHashMap();
192
193    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
194      toSet.put(entry.getKey(), entry.getValue().toString());
195    }
196
197    ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
198  }
199
200  /**
201   * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
202   * return a map from the snapshot to the restore directory.
203   *
204   * @param snapshots      collection of snapshot names to restore
205   * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
206   * @return a mapping from snapshot name to the directory in which that snapshot has been restored
207   */
208  private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
209      Path baseRestoreDir) {
210    Map<String, Path> rtn = Maps.newHashMap();
211
212    for (String snapshotName : snapshots) {
213      Path restoreSnapshotDir =
214          new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
215      rtn.put(snapshotName, restoreSnapshotDir);
216    }
217
218    return rtn;
219  }
220
221  /**
222   * Restore each (snapshot name, restore directory) pair in snapshotToDir
223   *
224   * @param conf          configuration to restore with
225   * @param snapshotToDir mapping from snapshot names to restore directories
226   * @param fs            filesystem to do snapshot restoration on
227   */
228  public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
229      throws IOException {
230    // TODO: restore from record readers to parallelize.
231    Path rootDir = CommonFSUtils.getRootDir(conf);
232
233    for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
234      String snapshotName = entry.getKey();
235      Path restoreDir = entry.getValue();
236      LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
237          + " for MultiTableSnapshotInputFormat");
238      restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
239    }
240  }
241
242  void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
243      FileSystem fs) throws IOException {
244    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
245  }
246
247}