001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.snapshot;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.io.InterruptedIOException;
023import java.util.ArrayList;
024import java.util.Collection;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.ExecutorCompletionService;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.fs.FSDataInputStream;
034import org.apache.hadoop.fs.FSDataOutputStream;
035import org.apache.hadoop.fs.FileStatus;
036import org.apache.hadoop.fs.FileSystem;
037import org.apache.hadoop.fs.Path;
038import org.apache.hadoop.hbase.HConstants;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.client.TableDescriptor;
043import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
044import org.apache.hadoop.hbase.mob.MobUtils;
045import org.apache.hadoop.hbase.monitoring.MonitoredTask;
046import org.apache.hadoop.hbase.regionserver.HRegion;
047import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
048import org.apache.hadoop.hbase.regionserver.HStore;
049import org.apache.hadoop.hbase.regionserver.HStoreFile;
050import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
051import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
052import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
053import org.apache.hadoop.hbase.util.CommonFSUtils;
054import org.apache.hadoop.hbase.util.FSTableDescriptors;
055import org.apache.hadoop.hbase.util.Threads;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
061import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
062import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
063
064import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDataManifest;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotRegionManifest;
068
069/**
070 * Utility class to help read/write the Snapshot Manifest. The snapshot format is transparent for
071 * the users of this class, once the snapshot is written, it will never be modified. On open() the
072 * snapshot will be loaded to the current in-memory format.
073 */
074@InterfaceAudience.Private
075public final class SnapshotManifest {
076  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManifest.class);
077
078  public static final String SNAPSHOT_MANIFEST_SIZE_LIMIT_CONF_KEY = "snapshot.manifest.size.limit";
079
080  public static final String DATA_MANIFEST_NAME = "data.manifest";
081
082  private List<SnapshotRegionManifest> regionManifests;
083  private SnapshotDescription desc;
084  private TableDescriptor htd;
085
086  private final ForeignExceptionSnare monitor;
087  private final Configuration conf;
088  private final Path workingDir;
089  private final FileSystem rootFs;
090  private final FileSystem workingDirFs;
091  private int manifestSizeLimit;
092  private final MonitoredTask statusTask;
093
094  /**
095   * @param conf       configuration file for HBase setup
096   * @param rootFs     root filesystem containing HFiles
097   * @param workingDir file path of where the manifest should be located
098   * @param desc       description of snapshot being taken
099   * @param monitor    monitor of foreign exceptions
100   * @throws IOException if the working directory file system cannot be determined from the config
101   *                     file
102   */
103  private SnapshotManifest(final Configuration conf, final FileSystem rootFs, final Path workingDir,
104    final SnapshotDescription desc, final ForeignExceptionSnare monitor,
105    final MonitoredTask statusTask) throws IOException {
106    this.monitor = monitor;
107    this.desc = desc;
108    this.workingDir = workingDir;
109    this.conf = conf;
110    this.rootFs = rootFs;
111    this.statusTask = statusTask;
112    this.workingDirFs = this.workingDir.getFileSystem(this.conf);
113    this.manifestSizeLimit = conf.getInt(SNAPSHOT_MANIFEST_SIZE_LIMIT_CONF_KEY, 64 * 1024 * 1024);
114  }
115
116  /**
117   * Return a SnapshotManifest instance, used for writing a snapshot. There are two usage pattern: -
118   * The Master will create a manifest, add the descriptor, offline regions and consolidate the
119   * snapshot by writing all the pending stuff on-disk. manifest = SnapshotManifest.create(...)
120   * manifest.addRegion(tableDir, hri) manifest.consolidate() - The RegionServer will create a
121   * single region manifest manifest = SnapshotManifest.create(...) manifest.addRegion(region)
122   */
123  public static SnapshotManifest create(final Configuration conf, final FileSystem fs,
124    final Path workingDir, final SnapshotDescription desc, final ForeignExceptionSnare monitor)
125    throws IOException {
126    return create(conf, fs, workingDir, desc, monitor, null);
127
128  }
129
130  public static SnapshotManifest create(final Configuration conf, final FileSystem fs,
131    final Path workingDir, final SnapshotDescription desc, final ForeignExceptionSnare monitor,
132    final MonitoredTask statusTask) throws IOException {
133    return new SnapshotManifest(conf, fs, workingDir, desc, monitor, statusTask);
134
135  }
136
137  /**
138   * Return a SnapshotManifest instance with the information already loaded in-memory.
139   * SnapshotManifest manifest = SnapshotManifest.open(...) TableDescriptor htd =
140   * manifest.getDescriptor() for (SnapshotRegionManifest regionManifest:
141   * manifest.getRegionManifests()) hri = regionManifest.getRegionInfo() for
142   * (regionManifest.getFamilyFiles()) ...
143   */
144  public static SnapshotManifest open(final Configuration conf, final FileSystem fs,
145    final Path workingDir, final SnapshotDescription desc) throws IOException {
146    SnapshotManifest manifest = new SnapshotManifest(conf, fs, workingDir, desc, null, null);
147    manifest.load();
148    return manifest;
149  }
150
151  /**
152   * Add the table descriptor to the snapshot manifest
153   */
154  public void addTableDescriptor(final TableDescriptor htd) throws IOException {
155    this.htd = htd;
156  }
157
158  interface RegionVisitor<TRegion, TFamily> {
159    TRegion regionOpen(final RegionInfo regionInfo) throws IOException;
160
161    void regionClose(final TRegion region) throws IOException;
162
163    TFamily familyOpen(final TRegion region, final byte[] familyName) throws IOException;
164
165    void familyClose(final TRegion region, final TFamily family) throws IOException;
166
167    void storeFile(final TRegion region, final TFamily family, final StoreFileInfo storeFile)
168      throws IOException;
169  }
170
171  private RegionVisitor createRegionVisitor(final SnapshotDescription desc) throws IOException {
172    switch (getSnapshotFormat(desc)) {
173      case SnapshotManifestV1.DESCRIPTOR_VERSION:
174        return new SnapshotManifestV1.ManifestBuilder(conf, rootFs, workingDir);
175      case SnapshotManifestV2.DESCRIPTOR_VERSION:
176        return new SnapshotManifestV2.ManifestBuilder(conf, rootFs, workingDir);
177      default:
178        throw new CorruptedSnapshotException("Invalid Snapshot version: " + desc.getVersion(),
179          ProtobufUtil.createSnapshotDesc(desc));
180    }
181  }
182
183  public void addMobRegion(RegionInfo regionInfo) throws IOException {
184    // Get the ManifestBuilder/RegionVisitor
185    RegionVisitor visitor = createRegionVisitor(desc);
186
187    // Visit the region and add it to the manifest
188    addMobRegion(regionInfo, visitor);
189  }
190
191  protected void addMobRegion(RegionInfo regionInfo, RegionVisitor visitor) throws IOException {
192    // 1. dump region meta info into the snapshot directory
193    final String snapshotName = desc.getName();
194    LOG.debug("Storing mob region '" + regionInfo + "' region-info for snapshot=" + snapshotName);
195    Object regionData = visitor.regionOpen(regionInfo);
196    monitor.rethrowException();
197
198    // 2. iterate through all the stores in the region
199    LOG.debug("Creating references for mob files");
200
201    Path mobRegionPath = MobUtils.getMobRegionPath(conf, regionInfo.getTable());
202    for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
203      // 2.1. build the snapshot reference for the store if it's a mob store
204      if (!hcd.isMobEnabled()) {
205        continue;
206      }
207      Object familyData = visitor.familyOpen(regionData, hcd.getName());
208      monitor.rethrowException();
209
210      Path storePath = MobUtils.getMobFamilyPath(mobRegionPath, hcd.getNameAsString());
211      List<StoreFileInfo> storeFiles = getStoreFiles(storePath, htd, hcd, regionInfo);
212      if (storeFiles == null) {
213        if (LOG.isDebugEnabled()) {
214          LOG.debug("No mob files under family: " + hcd.getNameAsString());
215        }
216        continue;
217      }
218
219      addReferenceFiles(visitor, regionData, familyData, storeFiles, true);
220
221      visitor.familyClose(regionData, familyData);
222    }
223    visitor.regionClose(regionData);
224  }
225
226  /**
227   * Creates a 'manifest' for the specified region, by reading directly from the HRegion object.
228   * This is used by the "online snapshot" when the table is enabled.
229   */
230  public void addRegion(final HRegion region) throws IOException {
231    // Get the ManifestBuilder/RegionVisitor
232    RegionVisitor visitor = createRegionVisitor(desc);
233
234    // Visit the region and add it to the manifest
235    addRegion(region, visitor);
236  }
237
238  protected void addRegion(final HRegion region, RegionVisitor visitor) throws IOException {
239    // 1. dump region meta info into the snapshot directory
240    final String snapshotName = desc.getName();
241    LOG.debug("Storing '" + region + "' region-info for snapshot=" + snapshotName);
242    Object regionData = visitor.regionOpen(region.getRegionInfo());
243    monitor.rethrowException();
244
245    // 2. iterate through all the stores in the region
246    LOG.debug("Creating references for hfiles");
247
248    for (HStore store : region.getStores()) {
249      // 2.1. build the snapshot reference for the store
250      Object familyData =
251        visitor.familyOpen(regionData, store.getColumnFamilyDescriptor().getName());
252      monitor.rethrowException();
253
254      List<HStoreFile> storeFiles = new ArrayList<>(store.getStorefiles());
255      if (LOG.isDebugEnabled()) {
256        LOG.debug("Adding snapshot references for " + storeFiles + " hfiles");
257      }
258
259      // 2.2. iterate through all the store's files and create "references".
260      for (int i = 0, sz = storeFiles.size(); i < sz; i++) {
261        HStoreFile storeFile = storeFiles.get(i);
262        monitor.rethrowException();
263
264        // create "reference" to this store file.
265        LOG.debug("Adding reference for file (" + (i + 1) + "/" + sz + "): " + storeFile.getPath()
266          + " for snapshot=" + snapshotName);
267        visitor.storeFile(regionData, familyData, storeFile.getFileInfo());
268      }
269      visitor.familyClose(regionData, familyData);
270    }
271    visitor.regionClose(regionData);
272  }
273
274  /**
275   * Creates a 'manifest' for the specified region, by reading directly from the disk. This is used
276   * by the "offline snapshot" when the table is disabled.
277   */
278  public void addRegion(final Path tableDir, final RegionInfo regionInfo) throws IOException {
279    // Get the ManifestBuilder/RegionVisitor
280    RegionVisitor visitor = createRegionVisitor(desc);
281
282    // Visit the region and add it to the manifest
283    addRegion(tableDir, regionInfo, visitor);
284  }
285
286  protected void addRegion(Path tableDir, RegionInfo regionInfo, RegionVisitor visitor)
287    throws IOException {
288    boolean isMobRegion = MobUtils.isMobRegionInfo(regionInfo);
289    try {
290      Path baseDir = tableDir;
291      // Open the RegionFS
292      if (isMobRegion) {
293        baseDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf), regionInfo.getTable());
294      }
295      HRegionFileSystem regionFs =
296        HRegionFileSystem.openRegionFromFileSystem(conf, rootFs, baseDir, regionInfo, true);
297      monitor.rethrowException();
298
299      // 1. dump region meta info into the snapshot directory
300      LOG.debug("Storing region-info for snapshot.");
301      Object regionData = visitor.regionOpen(regionInfo);
302      monitor.rethrowException();
303
304      // 2. iterate through all the stores in the region
305      LOG.debug("Creating references for hfiles");
306
307      // This ensures that we have an atomic view of the directory as long as we have < ls limit
308      // (batch size of the files in a directory) on the namenode. Otherwise, we get back the files
309      // in batches and may miss files being added/deleted. This could be more robust (iteratively
310      // checking to see if we have all the files until we are sure), but the limit is currently
311      // 1000 files/batch, far more than the number of store files under a single column family.
312      for (ColumnFamilyDescriptor cfd : htd.getColumnFamilies()) {
313        Object familyData = visitor.familyOpen(regionData, cfd.getName());
314        monitor.rethrowException();
315        StoreFileTracker tracker = null;
316        if (isMobRegion) {
317          // MOB regions are always using the default SFT implementation
318          ColumnFamilyDescriptor defaultSFTCfd = ColumnFamilyDescriptorBuilder.newBuilder(cfd)
319            .setValue(StoreFileTrackerFactory.TRACKER_IMPL,
320              StoreFileTrackerFactory.Trackers.DEFAULT.name())
321            .build();
322          tracker = StoreFileTrackerFactory.create(conf, htd, defaultSFTCfd, regionFs);
323        } else {
324          tracker = StoreFileTrackerFactory.create(conf, htd, cfd, regionFs);
325        }
326        List<StoreFileInfo> storeFiles = tracker.load();
327        if (storeFiles.isEmpty()) {
328          LOG.debug("No files under family: {}", cfd.getNameAsString());
329          continue;
330        }
331        // 2.1. build the snapshot reference for the store
332        // iterate through all the store's files and create "references".
333        addReferenceFiles(visitor, regionData, familyData, storeFiles, false);
334        visitor.familyClose(regionData, familyData);
335      }
336      visitor.regionClose(regionData);
337    } catch (IOException e) {
338      // the mob directory might not be created yet, so do nothing when it is a mob region
339      if (!isMobRegion) {
340        throw e;
341      }
342    }
343  }
344
345  private List<StoreFileInfo> getStoreFiles(Path storePath, TableDescriptor htd,
346    ColumnFamilyDescriptor hcd, RegionInfo regionInfo) throws IOException {
347    FileStatus[] stats = CommonFSUtils.listStatus(rootFs, storePath);
348    if (stats == null) return null;
349
350    HRegionFileSystem regionFS = HRegionFileSystem.create(conf, rootFs,
351      MobUtils.getMobTableDir(new Path(conf.get(HConstants.HBASE_DIR)), htd.getTableName()),
352      regionInfo);
353    StoreFileTracker sft = StoreFileTrackerFactory.create(conf, htd, hcd, regionFS, false);
354    ArrayList<StoreFileInfo> storeFiles = new ArrayList<>(stats.length);
355    for (int i = 0; i < stats.length; ++i) {
356      storeFiles.add(sft.getStoreFileInfo(stats[i], stats[i].getPath(), false));
357    }
358    return storeFiles;
359  }
360
361  private void addReferenceFiles(RegionVisitor visitor, Object regionData, Object familyData,
362    Collection<StoreFileInfo> storeFiles, boolean isMob) throws IOException {
363    final String fileType = isMob ? "mob file" : "hfile";
364
365    if (LOG.isDebugEnabled()) {
366      LOG.debug(String.format("Adding snapshot references for %s %ss", storeFiles, fileType));
367    }
368
369    int i = 0;
370    int sz = storeFiles.size();
371    for (StoreFileInfo storeFile : storeFiles) {
372      monitor.rethrowException();
373
374      LOG.debug(String.format("Adding reference for %s (%d/%d): %s", fileType, ++i, sz,
375        storeFile.getPath()));
376
377      // create "reference" to this store file.
378      visitor.storeFile(regionData, familyData, storeFile);
379    }
380  }
381
382  /**
383   * Load the information in the SnapshotManifest. Called by SnapshotManifest.open() If the format
384   * is v2 and there is no data-manifest, means that we are loading an in-progress snapshot. Since
385   * we support rolling-upgrades, we loook for v1 and v2 regions format.
386   */
387  private void load() throws IOException {
388    switch (getSnapshotFormat(desc)) {
389      case SnapshotManifestV1.DESCRIPTOR_VERSION: {
390        this.htd = FSTableDescriptors.getTableDescriptorFromFs(workingDirFs, workingDir);
391        ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
392        try {
393          this.regionManifests =
394            SnapshotManifestV1.loadRegionManifests(conf, tpool, rootFs, workingDir, desc, htd);
395        } finally {
396          tpool.shutdown();
397        }
398        break;
399      }
400      case SnapshotManifestV2.DESCRIPTOR_VERSION: {
401        SnapshotDataManifest dataManifest = readDataManifest();
402        if (dataManifest != null) {
403          htd = ProtobufUtil.toTableDescriptor(dataManifest.getTableSchema());
404          regionManifests = dataManifest.getRegionManifestsList();
405        } else {
406          // Compatibility, load the v1 regions
407          // This happens only when the snapshot is in-progress and the cache wants to refresh.
408          List<SnapshotRegionManifest> v1Regions, v2Regions;
409          ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
410          try {
411            v1Regions =
412              SnapshotManifestV1.loadRegionManifests(conf, tpool, rootFs, workingDir, desc, htd);
413            v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, rootFs, workingDir,
414              desc, manifestSizeLimit);
415          } catch (InvalidProtocolBufferException e) {
416            throw new CorruptedSnapshotException(
417              "unable to parse region manifest " + e.getMessage(), e);
418          } finally {
419            tpool.shutdown();
420          }
421          if (v1Regions != null && v2Regions != null) {
422            regionManifests = new ArrayList<>(v1Regions.size() + v2Regions.size());
423            regionManifests.addAll(v1Regions);
424            regionManifests.addAll(v2Regions);
425          } else if (v1Regions != null) {
426            regionManifests = v1Regions;
427          } else /* if (v2Regions != null) */ {
428            regionManifests = v2Regions;
429          }
430        }
431        break;
432      }
433      default:
434        throw new CorruptedSnapshotException("Invalid Snapshot version: " + desc.getVersion(),
435          ProtobufUtil.createSnapshotDesc(desc));
436    }
437  }
438
439  /**
440   * Get the current snapshot working dir
441   */
442  public Path getSnapshotDir() {
443    return this.workingDir;
444  }
445
446  /**
447   * Get the SnapshotDescription
448   */
449  public SnapshotDescription getSnapshotDescription() {
450    return this.desc;
451  }
452
453  /**
454   * Get the table descriptor from the Snapshot
455   */
456  public TableDescriptor getTableDescriptor() {
457    return this.htd;
458  }
459
460  /**
461   * Get all the Region Manifest from the snapshot
462   */
463  public List<SnapshotRegionManifest> getRegionManifests() {
464    return this.regionManifests;
465  }
466
467  private void setStatusMsg(String msg) {
468    if (this.statusTask != null) {
469      statusTask.setStatus(msg);
470    }
471  }
472
473  /**
474   * Get all the Region Manifest from the snapshot. This is an helper to get a map with the region
475   * encoded name
476   */
477  public Map<String, SnapshotRegionManifest> getRegionManifestsMap() {
478    if (regionManifests == null || regionManifests.isEmpty()) return null;
479
480    HashMap<String, SnapshotRegionManifest> regionsMap = new HashMap<>(regionManifests.size());
481    for (SnapshotRegionManifest manifest : regionManifests) {
482      String regionName = getRegionNameFromManifest(manifest);
483      regionsMap.put(regionName, manifest);
484    }
485    return regionsMap;
486  }
487
488  public void consolidate() throws IOException {
489    if (getSnapshotFormat(desc) == SnapshotManifestV1.DESCRIPTOR_VERSION) {
490      LOG.info("Using old Snapshot Format");
491      // write a copy of descriptor to the snapshot directory
492      FSTableDescriptors.createTableDescriptorForTableDirectory(workingDirFs, workingDir, htd,
493        false);
494    } else {
495      LOG.debug("Convert to Single Snapshot Manifest for {}", this.desc.getName());
496      convertToV2SingleManifest();
497    }
498  }
499
500  /*
501   * In case of rolling-upgrade, we try to read all the formats and build the snapshot with the
502   * latest format.
503   */
504  private void convertToV2SingleManifest() throws IOException {
505    // Try to load v1 and v2 regions
506    List<SnapshotRegionManifest> v1Regions, v2Regions;
507    ThreadPoolExecutor tpool = createExecutor("SnapshotManifestLoader");
508    setStatusMsg("Loading Region manifests for " + this.desc.getName());
509    try {
510      v1Regions =
511        SnapshotManifestV1.loadRegionManifests(conf, tpool, workingDirFs, workingDir, desc, htd);
512      v2Regions = SnapshotManifestV2.loadRegionManifests(conf, tpool, workingDirFs, workingDir,
513        desc, manifestSizeLimit);
514
515      SnapshotDataManifest.Builder dataManifestBuilder = SnapshotDataManifest.newBuilder();
516      dataManifestBuilder.setTableSchema(ProtobufUtil.toTableSchema(htd));
517
518      if (v1Regions != null && v1Regions.size() > 0) {
519        dataManifestBuilder.addAllRegionManifests(v1Regions);
520      }
521      if (v2Regions != null && v2Regions.size() > 0) {
522        dataManifestBuilder.addAllRegionManifests(v2Regions);
523      }
524
525      // Write the v2 Data Manifest.
526      // Once the data-manifest is written, the snapshot can be considered complete.
527      // Currently snapshots are written in a "temporary" directory and later
528      // moved to the "complated" snapshot directory.
529      setStatusMsg("Writing data manifest for " + this.desc.getName());
530      SnapshotDataManifest dataManifest = dataManifestBuilder.build();
531      writeDataManifest(dataManifest);
532      this.regionManifests = dataManifest.getRegionManifestsList();
533
534      // Remove the region manifests. Everything is now in the data-manifest.
535      // The delete operation is "relaxed", unless we get an exception we keep going.
536      // The extra files in the snapshot directory will not give any problem,
537      // since they have the same content as the data manifest, and even by re-reading
538      // them we will get the same information.
539      int totalDeletes = 0;
540      ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(tpool);
541      if (v1Regions != null) {
542        for (SnapshotRegionManifest regionManifest : v1Regions) {
543          ++totalDeletes;
544          completionService.submit(() -> {
545            SnapshotManifestV1.deleteRegionManifest(workingDirFs, workingDir, regionManifest);
546            return null;
547          });
548        }
549      }
550      if (v2Regions != null) {
551        for (SnapshotRegionManifest regionManifest : v2Regions) {
552          ++totalDeletes;
553          completionService.submit(() -> {
554            SnapshotManifestV2.deleteRegionManifest(workingDirFs, workingDir, regionManifest);
555            return null;
556          });
557        }
558      }
559      // Wait for the deletes to finish.
560      for (int i = 0; i < totalDeletes; i++) {
561        try {
562          completionService.take().get();
563        } catch (InterruptedException ie) {
564          throw new InterruptedIOException(ie.getMessage());
565        } catch (ExecutionException e) {
566          throw new IOException("Error deleting region manifests", e.getCause());
567        }
568      }
569    } finally {
570      tpool.shutdown();
571    }
572  }
573
574  /*
575   * Write the SnapshotDataManifest file
576   */
577  private void writeDataManifest(final SnapshotDataManifest manifest) throws IOException {
578    try (
579      FSDataOutputStream stream = workingDirFs.create(new Path(workingDir, DATA_MANIFEST_NAME))) {
580      manifest.writeTo(stream);
581    }
582  }
583
584  /*
585   * Read the SnapshotDataManifest file
586   */
587  private SnapshotDataManifest readDataManifest() throws IOException {
588    try (FSDataInputStream in = workingDirFs.open(new Path(workingDir, DATA_MANIFEST_NAME))) {
589      CodedInputStream cin = CodedInputStream.newInstance(in);
590      cin.setSizeLimit(manifestSizeLimit);
591      return SnapshotDataManifest.parseFrom(cin);
592    } catch (FileNotFoundException e) {
593      return null;
594    } catch (InvalidProtocolBufferException e) {
595      throw new CorruptedSnapshotException("unable to parse data manifest " + e.getMessage(), e);
596    }
597  }
598
599  private ThreadPoolExecutor createExecutor(final String name) {
600    return createExecutor(conf, name);
601  }
602
603  public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
604    int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
605    return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
606      new ThreadFactoryBuilder().setNameFormat(name + "-pool-%d").setDaemon(true)
607        .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
608  }
609
610  /**
611   * Extract the region encoded name from the region manifest
612   */
613  static String getRegionNameFromManifest(final SnapshotRegionManifest manifest) {
614    byte[] regionName =
615      RegionInfo.createRegionName(ProtobufUtil.toTableName(manifest.getRegionInfo().getTableName()),
616        manifest.getRegionInfo().getStartKey().toByteArray(),
617        manifest.getRegionInfo().getRegionId(), true);
618    return RegionInfo.encodeRegionName(regionName);
619  }
620
621  /*
622   * Return the snapshot format
623   */
624  private static int getSnapshotFormat(final SnapshotDescription desc) {
625    return desc.hasVersion() ? desc.getVersion() : SnapshotManifestV1.DESCRIPTOR_VERSION;
626  }
627}