/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.ConfigurationUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Shared implementation of mapreduce code over multiple table snapshots. Utilized by both the
 * mapreduce ({@link org.apache.hadoop.hbase.mapreduce.MultiTableSnapshotInputFormat}) and mapred
 * ({@link org.apache.hadoop.hbase.mapred.MultiTableSnapshotInputFormat}) implementations.
 */
@InterfaceAudience.LimitedPrivate({ "HBase" })
@InterfaceStability.Evolving
public class MultiTableSnapshotInputFormatImpl {
  private static final Logger LOG =
      LoggerFactory.getLogger(MultiTableSnapshotInputFormatImpl.class);

  public static final String RESTORE_DIRS_KEY =
      "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
  public static final String SNAPSHOT_TO_SCANS_KEY =
      "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";

  /**
   * Configure conf to read from snapshotScans, with snapshots restored to a subdirectory of
   * restoreDir. Sets: {@link #RESTORE_DIRS_KEY}, {@link #SNAPSHOT_TO_SCANS_KEY}.
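   * <p>
   * A minimal usage sketch, assuming an existing {@code Configuration conf}; the snapshot names
   * and restore path below are illustrative placeholders, not part of the API:
   * </p>
   * <pre>{@code
   * MultiTableSnapshotInputFormatImpl impl = new MultiTableSnapshotInputFormatImpl();
   * Map<String, Collection<Scan>> snapshotScans = new HashMap<>();
   * snapshotScans.put("snapshot_a", Collections.singletonList(new Scan()));
   * snapshotScans.put("snapshot_b", Collections.singletonList(new Scan()));
   * impl.setInput(conf, snapshotScans, new Path("/tmp/snapshot-restore"));
   * // splits can later be derived from the same configuration:
   * List<TableSnapshotInputFormatImpl.InputSplit> splits = impl.getSplits(conf);
   * }</pre>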
   * @param conf configuration to push the snapshot/scan mappings and restore directories to
   * @param snapshotScans map from snapshot name to the scans to run against it
   * @param restoreDir base directory under which each snapshot is restored
   * @throws IOException
   */
  public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
      Path restoreDir) throws IOException {
    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    setSnapshotToScans(conf, snapshotScans);
    Map<String, Path> restoreDirs =
        generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
    setSnapshotDirs(conf, restoreDirs);
    restoreSnapshots(conf, restoreDirs, fs);
  }

  /**
   * Return the list of splits extracted from the scans/snapshots pushed to conf by
   * {@link #setInput(org.apache.hadoop.conf.Configuration, java.util.Map, org.apache.hadoop.fs.Path)}
   *
   * @param conf Configuration to determine splits from
   * @return the list of splits extracted from the scans/snapshots pushed to conf
   * @throws IOException
   */
  public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
      throws IOException {
    Path rootDir = FSUtils.getRootDir(conf);
    FileSystem fs = rootDir.getFileSystem(conf);

    List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();

    Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
    Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
    for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
      String snapshotName = entry.getKey();

      Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);

      SnapshotManifest manifest =
          TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
      List<HRegionInfo> regionInfos =
          TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);

      for (Scan scan : entry.getValue()) {
        List<TableSnapshotInputFormatImpl.InputSplit> splits =
            TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
        rtn.addAll(splits);
      }
    }
    return rtn;
  }

  /**
   * Retrieve the snapshot name to list of scans mapping pushed to configuration by
   * {@link #setSnapshotToScans(org.apache.hadoop.conf.Configuration, java.util.Map)}.
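   * <p>
   * A rough round-trip sketch, assuming an instance {@code impl} of this class and a
   * {@code Configuration conf}; the snapshot name and scans are illustrative placeholders:
   * </p>
   * <pre>{@code
   * Map<String, Collection<Scan>> in = new HashMap<>();
   * in.put("snapshot_a", Arrays.asList(new Scan(), new Scan()));
   * impl.setSnapshotToScans(conf, in);
   * // returns an equivalent snapshot name -> scans mapping, each Scan having been
   * // serialized and deserialized via TableMapReduceUtil
   * Map<String, Collection<Scan>> out = impl.getSnapshotsToScans(conf);
   * }</pre>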
   * @param conf Configuration to extract the snapshot name to list of scans mappings from.
   * @return the snapshot name to list of scans mapping pushed to configuration
   * @throws IOException
   */
  public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {

    Map<String, Collection<Scan>> rtn = Maps.newHashMap();

    for (Map.Entry<String, String> entry : ConfigurationUtil
        .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
      String snapshotName = entry.getKey();
      String scan = entry.getValue();

      Collection<Scan> snapshotScans = rtn.get(snapshotName);
      if (snapshotScans == null) {
        snapshotScans = Lists.newArrayList();
        rtn.put(snapshotName, snapshotScans);
      }

      snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
    }

    return rtn;
  }

  /**
   * Push snapshotScans to conf (under the key {@link #SNAPSHOT_TO_SCANS_KEY})
   *
   * @param conf configuration to push the serialized snapshot/scan mapping to
   * @param snapshotScans map from snapshot name to the scans to serialize
   * @throws IOException
   */
  public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
      throws IOException {
    // flatten out snapshotScans for serialization to the job conf
    List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();

    for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
      String snapshotName = entry.getKey();
      Collection<Scan> scans = entry.getValue();

      // serialize all scans and map them to the appropriate snapshot
      for (Scan scan : scans) {
        snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<>(snapshotName,
            TableMapReduceUtil.convertScanToString(scan)));
      }
    }

    ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
  }

  /**
   * Retrieve the directories into which snapshots have been restored ({@link #RESTORE_DIRS_KEY})
   *
   * @param conf Configuration to extract restore directories from
   * @return the directories into which snapshots have been restored
   * @throws IOException
   */
  public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
    List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
    Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());

    for (Map.Entry<String, String> kvp : kvps) {
      rtn.put(kvp.getKey(), new Path(kvp.getValue()));
    }

    return rtn;
  }

  public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
    Map<String, String> toSet = Maps.newHashMap();

    for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
      toSet.put(entry.getKey(), entry.getValue().toString());
    }

    ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
  }

  /**
   * Generate a random path underneath baseRestoreDir for each snapshot in snapshots and
   * return a map from the snapshot to the restore directory.
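   * <p>
   * For illustration only (the name is a placeholder and the suffix is a freshly generated UUID):
   * a snapshot named {@code snapshot_a} with a base directory of {@code /tmp/restore} maps to a
   * path of the form {@code /tmp/restore/snapshot_a__<uuid>}.
   * </p>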
   *
   * @param snapshots collection of snapshot names to restore
   * @param baseRestoreDir base directory under which all snapshots in snapshots will be restored
   * @return a mapping from snapshot name to the directory in which that snapshot has been restored
   */
  private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
      Path baseRestoreDir) {
    Map<String, Path> rtn = Maps.newHashMap();

    for (String snapshotName : snapshots) {
      Path restoreSnapshotDir =
          new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
      rtn.put(snapshotName, restoreSnapshotDir);
    }

    return rtn;
  }

  /**
   * Restore each (snapshot name, restore directory) pair in snapshotToDir
   *
   * @param conf configuration to restore with
   * @param snapshotToDir mapping from snapshot names to restore directories
   * @param fs filesystem to do snapshot restoration on
   * @throws IOException
   */
  public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
      throws IOException {
    // TODO: restore from record readers to parallelize.
    Path rootDir = FSUtils.getRootDir(conf);

    for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
      String snapshotName = entry.getKey();
      Path restoreDir = entry.getValue();
      LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
          + " for MultiTableSnapshotInputFormat");
      restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
    }
  }

  void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
      FileSystem fs) throws IOException {
    RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
  }

}