001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.AbstractMap;
022import java.util.Collection;
023import java.util.List;
024import java.util.Map;
025import java.util.UUID;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.HRegionInfo;
030import org.apache.hadoop.hbase.client.Scan;
031import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
032import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
033import org.apache.hadoop.hbase.util.CommonFSUtils;
034import org.apache.hadoop.hbase.util.ConfigurationUtil;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.apache.yetus.audience.InterfaceStability;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
041import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
042
043/**
044 * Shared implementation of mapreduce code over multiple table snapshots. Utilized by both mapreduce
045 * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat} and mapred
046 * {@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations.
047 */
048@InterfaceAudience.LimitedPrivate({ "HBase" })
049@InterfaceStability.Evolving
050public class MultiTableSnapshotInputFormatImpl {
051  private static final Logger LOG =
052    LoggerFactory.getLogger(MultiTableSnapshotInputFormatImpl.class);
053
054  public static final String RESTORE_DIRS_KEY =
055    "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
056  public static final String SNAPSHOT_TO_SCANS_KEY =
057    "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
058
059  /**
060   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
061   * restoreDir.
062   * <p/>
063   * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
064   */
065  public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
066    Path restoreDir) throws IOException {
067    Path rootDir = CommonFSUtils.getRootDir(conf);
068    FileSystem fs = rootDir.getFileSystem(conf);
069
070    setSnapshotToScans(conf, snapshotScans);
071    Map<String, Path> restoreDirs =
072      generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
073    setSnapshotDirs(conf, restoreDirs);
074    restoreSnapshots(conf, restoreDirs, fs);
075  }
076
077  /**
078   * Return the list of splits extracted from the scans/snapshots pushed to conf by
079   * {@link #setInput(Configuration, Map, Path)}
080   * @param conf Configuration to determine splits from
081   * @return Return the list of splits extracted from the scans/snapshots pushed to conf n
082   */
083  public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
084    throws IOException {
085    Path rootDir = CommonFSUtils.getRootDir(conf);
086    FileSystem fs = rootDir.getFileSystem(conf);
087
088    List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
089
090    Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
091    Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
092    for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
093      String snapshotName = entry.getKey();
094
095      Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
096
097      SnapshotManifest manifest =
098        TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
099      List<HRegionInfo> regionInfos =
100        TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
101
102      for (Scan scan : entry.getValue()) {
103        List<TableSnapshotInputFormatImpl.InputSplit> splits =
104          TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
105        rtn.addAll(splits);
106      }
107    }
108    return rtn;
109  }
110
111  /**
112   * Retrieve the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration by
113   * {@link #setSnapshotToScans(Configuration, Map)}
114   * @param conf Configuration to extract name -&gt; list&lt;scan&gt; mappings from.
115   * @return the snapshot name -&gt; list&lt;scan&gt; mapping pushed to configuration n
116   */
117  public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
118
119    Map<String, Collection<Scan>> rtn = Maps.newHashMap();
120
121    for (Map.Entry<String, String> entry : ConfigurationUtil.getKeyValues(conf,
122      SNAPSHOT_TO_SCANS_KEY)) {
123      String snapshotName = entry.getKey();
124      String scan = entry.getValue();
125
126      Collection<Scan> snapshotScans = rtn.get(snapshotName);
127      if (snapshotScans == null) {
128        snapshotScans = Lists.newArrayList();
129        rtn.put(snapshotName, snapshotScans);
130      }
131
132      snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
133    }
134
135    return rtn;
136  }
137
138  /**
139   * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY}) nnn
140   */
141  public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
142    throws IOException {
143    // flatten out snapshotScans for serialization to the job conf
144    List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
145
146    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
147      String snapshotName = entry.getKey();
148      Collection<Scan> scans = entry.getValue();
149
150      // serialize all scans and map them to the appropriate snapshot
151      for (Scan scan : scans) {
152        snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName,
153          TableMapReduceUtil.convertScanToString(scan)));
154      }
155    }
156
157    ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
158  }
159
160  /**
161   * Retrieve the directories into which snapshots have been restored from
162   * ({@link #RESTORE_DIRS_KEY})
163   * @param conf Configuration to extract restore directories from
164   * @return the directories into which snapshots have been restored from n
165   */
166  public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
167    List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
168    Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
169
170    for (Map.Entry<String, String> kvp : kvps) {
171      rtn.put(kvp.getKey(), new Path(kvp.getValue()));
172    }
173
174    return rtn;
175  }
176
177  public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
178    Map<String, String> toSet = Maps.newHashMap();
179
180    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
181      toSet.put(entry.getKey(), entry.getValue().toString());
182    }
183
184    ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
185  }
186
187  /**
188   * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and return a
189   * map from the snapshot to the restore directory.
190   * @param snapshots      collection of snapshot names to restore
191   * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
192   * @return a mapping from snapshot name to the directory in which that snapshot has been restored
193   */
194  private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
195    Path baseRestoreDir) {
196    Map<String, Path> rtn = Maps.newHashMap();
197
198    for (String snapshotName : snapshots) {
199      Path restoreSnapshotDir =
200        new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
201      rtn.put(snapshotName, restoreSnapshotDir);
202    }
203
204    return rtn;
205  }
206
207  /**
208   * Restore each (snapshot name, restore directory) pair in snapshotToDir
209   * @param conf          configuration to restore with
210   * @param snapshotToDir mapping from snapshot names to restore directories
211   * @param fs            filesystem to do snapshot restoration on
212   */
213  public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
214    throws IOException {
215    // TODO: restore from record readers to parallelize.
216    Path rootDir = CommonFSUtils.getRootDir(conf);
217
218    for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
219      String snapshotName = entry.getKey();
220      Path restoreDir = entry.getValue();
221      LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
222        + " for MultiTableSnapshotInputFormat");
223      restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
224    }
225  }
226
227  void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
228    FileSystem fs) throws IOException {
229    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
230  }
231
232}