/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ConfigurationUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Shared implementation of mapreduce code over multiple table snapshots.
 * Utilized by both mapreduce
 * {@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat} and mapred
 * {@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat} implementations.
 */
@InterfaceAudience.LimitedPrivate({ "HBase" })
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormatImpl {
  private static final Logger LOG =
      LoggerFactory.getLogger(MultiTableSnapshotInputFormatImpl.class);

  /** Configuration key under which the snapshot name to restore directory mapping is stored. */
  public static final String RESTORE_DIRS_KEY =
      "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";

  /** Configuration key under which the snapshot name to serialized scan pairs are stored. */
  public static final String SNAPSHOT_TO_SCANS_KEY =
      "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";

  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir.
   * <p>
   * Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}
   *
   * @param conf          configuration to populate; also used to resolve the root dir and
   *                      filesystem
   * @param snapshotScans mapping from snapshot name to the scans to run against that snapshot
   * @param restoreDir    base directory under which a per-snapshot restore directory is created
   * @throws IOException if any of the underlying configuration, filesystem or restore calls fails
   */
  public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
      Path restoreDir) throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    setSnapshotToScans(conf, snapshotScans);
    Map<String, Path> restoreDirs =
        generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
    setSnapshotDirs(conf, restoreDirs);
    // Restore last, after both mappings have been written to conf.
    restoreSnapshots(conf, restoreDirs, fs);
  }

  /**
   * Return the list of splits extracted from the scans/snapshots pushed to conf by
   * {@link
   * #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}.
   * <p>
   * For every snapshot found in conf the manifest and region infos are loaded once and then
   * reused for each scan registered against that snapshot.
   *
   * @param conf Configuration to determine splits from
   * @return Return the list of splits extracted from the scans/snapshots pushed to conf
   * @throws IOException if any of the underlying configuration, manifest or split calls fails
   */
  public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
      throws IOException {
    Path rootDir = CommonFSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();

    Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
    Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
    for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
      String snapshotName = entry.getKey();

      // NOTE(review): null when conf holds scans for this snapshot but no restore dir entry;
      // confirm TableSnapshotInputFormatImpl.getSplits copes with that.
      Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);

      SnapshotManifest manifest =
          TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
      List<HRegionInfo> regionInfos =
          TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);

      for (Scan scan : entry.getValue()) {
        List<TableSnapshotInputFormatImpl.InputSplit> splits =
            TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
        rtn.addAll(splits);
      }
    }
    return rtn;
  }

  /**
   * Retrieve the {@code snapshot name -> list<scan>} mapping pushed to configuration by
   * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}
   *
   * @param conf Configuration to extract {@code name -> list<scan>} mappings from.
   * @return the {@code snapshot name -> list<scan>} mapping pushed to configuration
   * @throws IOException if a stored scan cannot be converted back from its string form
   */
  public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {

    Map<String, Collection<Scan>> rtn = Maps.newHashMap();

    // The stored form is flat: one (snapshot name, serialized scan) pair per scan, so the same
    // snapshot name may show up several times and is regrouped here.
    for (Map.Entry<String, String> entry : ConfigurationUtil
        .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
      String snapshotName = entry.getKey();
      String scan = entry.getValue();

      Collection<Scan> snapshotScans = rtn.get(snapshotName);
      if (snapshotScans == null) {
        snapshotScans = Lists.newArrayList();
        rtn.put(snapshotName, snapshotScans);
      }

      snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
    }

    return rtn;
  }

  /**
   * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
   *
   * @param conf          configuration to write the mapping into
   * @param snapshotScans mapping from snapshot name to the scans to run against that snapshot
   * @throws IOException if a scan cannot be converted to its string form
   */
  public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
      throws IOException {
    // flatten out snapshotScans for serialization to the job conf
    List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();

    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
      String snapshotName = entry.getKey();
      Collection<Scan> scans = entry.getValue();

      // serialize all scans and map them to the appropriate snapshot
      for (Scan scan : scans) {
        snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName,
            TableMapReduceUtil.convertScanToString(scan)));
      }
    }

    ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
  }

  /**
   * Retrieve the directories into which snapshots have been restored
   * (read from {@link #RESTORE_DIRS_KEY})
   *
   * @param conf Configuration to extract restore directories from
   * @return mapping from snapshot name to the directory recorded for it in conf
   * @throws IOException declared for callers; not thrown by the code visible in this method
   */
  public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
    List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
    Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());

    for (Map.Entry<String, String> kvp : kvps) {
      rtn.put(kvp.getKey(), new Path(kvp.getValue()));
    }

    return rtn;
  }

  /**
   * Push snapshotDirs to conf (under the key {@link #RESTORE_DIRS_KEY}), storing each
   * directory as the string form of its {@link Path}.
   *
   * @param conf         configuration to write the mapping into
   * @param snapshotDirs mapping from snapshot name to restore directory
   */
  public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
    Map<String, String> toSet = Maps.newHashMap();

    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
      toSet.put(entry.getKey(), entry.getValue().toString());
    }

    ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
  }

  /**
   * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
   * return a map from the snapshot to the restore directory.
   * <p>
   * This only chooses the paths; nothing is created or restored on the filesystem here.
   *
   * @param snapshots      collection of snapshot names to restore
   * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
   * @return a mapping from snapshot name to the directory chosen for restoring that snapshot
   */
  private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
      Path baseRestoreDir) {
    Map<String, Path> rtn = Maps.newHashMap();

    for (String snapshotName : snapshots) {
      // Directory name is "<snapshot>__<random UUID>".
      Path restoreSnapshotDir =
          new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID());
      rtn.put(snapshotName, restoreSnapshotDir);
    }

    return rtn;
  }

  /**
   * Restore each (snapshot name, restore directory) pair in snapshotToDir
   *
   * @param conf          configuration to restore with
   * @param snapshotToDir mapping from snapshot names to restore directories
   * @param fs            filesystem to do snapshot restoration on
   * @throws IOException if resolving the root dir or restoring a snapshot fails
   */
  public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
      throws IOException {
    // TODO: restore from record readers to parallelize.
    Path rootDir = CommonFSUtils.getRootDir(conf);

    for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
      String snapshotName = entry.getKey();
      Path restoreDir = entry.getValue();
      LOG.info("Restoring snapshot {} into {} for MultiTableSnapshotInputFormat", snapshotName,
          restoreDir);
      restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
    }
  }

  /**
   * Restore a single snapshot by delegating to
   * {@link RestoreSnapshotHelper#copySnapshotForScanner}.
   *
   * @param conf         configuration to restore with
   * @param snapshotName name of the snapshot to restore
   * @param rootDir      root directory passed through to the restore helper
   * @param restoreDir   directory to restore the snapshot into
   * @param fs           filesystem to do snapshot restoration on
   * @throws IOException if the restore helper fails
   */
  void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
      FileSystem fs) throws IOException {
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
  }

}