001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.snapshot;
020
021import java.io.FileNotFoundException;
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.util.ArrayList;
025import java.util.Collection;
026import java.util.HashMap;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.ExecutorCompletionService;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FSDataInputStream;
035import org.apache.hadoop.fs.FSDataOutputStream;
036import org.apache.hadoop.fs.FileStatus;
037import org.apache.hadoop.fs.FileSystem;
038import org.apache.hadoop.fs.Path;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.RegionInfo;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
043import org.apache.hadoop.hbase.mob.MobUtils;
044import org.apache.hadoop.hbase.monitoring.MonitoredTask;
045import org.apache.hadoop.hbase.regionserver.HRegion;
046import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
047import org.apache.hadoop.hbase.regionserver.HStore;
048import org.apache.hadoop.hbase.regionserver.HStoreFile;
049import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.CommonFSUtils;
052import org.apache.hadoop.hbase.util.FSTableDescriptors;
053import org.apache.hadoop.hbase.util.Threads;
054import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
060import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
061import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
062
063import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDataManifest;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
067
068/**
069 * Utility class to help read/write the Snapshot Manifest.
070 *
071 * The snapshot format is transparent for the users of this class,
072 * once the snapshot is written, it will never be modified.
073 * On open() the snapshot will be loaded to the current in-memory format.
074 */
075@InterfaceAudience.Private
076public final class SnapshotManifest {
077  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManifest.class);
078
079  public static final String SNAPSHOT_MANIFEST_SIZE_LIMIT_CONF_KEY = "snapshot.manifest.size.limit";
080
081  public static final String DATA_MANIFEST_NAME = "data.manifest";
082
083  private List<SnapshotRegionManifest> regionManifests;
084  private SnapshotDescription desc;
085  private TableDescriptor htd;
086
087  private final ForeignExceptionSnare monitor;
088  private final Configuration conf;
089  private final Path workingDir;
090  private final FileSystem rootFs;
091  private final FileSystem workingDirFs;
092  private int manifestSizeLimit;
093  private final MonitoredTask statusTask;
094
095  /**
096   *
097   * @param conf configuration file for HBase setup
098   * @param rootFs root filesystem containing HFiles
099   * @param workingDir file path of where the manifest should be located
100   * @param desc description of snapshot being taken
101   * @param monitor monitor of foreign exceptions
102   * @throws IOException if the working directory file system cannot be
103   *                     determined from the config file
104   */
105  private SnapshotManifest(final Configuration conf, final FileSystem rootFs,
106      final Path workingDir, final SnapshotDescription desc,
107      final ForeignExceptionSnare monitor, final MonitoredTask statusTask) throws IOException {
108    this.monitor = monitor;
109    this.desc = desc;
110    this.workingDir = workingDir;
111    this.conf = conf;
112    this.rootFs = rootFs;
113    this.statusTask = statusTask;
114    this.workingDirFs = this.workingDir.getFileSystem(this.conf);
115    this.manifestSizeLimit = conf.getInt(SNAPSHOT_MANIFEST_SIZE_LIMIT_CONF_KEY, 64 * 1024 * 1024);
116  }
117
118  /**
119   * Return a SnapshotManifest instance, used for writing a snapshot.
120   *
121   * There are two usage pattern:
122   *  - The Master will create a manifest, add the descriptor, offline regions
123   *    and consolidate the snapshot by writing all the pending stuff on-disk.
124   *      manifest = SnapshotManifest.create(...)
125   *      manifest.addRegion(tableDir, hri)
126   *      manifest.consolidate()
127   *  - The RegionServer will create a single region manifest
128   *      manifest = SnapshotManifest.create(...)
129   *      manifest.addRegion(region)
130   */
131  public static SnapshotManifest create(final Configuration conf, final FileSystem fs,
132      final Path workingDir, final SnapshotDescription desc,
133      final ForeignExceptionSnare monitor) throws IOException {
134    return create(conf, fs, workingDir, desc, monitor, null);
135
136  }
137
138  public static SnapshotManifest create(final Configuration conf, final FileSystem fs,
139      final Path workingDir, final SnapshotDescription desc, final ForeignExceptionSnare monitor,
140      final MonitoredTask statusTask) throws IOException {
141    return new SnapshotManifest(conf, fs, workingDir, desc, monitor, statusTask);
142
143  }
144
145  /**
146   * Return a SnapshotManifest instance with the information already loaded in-memory.
147   *    SnapshotManifest manifest = SnapshotManifest.open(...)
148   *    TableDescriptor htd = manifest.getDescriptor()
149   *    for (SnapshotRegionManifest regionManifest: manifest.getRegionManifests())
150   *      hri = regionManifest.getRegionInfo()
151   *      for (regionManifest.getFamilyFiles())
152   *        ...
153   */
154  public static SnapshotManifest open(final Configuration conf, final FileSystem fs,
155      final Path workingDir, final SnapshotDescription desc) throws IOException {
156    SnapshotManifest manifest = new SnapshotManifest(conf, fs, workingDir, desc, null, null);
157    manifest.load();
158    return manifest;
159  }
160
161
162  /**
163   * Add the table descriptor to the snapshot manifest
164   */
165  public void addTableDescriptor(final TableDescriptor htd) throws IOException {
166    this.htd = htd;
167  }
168
169  interface RegionVisitor<TRegion, TFamily> {
170    TRegion regionOpen(final RegionInfo regionInfo) throws IOException;
171    void regionClose(final TRegion region) throws IOException;
172
173    TFamily familyOpen(final TRegion region, final byte[] familyName) throws IOException;
174    void familyClose(final TRegion region, final TFamily family) throws IOException;
175
176    void storeFile(final TRegion region, final TFamily family, final StoreFileInfo storeFile)
177      throws IOException;
178  }
179
180  private RegionVisitor createRegionVisitor(final SnapshotDescription desc) throws IOException {
181    switch (getSnapshotFormat(desc)) {
182      case SnapshotManifestV1.DESCRIPTOR_VERSION:
183        return new SnapshotManifestV1.ManifestBuilder(conf, rootFs, workingDir);
184      case SnapshotManifestV2.DESCRIPTOR_VERSION:
185        return new SnapshotManifestV2.ManifestBuilder(conf, rootFs, workingDir);
186      default:
187      throw new CorruptedSnapshotException("Invalid Snapshot version: " + desc.getVersion(),
188        ProtobufUtil.createSnapshotDesc(desc));
189    }
190  }
191
192  public void addMobRegion(RegionInfo regionInfo) throws IOException {
193    // Get the ManifestBuilder/RegionVisitor
194    RegionVisitor visitor = createRegionVisitor(desc);
195
196    // Visit the region and add it to the manifest
197    addMobRegion(regionInfo, visitor);
198  }
199
200  @VisibleForTesting
201  protected void addMobRegion(RegionInfo regionInfo, RegionVisitor visitor) throws IOException {
202    // 1. dump region meta info into the snapshot directory
203    final String snapshotName = desc.getName();
204    LOG.debug("Storing mob region '" + regionInfo + "' region-info for snapshot=" + snapshotName);
205    Object regionData = visitor.regionOpen(regionInfo);
206    monitor.rethrowException();
207
208    // 2. iterate through all the stores in the region
209    LOG.debug("Creating references for mob files");
210
211    Path mobRegionPath = MobUtils.getMobRegionPath(conf, regionInfo.getTable());
212    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
213      // 2.1. build the snapshot reference for the store if it's a mob store
214      if (!hcd.isMobEnabled()) {
215        continue;
216      }
217      Object familyData = visitor.familyOpen(regionData, hcd.getName());
218      monitor.rethrowException();
219
220      Path storePath = MobUtils.getMobFamilyPath(mobRegionPath, hcd.getNameAsString());
221      List<StoreFileInfo> storeFiles = getStoreFiles(storePath);
222      if (storeFiles == null) {
223        if (LOG.isDebugEnabled()) {
224          LOG.debug("No mob files under family: " + hcd.getNameAsString());
225        }
226        continue;
227      }
228
229      addReferenceFiles(visitor, regionData, familyData, storeFiles, true);
230
231      visitor.familyClose(regionData, familyData);
232    }
233    visitor.regionClose(regionData);
234  }
235
236  /**
237   * Creates a 'manifest' for the specified region, by reading directly from the HRegion object.
238   * This is used by the "online snapshot" when the table is enabled.
239   */
240  public void addRegion(final HRegion region) throws IOException {
241    // Get the ManifestBuilder/RegionVisitor
242    RegionVisitor visitor = createRegionVisitor(desc);
243
244    // Visit the region and add it to the manifest
245    addRegion(region, visitor);
246  }
247
248  @VisibleForTesting
249  protected void addRegion(final HRegion region, RegionVisitor visitor) throws IOException {
250    // 1. dump region meta info into the snapshot directory
251    final String snapshotName = desc.getName();
252    LOG.debug("Storing '" + region + "' region-info for snapshot=" + snapshotName);
253    Object regionData = visitor.regionOpen(region.getRegionInfo());
254    monitor.rethrowException();
255
256    // 2. iterate through all the stores in the region
257    LOG.debug("Creating references for hfiles");
258
259    for (HStore store : region.getStores()) {
260      // 2.1. build the snapshot reference for the store
261      Object familyData = visitor.familyOpen(regionData,
262          store.getColumnFamilyDescriptor().getName());
263      monitor.rethrowException();
264
265      List<HStoreFile> storeFiles = new ArrayList<>(store.getStorefiles());
266      if (LOG.isDebugEnabled()) {
267        LOG.debug("Adding snapshot references for " + storeFiles  + " hfiles");
268      }
269
270      // 2.2. iterate through all the store's files and create "references".
271      for (int i = 0, sz = storeFiles.size(); i < sz; i++) {
272        HStoreFile storeFile = storeFiles.get(i);
273        monitor.rethrowException();
274
275        // create "reference" to this store file.
276        LOG.debug("Adding reference for file (" + (i+1) + "/" + sz + "): " + storeFile.getPath() +
277                " for snapshot=" + snapshotName);
278        visitor.storeFile(regionData, familyData, storeFile.getFileInfo());
279      }
280      visitor.familyClose(regionData, familyData);
281    }
282    visitor.regionClose(regionData);
283  }
284
285  /**
286   * Creates a 'manifest' for the specified region, by reading directly from the disk.
287   * This is used by the "offline snapshot" when the table is disabled.
288   */
289  public void addRegion(final Path tableDir, final RegionInfo regionInfo) throws IOException {
290    // Get the ManifestBuilder/RegionVisitor
291    RegionVisitor visitor = createRegionVisitor(desc);
292
293    // Visit the region and add it to the manifest
294    addRegion(tableDir, regionInfo, visitor);
295  }
296
297  @VisibleForTesting
298  protected void addRegion(final Path tableDir, final RegionInfo regionInfo, RegionVisitor visitor)
299      throws IOException {
300    boolean isMobRegion = MobUtils.isMobRegionInfo(regionInfo);
301    try {
302      Path baseDir = tableDir;
303      // Open the RegionFS
304      if (isMobRegion) {
305        baseDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), regionInfo.getTable());
306      }
307      HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(conf, rootFs,
308        baseDir, regionInfo, true);
309      monitor.rethrowException();
310
311      // 1. dump region meta info into the snapshot directory
312      LOG.debug("Storing region-info for snapshot.");
313      Object regionData = visitor.regionOpen(regionInfo);
314      monitor.rethrowException();
315
316      // 2. iterate through all the stores in the region
317      LOG.debug("Creating references for hfiles");
318
319      // This ensures that we have an atomic view of the directory as long as we have < ls limit
320      // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files
321      // in batches and may miss files being added/deleted. This could be more robust (iteratively
322      // checking to see if we have all the files until we are sure), but the limit is currently
323      // 1000 files/batch, far more than the number of store files under a single column family.
324      Collection<String> familyNames = regionFs.getFamilies();
325      if (familyNames != null) {
326        for (String familyName: familyNames) {
327          Object familyData = visitor.familyOpen(regionData, Bytes.toBytes(familyName));
328          monitor.rethrowException();
329
330          Collection<StoreFileInfo> storeFiles = regionFs.getStoreFiles(familyName);
331          if (storeFiles == null) {
332            if (LOG.isDebugEnabled()) {
333              LOG.debug("No files under family: " + familyName);
334            }
335            continue;
336          }
337
338          // 2.1. build the snapshot reference for the store
339          // iterate through all the store's files and create "references".
340          addReferenceFiles(visitor, regionData, familyData, storeFiles, false);
341
342          visitor.familyClose(regionData, familyData);
343        }
344      }
345      visitor.regionClose(regionData);
346    } catch (IOException e) {
347      // the mob directory might not be created yet, so do nothing when it is a mob region
348      if (!isMobRegion) {
349        throw e;
350      }
351    }
352  }
353
354  private List<StoreFileInfo> getStoreFiles(Path storeDir) throws IOException {
355    FileStatus[] stats = CommonFSUtils.listStatus(rootFs, storeDir);
356    if (stats == null) return null;
357
358    ArrayList<StoreFileInfo> storeFiles = new ArrayList<>(stats.length);
359    for (int i = 0; i < stats.length; ++i) {
360      storeFiles.add(new StoreFileInfo(conf, rootFs, stats[i]));
361    }
362    return storeFiles;
363  }
364
365  private void addReferenceFiles(RegionVisitor visitor, Object regionData, Object familyData,
366      Collection<StoreFileInfo> storeFiles, boolean isMob) throws IOException {
367    final String fileType = isMob ? "mob file" : "hfile";
368
369    if (LOG.isDebugEnabled()) {
370      LOG.debug(String.format("Adding snapshot references for %s %ss", storeFiles, fileType));
371    }
372
373    int i = 0;
374    int sz = storeFiles.size();
375    for (StoreFileInfo storeFile: storeFiles) {
376      monitor.rethrowException();
377
378      LOG.debug(String.format("Adding reference for %s (%d/%d): %s",
379          fileType, ++i, sz, storeFile.getPath()));
380
381      // create "reference" to this store file.
382      visitor.storeFile(regionData, familyData, storeFile);
383    }
384  }
385
386  /**
387   * Load the information in the SnapshotManifest. Called by SnapshotManifest.open()
388   *
389   * If the format is v2 and there is no data-manifest, means that we are loading an
390   * in-progress snapshot. Since we support rolling-upgrades, we loook for v1 and v2
391   * regions format.
392   */
393  private void load() throws IOException {
394    switch (getSnapshotFormat(desc)) {
395      case SnapshotManifestV1.DESCRIPTOR_VERSION: {
396        this.htd = FSTableDescriptors.getTableDescriptorFromFs(workingDirFs, workingDir);
397        ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
398        try {
399          this.regionManifests =
400            SnapshotManifestV1.loadRegionManifests(conf, tpool, rootFs, workingDir, desc);
401        } finally {
402          tpool.shutdown();
403        }
404        break;
405      }
406      case SnapshotManifestV2.DESCRIPTOR_VERSION: {
407        SnapshotDataManifest dataManifest = readDataManifest();
408        if (dataManifest != null) {
409          htd = ProtobufUtil.toTableDescriptor(dataManifest.getTableSchema());
410          regionManifests = dataManifest.getRegionManifestsList();
411        } else {
412          // Compatibility, load the v1 regions
413          // This happens only when the snapshot is in-progress and the cache wants to refresh.
414          List<SnapshotRegionManifest> v1Regions, v2Regions;
415          ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
416          try {
417            v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, rootFs,
418                workingDir, desc);
419            v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, rootFs,
420                workingDir, desc, manifestSizeLimit);
421          } catch (InvalidProtocolBufferException e) {
422            throw new CorruptedSnapshotException("unable to parse region manifest " +
423                e.getMessage(), e);
424          } finally {
425            tpool.shutdown();
426          }
427          if (v1Regions != null && v2Regions != null) {
428            regionManifests = new ArrayList<>(v1Regions.size() + v2Regions.size());
429            regionManifests.addAll(v1Regions);
430            regionManifests.addAll(v2Regions);
431          } else if (v1Regions != null) {
432            regionManifests = v1Regions;
433          } else /* if (v2Regions != null) */ {
434            regionManifests = v2Regions;
435          }
436        }
437        break;
438      }
439      default:
440      throw new CorruptedSnapshotException("Invalid Snapshot version: " + desc.getVersion(),
441        ProtobufUtil.createSnapshotDesc(desc));
442    }
443  }
444
445  /**
446   * Get the current snapshot working dir
447   */
448  public Path getSnapshotDir() {
449    return this.workingDir;
450  }
451
452  /**
453   * Get the SnapshotDescription
454   */
455  public SnapshotDescription getSnapshotDescription() {
456    return this.desc;
457  }
458
459  /**
460   * Get the table descriptor from the Snapshot
461   */
462  public TableDescriptor getTableDescriptor() {
463    return this.htd;
464  }
465
466  /**
467   * Get all the Region Manifest from the snapshot
468   */
469  public List<SnapshotRegionManifest> getRegionManifests() {
470    return this.regionManifests;
471  }
472
473  private void setStatusMsg(String msg) {
474    if (this.statusTask != null) {
475      statusTask.setStatus(msg);
476    }
477  }
478
479  /**
480   * Get all the Region Manifest from the snapshot.
481   * This is an helper to get a map with the region encoded name
482   */
483  public Map<String, SnapshotRegionManifest> getRegionManifestsMap() {
484    if (regionManifests == null || regionManifests.isEmpty()) return null;
485
486    HashMap<String, SnapshotRegionManifest> regionsMap = new HashMap<>(regionManifests.size());
487    for (SnapshotRegionManifest manifest: regionManifests) {
488      String regionName = getRegionNameFromManifest(manifest);
489      regionsMap.put(regionName, manifest);
490    }
491    return regionsMap;
492  }
493
494  public void consolidate() throws IOException {
495    if (getSnapshotFormat(desc) == SnapshotManifestV1.DESCRIPTOR_VERSION) {
496      LOG.info("Using old Snapshot Format");
497      // write a copy of descriptor to the snapshot directory
498      FSTableDescriptors.createTableDescriptorForTableDirectory(workingDirFs, workingDir, htd,
499          false);
500    } else {
501      LOG.debug("Convert to Single Snapshot Manifest for {}", this.desc.getName());
502      convertToV2SingleManifest();
503    }
504  }
505
506  /*
507   * In case of rolling-upgrade, we try to read all the formats and build
508   * the snapshot with the latest format.
509   */
510  private void convertToV2SingleManifest() throws IOException {
511    // Try to load v1 and v2 regions
512    List<SnapshotRegionManifest> v1Regions, v2Regions;
513    ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
514    setStatusMsg("Loading Region manifests for " + this.desc.getName());
515    try {
516      v1Regions = SnapshotManifestV1.loadRegionManifests(conf, tpool, workingDirFs,
517          workingDir, desc);
518      v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, workingDirFs,
519          workingDir, desc, manifestSizeLimit);
520
521      SnapshotDataManifest.Builder dataManifestBuilder = SnapshotDataManifest.newBuilder();
522      dataManifestBuilder.setTableSchema(ProtobufUtil.toTableSchema(htd));
523
524      if (v1Regions != null && v1Regions.size() > 0) {
525        dataManifestBuilder.addAllRegionManifests(v1Regions);
526      }
527      if (v2Regions != null && v2Regions.size() > 0) {
528        dataManifestBuilder.addAllRegionManifests(v2Regions);
529      }
530
531      // Write the v2 Data Manifest.
532      // Once the data-manifest is written, the snapshot can be considered complete.
533      // Currently snapshots are written in a "temporary" directory and later
534      // moved to the "complated" snapshot directory.
535      setStatusMsg("Writing data manifest for " + this.desc.getName());
536      SnapshotDataManifest dataManifest = dataManifestBuilder.build();
537      writeDataManifest(dataManifest);
538      this.regionManifests = dataManifest.getRegionManifestsList();
539
540      // Remove the region manifests. Everything is now in the data-manifest.
541      // The delete operation is "relaxed", unless we get an exception we keep going.
542      // The extra files in the snapshot directory will not give any problem,
543      // since they have the same content as the data manifest, and even by re-reading
544      // them we will get the same information.
545      int totalDeletes = 0;
546      ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(tpool);
547      if (v1Regions != null) {
548        for (SnapshotRegionManifest regionManifest: v1Regions) {
549          ++totalDeletes;
550          completionService.submit(() -> {
551            SnapshotManifestV1.deleteRegionManifest(workingDirFs, workingDir, regionManifest);
552            return null;
553          });
554        }
555      }
556      if (v2Regions != null) {
557        for (SnapshotRegionManifest regionManifest: v2Regions) {
558          ++totalDeletes;
559          completionService.submit(() -> {
560            SnapshotManifestV2.deleteRegionManifest(workingDirFs, workingDir, regionManifest);
561            return null;
562          });
563        }
564      }
565      // Wait for the deletes to finish.
566      for (int i = 0; i < totalDeletes; i++) {
567        try {
568          completionService.take().get();
569        } catch (InterruptedException ie) {
570          throw new InterruptedIOException(ie.getMessage());
571        } catch (ExecutionException e) {
572          throw new IOException("Error deleting region manifests", e.getCause());
573        }
574      }
575    } finally {
576      tpool.shutdown();
577    }
578  }
579
580  /*
581   * Write the SnapshotDataManifest file
582   */
583  private void writeDataManifest(final SnapshotDataManifest manifest)
584      throws IOException {
585    FSDataOutputStream stream = workingDirFs.create(new Path(workingDir, DATA_MANIFEST_NAME));
586    try {
587      manifest.writeTo(stream);
588    } finally {
589      stream.close();
590    }
591  }
592
593  /*
594   * Read the SnapshotDataManifest file
595   */
596  private SnapshotDataManifest readDataManifest() throws IOException {
597    FSDataInputStream in = null;
598    try {
599      in = workingDirFs.open(new Path(workingDir, DATA_MANIFEST_NAME));
600      CodedInputStream cin = CodedInputStream.newInstance(in);
601      cin.setSizeLimit(manifestSizeLimit);
602      return SnapshotDataManifest.parseFrom(cin);
603    } catch (FileNotFoundException e) {
604      return null;
605    } catch (InvalidProtocolBufferException e) {
606      throw new CorruptedSnapshotException("unable to parse data manifest " + e.getMessage(), e);
607    } finally {
608      if (in != null) in.close();
609    }
610  }
611
612  private ThreadPoolExecutor createExecutor(final String name) {
613    return createExecutor(conf, name);
614  }
615
616  public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
617    int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
618    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
619      new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d")
620        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
621  }
622
623  /**
624   * Extract the region encoded name from the region manifest
625   */
626  static String getRegionNameFromManifest(final SnapshotRegionManifest manifest) {
627    byte[] regionName = RegionInfo.createRegionName(
628            ProtobufUtil.toTableName(manifest.getRegionInfo().getTableName()),
629            manifest.getRegionInfo().getStartKey().toByteArray(),
630            manifest.getRegionInfo().getRegionId(), true);
631    return RegionInfo.encodeRegionName(regionName);
632  }
633
634  /*
635   * Return the snapshot format
636   */
637  private static int getSnapshotFormat(final SnapshotDescription desc) {
638    return desc.hasVersion() ? desc.getVersion() : SnapshotManifestV1.DESCRIPTOR_VERSION;
639  }
640}