1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.mapreduce;
20
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Maps;
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.conf.Configuration;
26 import org.apache.hadoop.fs.FileSystem;
27 import org.apache.hadoop.fs.Path;
28 import org.apache.hadoop.hbase.HRegionInfo;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.classification.InterfaceStability;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
33 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
34 import org.apache.hadoop.hbase.util.ConfigurationUtil;
35 import org.apache.hadoop.hbase.util.FSUtils;
36
37 import java.io.IOException;
38 import java.util.AbstractMap;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.UUID;
43
44
45
46
47
48
49
50 @InterfaceAudience.LimitedPrivate({ "HBase" })
51 @InterfaceStability.Evolving
52 public class MultiTableSnapshotInputFormatImpl {
53
54 private static final Log LOG = LogFactory.getLog(MultiTableSnapshotInputFormatImpl.class);
55
56 public static final String RESTORE_DIRS_KEY =
57 "hbase.MultiTableSnapshotInputFormat.restore.snapshotDirMapping";
58 public static final String SNAPSHOT_TO_SCANS_KEY =
59 "hbase.MultiTableSnapshotInputFormat.snapshotsToScans";
60
61
62
63
64
65
66
67
68
69
70
71 public void setInput(Configuration conf, Map<String, Collection<Scan>> snapshotScans,
72 Path restoreDir) throws IOException {
73 Path rootDir = FSUtils.getRootDir(conf);
74 FileSystem fs = rootDir.getFileSystem(conf);
75
76 setSnapshotToScans(conf, snapshotScans);
77 Map<String, Path> restoreDirs =
78 generateSnapshotToRestoreDirMapping(snapshotScans.keySet(), restoreDir);
79 setSnapshotDirs(conf, restoreDirs);
80 restoreSnapshots(conf, restoreDirs, fs);
81 }
82
83
84
85
86
87
88
89
90
91
92 public List<TableSnapshotInputFormatImpl.InputSplit> getSplits(Configuration conf)
93 throws IOException {
94 Path rootDir = FSUtils.getRootDir(conf);
95 FileSystem fs = rootDir.getFileSystem(conf);
96
97 List<TableSnapshotInputFormatImpl.InputSplit> rtn = Lists.newArrayList();
98
99 Map<String, Collection<Scan>> snapshotsToScans = getSnapshotsToScans(conf);
100 Map<String, Path> snapshotsToRestoreDirs = getSnapshotDirs(conf);
101 for (Map.Entry<String, Collection<Scan>> entry : snapshotsToScans.entrySet()) {
102 String snapshotName = entry.getKey();
103
104 Path restoreDir = snapshotsToRestoreDirs.get(snapshotName);
105
106 SnapshotManifest manifest =
107 TableSnapshotInputFormatImpl.getSnapshotManifest(conf, snapshotName, rootDir, fs);
108 List<HRegionInfo> regionInfos =
109 TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest);
110
111 for (Scan scan : entry.getValue()) {
112 List<TableSnapshotInputFormatImpl.InputSplit> splits =
113 TableSnapshotInputFormatImpl.getSplits(scan, manifest, regionInfos, restoreDir, conf);
114 rtn.addAll(splits);
115 }
116 }
117 return rtn;
118 }
119
120
121
122
123
124
125
126
127
128 public Map<String, Collection<Scan>> getSnapshotsToScans(Configuration conf) throws IOException {
129
130 Map<String, Collection<Scan>> rtn = Maps.newHashMap();
131
132 for (Map.Entry<String, String> entry : ConfigurationUtil
133 .getKeyValues(conf, SNAPSHOT_TO_SCANS_KEY)) {
134 String snapshotName = entry.getKey();
135 String scan = entry.getValue();
136
137 Collection<Scan> snapshotScans = rtn.get(snapshotName);
138 if (snapshotScans == null) {
139 snapshotScans = Lists.newArrayList();
140 rtn.put(snapshotName, snapshotScans);
141 }
142
143 snapshotScans.add(TableMapReduceUtil.convertStringToScan(scan));
144 }
145
146 return rtn;
147 }
148
149
150
151
152
153
154
155
156 public void setSnapshotToScans(Configuration conf, Map<String, Collection<Scan>> snapshotScans)
157 throws IOException {
158
159 List<Map.Entry<String, String>> snapshotToSerializedScans = Lists.newArrayList();
160
161 for (Map.Entry<String, Collection<Scan>> entry : snapshotScans.entrySet()) {
162 String snapshotName = entry.getKey();
163 Collection<Scan> scans = entry.getValue();
164
165
166 for (Scan scan : scans) {
167 snapshotToSerializedScans.add(new AbstractMap.SimpleImmutableEntry<String, String>(snapshotName,
168 TableMapReduceUtil.convertScanToString(scan)));
169 }
170 }
171
172 ConfigurationUtil.setKeyValues(conf, SNAPSHOT_TO_SCANS_KEY, snapshotToSerializedScans);
173 }
174
175
176
177
178
179
180
181
182
183 public Map<String, Path> getSnapshotDirs(Configuration conf) throws IOException {
184 List<Map.Entry<String, String>> kvps = ConfigurationUtil.getKeyValues(conf, RESTORE_DIRS_KEY);
185 Map<String, Path> rtn = Maps.newHashMapWithExpectedSize(kvps.size());
186
187 for (Map.Entry<String, String> kvp : kvps) {
188 rtn.put(kvp.getKey(), new Path(kvp.getValue()));
189 }
190
191 return rtn;
192 }
193
194 public void setSnapshotDirs(Configuration conf, Map<String, Path> snapshotDirs) {
195 Map<String, String> toSet = Maps.newHashMap();
196
197 for (Map.Entry<String, Path> entry : snapshotDirs.entrySet()) {
198 toSet.put(entry.getKey(), entry.getValue().toString());
199 }
200
201 ConfigurationUtil.setKeyValues(conf, RESTORE_DIRS_KEY, toSet.entrySet());
202 }
203
204
205
206
207
208
209
210
211
212 private Map<String, Path> generateSnapshotToRestoreDirMapping(Collection<String> snapshots,
213 Path baseRestoreDir) {
214 Map<String, Path> rtn = Maps.newHashMap();
215
216 for (String snapshotName : snapshots) {
217 Path restoreSnapshotDir =
218 new Path(baseRestoreDir, snapshotName + "__" + UUID.randomUUID().toString());
219 rtn.put(snapshotName, restoreSnapshotDir);
220 }
221
222 return rtn;
223 }
224
225
226
227
228
229
230
231
232
233 public void restoreSnapshots(Configuration conf, Map<String, Path> snapshotToDir, FileSystem fs)
234 throws IOException {
235
236 Path rootDir = FSUtils.getRootDir(conf);
237
238 for (Map.Entry<String, Path> entry : snapshotToDir.entrySet()) {
239 String snapshotName = entry.getKey();
240 Path restoreDir = entry.getValue();
241 LOG.info("Restoring snapshot " + snapshotName + " into " + restoreDir
242 + " for MultiTableSnapshotInputFormat");
243 restoreSnapshot(conf, snapshotName, rootDir, restoreDir, fs);
244 }
245 }
246
247 void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir,
248 FileSystem fs) throws IOException {
249 RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName);
250 }
251
252
253
254 }