View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.snapshot;
20  
21  import com.google.protobuf.CodedInputStream;
22  import com.google.protobuf.InvalidProtocolBufferException;
23  
24  import java.io.FileNotFoundException;
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.HashMap;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.concurrent.ThreadPoolExecutor;
32  import java.util.concurrent.TimeUnit;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataInputStream;
38  import org.apache.hadoop.fs.FSDataOutputStream;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.HRegionInfo;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.classification.InterfaceAudience;
44  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
45  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
46  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotDataManifest;
47  import org.apache.hadoop.hbase.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
48  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
49  import org.apache.hadoop.hbase.regionserver.HRegion;
50  import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
51  import org.apache.hadoop.hbase.regionserver.Store;
52  import org.apache.hadoop.hbase.regionserver.StoreFile;
53  import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
54  import org.apache.hadoop.hbase.util.Bytes;
55  import org.apache.hadoop.hbase.util.FSUtils;
56  import org.apache.hadoop.hbase.util.FSTableDescriptors;
57  import org.apache.hadoop.hbase.util.Threads;
58  
59  /**
60   * Utility class to help read/write the Snapshot Manifest.
61   *
62   * The snapshot format is transparent for the users of this class,
63   * once the snapshot is written, it will never be modified.
64   * On open() the snapshot will be loaded to the current in-memory format.
65   */
66  @InterfaceAudience.Private
67  public class SnapshotManifest {
68    private static final Log LOG = LogFactory.getLog(SnapshotManifest.class);
69  
70    public static final String SNAPSHOT_MANIFEST_SIZE_LIMIT_CONF_KEY = "snapshot.manifest.size.limit";
71  
72    public static final String DATA_MANIFEST_NAME = "data.manifest";
73  
74    private List<SnapshotRegionManifest> regionManifests;
75    private SnapshotDescription desc;
76    private HTableDescriptor htd;
77  
78    private final ForeignExceptionSnare monitor;
79    private final Configuration conf;
80    private final Path workingDir;
81    private final FileSystem fs;
82    private int manifestSizeLimit;
83  
84    private SnapshotManifest(final Configuration conf, final FileSystem fs,
85        final Path workingDir, final SnapshotDescription desc,
86        final ForeignExceptionSnare monitor) {
87      this.monitor = monitor;
88      this.desc = desc;
89      this.workingDir = workingDir;
90      this.conf = conf;
91      this.fs = fs;
92  
93      this.manifestSizeLimit = conf.getInt(SNAPSHOT_MANIFEST_SIZE_LIMIT_CONF_KEY, 64 * 1024 * 1024);
94    }
95  
96    /**
97     * Return a SnapshotManifest instance, used for writing a snapshot.
98     *
99     * There are two usage pattern:
100    *  - The Master will create a manifest, add the descriptor, offline regions
101    *    and consolidate the snapshot by writing all the pending stuff on-disk.
102    *      manifest = SnapshotManifest.create(...)
103    *      manifest.addRegion(tableDir, hri)
104    *      manifest.consolidate()
105    *  - The RegionServer will create a single region manifest
106    *      manifest = SnapshotManifest.create(...)
107    *      manifest.addRegion(region)
108    */
109   public static SnapshotManifest create(final Configuration conf, final FileSystem fs,
110       final Path workingDir, final SnapshotDescription desc,
111       final ForeignExceptionSnare monitor) {
112     return new SnapshotManifest(conf, fs, workingDir, desc, monitor);
113   }
114 
115   /**
116    * Return a SnapshotManifest instance with the information already loaded in-memory.
117    *    SnapshotManifest manifest = SnapshotManifest.open(...)
118    *    HTableDescriptor htd = manifest.getTableDescriptor()
119    *    for (SnapshotRegionManifest regionManifest: manifest.getRegionManifests())
120    *      hri = regionManifest.getRegionInfo()
121    *      for (regionManifest.getFamilyFiles())
122    *        ...
123    */
124   public static SnapshotManifest open(final Configuration conf, final FileSystem fs,
125       final Path workingDir, final SnapshotDescription desc) throws IOException {
126     SnapshotManifest manifest = new SnapshotManifest(conf, fs, workingDir, desc, null);
127     manifest.load();
128     return manifest;
129   }
130 
131 
132   /**
133    * Add the table descriptor to the snapshot manifest
134    */
135   public void addTableDescriptor(final HTableDescriptor htd) throws IOException {
136     this.htd = htd;
137   }
138 
139   interface RegionVisitor<TRegion, TFamily> {
140     TRegion regionOpen(final HRegionInfo regionInfo) throws IOException;
141     void regionClose(final TRegion region) throws IOException;
142 
143     TFamily familyOpen(final TRegion region, final byte[] familyName) throws IOException;
144     void familyClose(final TRegion region, final TFamily family) throws IOException;
145 
146     void storeFile(final TRegion region, final TFamily family, final StoreFileInfo storeFile)
147       throws IOException;
148   }
149 
150   private RegionVisitor createRegionVisitor(final SnapshotDescription desc) throws IOException {
151     switch (getSnapshotFormat(desc)) {
152       case SnapshotManifestV1.DESCRIPTOR_VERSION:
153         return new SnapshotManifestV1.ManifestBuilder(conf, fs, workingDir);
154       case SnapshotManifestV2.DESCRIPTOR_VERSION:
155         return new SnapshotManifestV2.ManifestBuilder(conf, fs, workingDir);
156       default:
157         throw new CorruptedSnapshotException("Invalid Snapshot version: "+ desc.getVersion(), desc);
158     }
159   }
160 
161   /**
162    * Creates a 'manifest' for the specified region, by reading directly from the HRegion object.
163    * This is used by the "online snapshot" when the table is enabled.
164    */
165   public void addRegion(final HRegion region) throws IOException {
166     // 0. Get the ManifestBuilder/RegionVisitor
167     RegionVisitor visitor = createRegionVisitor(desc);
168 
169     // 1. dump region meta info into the snapshot directory
170     LOG.debug("Storing '" + region + "' region-info for snapshot.");
171     Object regionData = visitor.regionOpen(region.getRegionInfo());
172     monitor.rethrowException();
173 
174     // 2. iterate through all the stores in the region
175     LOG.debug("Creating references for hfiles");
176 
177     for (Store store : region.getStores()) {
178       // 2.1. build the snapshot reference for the store
179       Object familyData = visitor.familyOpen(regionData, store.getFamily().getName());
180       monitor.rethrowException();
181 
182       List<StoreFile> storeFiles = new ArrayList<StoreFile>(store.getStorefiles());
183       if (LOG.isDebugEnabled()) {
184         LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
185       }
186 
187       // 2.2. iterate through all the store's files and create "references".
188       for (int i = 0, sz = storeFiles.size(); i < sz; i++) {
189         StoreFile storeFile = storeFiles.get(i);
190         monitor.rethrowException();
191 
192         // create "reference" to this store file.
193         LOG.debug("Adding reference for file (" + (i+1) + "/" + sz + "): " + storeFile.getPath());
194         visitor.storeFile(regionData, familyData, storeFile.getFileInfo());
195       }
196       visitor.familyClose(regionData, familyData);
197     }
198     visitor.regionClose(regionData);
199   }
200 
201   /**
202    * Creates a 'manifest' for the specified region, by reading directly from the disk.
203    * This is used by the "offline snapshot" when the table is disabled.
204    */
205   public void addRegion(final Path tableDir, final HRegionInfo regionInfo) throws IOException {
206     // 0. Get the ManifestBuilder/RegionVisitor
207     RegionVisitor visitor = createRegionVisitor(desc);
208 
209     // Open the RegionFS
210     HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(conf, fs,
211           tableDir, regionInfo, true);
212     monitor.rethrowException();
213 
214     // 1. dump region meta info into the snapshot directory
215     LOG.debug("Storing region-info for snapshot.");
216     Object regionData = visitor.regionOpen(regionInfo);
217     monitor.rethrowException();
218 
219     // 2. iterate through all the stores in the region
220     LOG.debug("Creating references for hfiles");
221 
222     // This ensures that we have an atomic view of the directory as long as we have < ls limit
223     // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files in
224     // batches and may miss files being added/deleted. This could be more robust (iteratively
225     // checking to see if we have all the files until we are sure), but the limit is currently 1000
226     // files/batch, far more than the number of store files under a single column family.
227     Collection<String> familyNames = regionFs.getFamilies();
228     if (familyNames != null) {
229       for (String familyName: familyNames) {
230         Object familyData = visitor.familyOpen(regionData, Bytes.toBytes(familyName));
231         monitor.rethrowException();
232 
233         Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(familyName);
234         if (storeFiles == null) {
235           LOG.debug("No files under family: " + familyName);
236           continue;
237         }
238 
239         // 2.1. build the snapshot reference for the store
240         if (LOG.isDebugEnabled()) {
241           LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
242         }
243 
244         // 2.2. iterate through all the store's files and create "references".
245         int i = 0;
246         int sz = storeFiles.size();
247         for (StoreFileInfo storeFile: storeFiles) {
248           monitor.rethrowException();
249 
250           // create "reference" to this store file.
251           LOG.debug("Adding reference for file ("+ (++i) +"/" + sz + "): " + storeFile.getPath());
252           visitor.storeFile(regionData, familyData, storeFile);
253         }
254         visitor.familyClose(regionData, familyData);
255       }
256     }
257     visitor.regionClose(regionData);
258   }
259 
260   /**
261    * Load the information in the SnapshotManifest. Called by SnapshotManifest.open()
262    *
263    * If the format is v2 and there is no data-manifest, means that we are loading an
264    * in-progress snapshot. Since we support rolling-upgrades, we loook for v1 and v2
265    * regions format.
266    */
267   private void load() throws IOException {
268     switch (getSnapshotFormat(desc)) {
269       case SnapshotManifestV1.DESCRIPTOR_VERSION: {
270         this.htd = FSTableDescriptors.getTableDescriptorFromFs(fs, workingDir);
271         ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
272         try {
273           this.regionManifests =
274             SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
275         } finally {
276           tpool.shutdown();
277         }
278         break;
279       }
280       case SnapshotManifestV2.DESCRIPTOR_VERSION: {
281         SnapshotDataManifest dataManifest = readDataManifest();
282         if (dataManifest != null) {
283           htd = HTableDescriptor.convert(dataManifest.getTableSchema());
284           regionManifests = dataManifest.getRegionManifestsList();
285         } else {
286           // Compatibility, load the v1 regions
287           // This happens only when the snapshot is in-progress and the cache wants to refresh.
288           List<SnapshotRegionManifest> v1Regions, v2Regions;
289           ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
290           try {
291             v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
292             v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, fs, workingDir, desc);
293           } catch (InvalidProtocolBufferException e) {
294             throw new CorruptedSnapshotException("unable to parse region manifest " +
295                 e.getMessage(), e);
296           } finally {
297             tpool.shutdown();
298           }
299           if (v1Regions != null && v2Regions != null) {
300             regionManifests =
301               new ArrayList<SnapshotRegionManifest>(v1Regions.size() + v2Regions.size());
302             regionManifests.addAll(v1Regions);
303             regionManifests.addAll(v2Regions);
304           } else if (v1Regions != null) {
305             regionManifests = v1Regions;
306           } else /* if (v2Regions != null) */ {
307             regionManifests = v2Regions;
308           }
309         }
310         break;
311       }
312       default:
313         throw new CorruptedSnapshotException("Invalid Snapshot version: "+ desc.getVersion(), desc);
314     }
315   }
316 
317   /**
318    * Get the current snapshot working dir
319    */
320   public Path getSnapshotDir() {
321     return this.workingDir;
322   }
323 
324   /**
325    * Get the SnapshotDescription
326    */
327   public SnapshotDescription getSnapshotDescription() {
328     return this.desc;
329   }
330 
331   /**
332    * Get the table descriptor from the Snapshot
333    */
334   public HTableDescriptor getTableDescriptor() {
335     return this.htd;
336   }
337 
338   /**
339    * Get all the Region Manifest from the snapshot
340    */
341   public List<SnapshotRegionManifest> getRegionManifests() {
342     return this.regionManifests;
343   }
344 
345   /**
346    * Get all the Region Manifest from the snapshot.
347    * This is an helper to get a map with the region encoded name
348    */
349   public Map<String, SnapshotRegionManifest> getRegionManifestsMap() {
350     if (regionManifests == null || regionManifests.size() == 0) return null;
351 
352     HashMap<String, SnapshotRegionManifest> regionsMap =
353         new HashMap<String, SnapshotRegionManifest>(regionManifests.size());
354     for (SnapshotRegionManifest manifest: regionManifests) {
355       String regionName = getRegionNameFromManifest(manifest);
356       regionsMap.put(regionName, manifest);
357     }
358     return regionsMap;
359   }
360 
361   public void consolidate() throws IOException {
362     if (getSnapshotFormat(desc) == SnapshotManifestV1.DESCRIPTOR_VERSION) {
363       Path rootDir = FSUtils.getRootDir(conf);
364       LOG.info("Using old Snapshot Format");
365       // write a copy of descriptor to the snapshot directory
366       new FSTableDescriptors(conf, fs, rootDir)
367         .createTableDescriptorForTableDirectory(workingDir, htd, false);
368     } else {
369       LOG.debug("Convert to Single Snapshot Manifest");
370       convertToV2SingleManifest();
371     }
372   }
373 
374   /*
375    * In case of rolling-upgrade, we try to read all the formats and build
376    * the snapshot with the latest format.
377    */
378   private void convertToV2SingleManifest() throws IOException {
379     // Try to load v1 and v2 regions
380     List<SnapshotRegionManifest> v1Regions, v2Regions;
381     ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
382     try {
383       v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, fs, workingDir, desc);
384       v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, fs, workingDir, desc);
385     } finally {
386       tpool.shutdown();
387     }
388 
389     SnapshotDataManifest.Builder dataManifestBuilder = SnapshotDataManifest.newBuilder();
390     dataManifestBuilder.setTableSchema(htd.convert());
391 
392     if (v1Regions != null && v1Regions.size() > 0) {
393       dataManifestBuilder.addAllRegionManifests(v1Regions);
394     }
395     if (v2Regions != null && v2Regions.size() > 0) {
396       dataManifestBuilder.addAllRegionManifests(v2Regions);
397     }
398 
399     // Write the v2 Data Manifest.
400     // Once the data-manifest is written, the snapshot can be considered complete.
401     // Currently snapshots are written in a "temporary" directory and later
402     // moved to the "complated" snapshot directory.
403     SnapshotDataManifest dataManifest = dataManifestBuilder.build();
404     writeDataManifest(dataManifest);
405     this.regionManifests = dataManifest.getRegionManifestsList();
406 
407     // Remove the region manifests. Everything is now in the data-manifest.
408     // The delete operation is "relaxed", unless we get an exception we keep going.
409     // The extra files in the snapshot directory will not give any problem,
410     // since they have the same content as the data manifest, and even by re-reading
411     // them we will get the same information.
412     if (v1Regions != null && v1Regions.size() > 0) {
413       for (SnapshotRegionManifest regionManifest: v1Regions) {
414         SnapshotManifestV1.deleteRegionManifest(fs, workingDir, regionManifest);
415       }
416     }
417     if (v2Regions != null && v2Regions.size() > 0) {
418       for (SnapshotRegionManifest regionManifest: v2Regions) {
419         SnapshotManifestV2.deleteRegionManifest(fs, workingDir, regionManifest);
420       }
421     }
422   }
423 
424   /*
425    * Write the SnapshotDataManifest file
426    */
427   private void writeDataManifest(final SnapshotDataManifest manifest)
428       throws IOException {
429     FSDataOutputStream stream = fs.create(new Path(workingDir, DATA_MANIFEST_NAME));
430     try {
431       manifest.writeTo(stream);
432     } finally {
433       stream.close();
434     }
435   }
436 
437   /*
438    * Read the SnapshotDataManifest file
439    */
440   private SnapshotDataManifest readDataManifest() throws IOException {
441     FSDataInputStream in = null;
442     try {
443       in = fs.open(new Path(workingDir, DATA_MANIFEST_NAME));
444       CodedInputStream cin = CodedInputStream.newInstance(in);
445       cin.setSizeLimit(manifestSizeLimit);
446       return SnapshotDataManifest.parseFrom(cin);
447     } catch (FileNotFoundException e) {
448       return null;
449     } catch (InvalidProtocolBufferException e) {
450       throw new CorruptedSnapshotException("unable to parse data manifest " + e.getMessage(), e);
451     } finally {
452       if (in != null) in.close();
453     }
454   }
455 
456   private ThreadPoolExecutor createExecutor(final String name) {
457     return createExecutor(conf, name);
458   }
459 
460   public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
461     int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
462     return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
463               Threads.getNamedThreadFactory(name));
464   }
465 
466   /**
467    * Extract the region encoded name from the region manifest
468    */
469   static String getRegionNameFromManifest(final SnapshotRegionManifest manifest) {
470     byte[] regionName = HRegionInfo.createRegionName(
471             ProtobufUtil.toTableName(manifest.getRegionInfo().getTableName()),
472             manifest.getRegionInfo().getStartKey().toByteArray(),
473             manifest.getRegionInfo().getRegionId(), true);
474     return HRegionInfo.encodeRegionName(regionName);
475   }
476 
477   /*
478    * Return the snapshot format
479    */
480   private static int getSnapshotFormat(final SnapshotDescription desc) {
481     return desc.hasVersion() ? desc.getVersion() : SnapshotManifestV1.DESCRIPTOR_VERSION;
482   }
483 }