001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.Executors;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.ScheduledFuture;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.locks.ReadWriteLock;
038import java.util.concurrent.locks.ReentrantReadWriteLock;
039import java.util.stream.Collectors;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.CommonPathCapabilities;
042import org.apache.hadoop.fs.FSDataInputStream;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.fs.permission.AclEntry;
047import org.apache.hadoop.fs.permission.AclStatus;
048import org.apache.hadoop.hbase.HBaseInterfaceAudience;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.ServerName;
051import org.apache.hadoop.hbase.Stoppable;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
055import org.apache.hadoop.hbase.client.TableState;
056import org.apache.hadoop.hbase.errorhandling.ForeignException;
057import org.apache.hadoop.hbase.executor.ExecutorService;
058import org.apache.hadoop.hbase.ipc.RpcServer;
059import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
060import org.apache.hadoop.hbase.master.MasterFileSystem;
061import org.apache.hadoop.hbase.master.MasterServices;
062import org.apache.hadoop.hbase.master.MetricsMaster;
063import org.apache.hadoop.hbase.master.SnapshotSentinel;
064import org.apache.hadoop.hbase.master.WorkerAssigner;
065import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
066import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
067import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
068import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
069import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
070import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
071import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
072import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
073import org.apache.hadoop.hbase.master.procedure.SnapshotVerifyProcedure;
074import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
075import org.apache.hadoop.hbase.procedure.Procedure;
076import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
077import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
078import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
079import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
080import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
081import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
082import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
083import org.apache.hadoop.hbase.security.AccessDeniedException;
084import org.apache.hadoop.hbase.security.User;
085import org.apache.hadoop.hbase.security.access.AccessChecker;
086import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner;
087import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper;
088import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
089import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
090import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
091import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
092import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
094import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
095import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
096import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
097import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
098import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
099import org.apache.hadoop.hbase.util.CommonFSUtils;
100import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
101import org.apache.hadoop.hbase.util.NonceKey;
102import org.apache.hadoop.hbase.util.TableDescriptorChecker;
103import org.apache.yetus.audience.InterfaceAudience;
104import org.apache.yetus.audience.InterfaceStability;
105import org.apache.zookeeper.KeeperException;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
110
111import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
116
117/**
118 * This class manages the procedure of taking and restoring snapshots. There is only one
119 * SnapshotManager for the master.
120 * <p>
121 * The class provides methods for monitoring in-progress snapshot actions.
122 * <p>
123 * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
124 * simplification in the current implementation.
125 */
126@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
127@InterfaceStability.Unstable
128public class SnapshotManager extends MasterProcedureManager implements Stoppable {
129  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
130
131  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
132  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
133
  /**
   * Wait time before removing a finished sentinel from the in-progress map. NOTE: This is used as
   * a safety auto cleanup. The snapshot and restore handlers map entries are removed when a user
   * asks if a snapshot or restore is completed. This operation is part of the HBaseAdmin
   * snapshot/restore API flow. In case something fails on the client side and the snapshot/restore
   * state is not reclaimed after a default timeout, the entry is removed from the in-progress map.
   * At this point, if the user asks for the snapshot/restore status, the result will be "snapshot
   * done" if the snapshot exists, or "failed" if it doesn't exist.
   */
143  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
144    "hbase.snapshot.sentinels.cleanup.timeoutMillis";
145  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;
146
147  /** Enable or disable snapshot support */
148  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
149
150  /**
151   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for completion.
152   */
153  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
154
155  /** Name of the operation to use in the controller */
156  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
157
158  /** Conf key for # of threads used by the SnapshotManager thread pool */
159  public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
160
  /** Default thread pool size, i.e. the number of concurrent operations running on the master */
162  public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
163
164  /** Conf key for preserving original max file size configs */
165  public static final String SNAPSHOT_MAX_FILE_SIZE_PRESERVE =
166    "hbase.snapshot.max.filesize.preserve";
167
168  /** Enable or disable snapshot procedure */
169  public static final String SNAPSHOT_PROCEDURE_ENABLED = "hbase.snapshot.procedure.enabled";
170
171  public static final boolean SNAPSHOT_PROCEDURE_ENABLED_DEFAULT = true;
172
173  private boolean stopped;
174  private MasterServices master; // Needed by TableEventHandlers
175  private ProcedureCoordinator coordinator;
176
177  // Is snapshot feature enabled?
178  private boolean isSnapshotSupported = false;
179
180  // Snapshot handlers map, with table name as key.
181  // The map is always accessed and modified under the object lock using synchronized.
182  // snapshotTable() will insert an Handler in the table.
183  // isSnapshotDone() will remove the handler requested if the operation is finished.
184  private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
185  private final ScheduledExecutorService scheduleThreadPool =
186    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
187      .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
188  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;
189
190  // Restore map, with table name as key, procedure ID as value.
191  // The map is always accessed and modified under the object lock using synchronized.
192  // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
193  //
194  // TODO: just as the Apache HBase 1.x implementation, this map would not survive master
195  // restart/failover. This is just a stopgap implementation until implementation of taking
196  // snapshot using Procedure-V2.
197  private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>();
198
199  // SnapshotDescription -> SnapshotProcId
200  private final ConcurrentHashMap<SnapshotDescription, Long> snapshotToProcIdMap =
201    new ConcurrentHashMap<>();
202
203  private WorkerAssigner verifyWorkerAssigner;
204
205  private Path rootDir;
206  private ExecutorService executorService;
207
208  /**
209   * Read write lock between taking snapshot and snapshot HFile cleaner. The cleaner should skip to
210   * check the HFiles if any snapshot is in progress, otherwise it may clean a HFile which would
211   * belongs to the newly creating snapshot. So we should grab the write lock first when cleaner
212   * start to work. (See HBASE-21387)
213   */
214  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
215
  /**
   * No-argument constructor; it performs no initialization of its own.
   * <p>
   * NOTE(review): presumably the manager is instantiated reflectively and wired up later by an
   * initialization hook outside this constructor -- confirm against the rest of the class.
   */
  public SnapshotManager() {
  }
218
219  /**
220   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
221   * @param master      services for the master where the manager is running
222   * @param coordinator procedure coordinator instance. exposed for testing.
223   * @param pool        HBase ExecutorServcie instance, exposed for testing.
224   */
225  @InterfaceAudience.Private
226  SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
227    ExecutorService pool, int sentinelCleanInterval)
228    throws IOException, UnsupportedOperationException {
229    this.master = master;
230
231    this.rootDir = master.getMasterFileSystem().getRootDir();
232    Configuration conf = master.getConfiguration();
233    checkSnapshotSupport(conf, master.getMasterFileSystem());
234
235    this.coordinator = coordinator;
236    this.executorService = pool;
237    resetTempDir();
238    snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
239      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
240  }
241
242  /**
243   * Gets the list of all completed snapshots.
244   * @return list of SnapshotDescriptions
245   * @throws IOException File system exception
246   */
247  public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
248    return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true);
249  }
250
251  /**
252   * Gets the list of all completed snapshots.
253   * @param snapshotDir snapshot directory
254   * @param withCpCall  Whether to call CP hooks
255   * @return list of SnapshotDescriptions
256   * @throws IOException File system exception
257   */
258  private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall)
259    throws IOException {
260    List<SnapshotDescription> snapshotDescs = new ArrayList<>();
261    // first create the snapshot root path and check to see if it exists
262    FileSystem fs = master.getMasterFileSystem().getFileSystem();
263    if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
264
265    // if there are no snapshots, return an empty list
266    if (!fs.exists(snapshotDir)) {
267      return snapshotDescs;
268    }
269
270    // ignore all the snapshots in progress
271    FileStatus[] snapshots = fs.listStatus(snapshotDir,
272      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
273    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
274    withCpCall = withCpCall && cpHost != null;
275    // loop through all the completed snapshots
276    for (FileStatus snapshot : snapshots) {
277      Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
278      // if the snapshot is bad
279      if (!fs.exists(info)) {
280        LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
281        continue;
282      }
283      FSDataInputStream in = null;
284      try {
285        in = fs.open(info);
286        SnapshotDescription desc = SnapshotDescription.parseFrom(in);
287        org.apache.hadoop.hbase.client.SnapshotDescription descPOJO =
288          (withCpCall) ? ProtobufUtil.createSnapshotDesc(desc) : null;
289        if (withCpCall) {
290          try {
291            cpHost.preListSnapshot(descPOJO);
292          } catch (AccessDeniedException e) {
293            LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
294              + "Either you should be owner of this snapshot or admin user.");
295            // Skip this and try for next snapshot
296            continue;
297          }
298        }
299        snapshotDescs.add(desc);
300
301        // call coproc post hook
302        if (withCpCall) {
303          cpHost.postListSnapshot(descPOJO);
304        }
305      } catch (IOException e) {
306        LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
307      } finally {
308        if (in != null) {
309          in.close();
310        }
311      }
312    }
313    return snapshotDescs;
314  }
315
316  /**
317   * Cleans up any zk-coordinated snapshots in the snapshot/.tmp directory that were left from
318   * failed snapshot attempts. For unfinished procedure2-coordinated snapshots, keep the working
319   * directory.
320   * @throws IOException if we can't reach the filesystem
321   */
322  private void resetTempDir() throws IOException {
323    Set<String> workingProcedureCoordinatedSnapshotNames =
324      snapshotToProcIdMap.keySet().stream().map(s -> s.getName()).collect(Collectors.toSet());
325
326    Path tmpdir =
327      SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, master.getConfiguration());
328    FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration());
329    FileStatus[] workingSnapshotDirs = CommonFSUtils.listStatus(tmpFs, tmpdir);
330    if (workingSnapshotDirs == null) {
331      return;
332    }
333    for (FileStatus workingSnapshotDir : workingSnapshotDirs) {
334      String workingSnapshotName = workingSnapshotDir.getPath().getName();
335      if (!workingProcedureCoordinatedSnapshotNames.contains(workingSnapshotName)) {
336        try {
337          if (tmpFs.delete(workingSnapshotDir.getPath(), true)) {
338            LOG.info("delete unfinished zk-coordinated snapshot working directory {}",
339              workingSnapshotDir.getPath());
340          } else {
341            LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
342              workingSnapshotDir.getPath());
343          }
344        } catch (IOException e) {
345          LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
346            workingSnapshotDir.getPath(), e);
347        }
348      } else {
349        LOG.debug("find working directory of unfinished procedure {}", workingSnapshotName);
350      }
351    }
352  }
353
354  /**
355   * Delete the specified snapshot
356   * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
357   * @throws IOException                   For filesystem IOExceptions
358   */
359  public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
360    // check to see if it is completed
361    if (!isSnapshotCompleted(snapshot)) {
362      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
363    }
364
365    String snapshotName = snapshot.getName();
366    // first create the snapshot description and check to see if it exists
367    FileSystem fs = master.getMasterFileSystem().getFileSystem();
368    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
369    // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
370    // just the "name" and it does not contains the "real" snapshot information
371    snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
372
373    // call coproc pre hook
374    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
375    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
376    if (cpHost != null) {
377      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
378      cpHost.preDeleteSnapshot(snapshotPOJO);
379    }
380
381    LOG.debug("Deleting snapshot: " + snapshotName);
382    // delete the existing snapshot
383    if (!fs.delete(snapshotDir, true)) {
384      throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
385    }
386
387    // call coproc post hook
388    if (cpHost != null) {
389      cpHost.postDeleteSnapshot(snapshotPOJO);
390    }
391
392  }
393
394  /**
395   * Check if the specified snapshot is done
396   * @return true if snapshot is ready to be restored, false if it is still being taken.
397   * @throws IOException              IOException if error from HDFS or RPC
398   * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
399   */
400  public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
401    // check the request to make sure it has a snapshot
402    if (expected == null) {
403      throw new UnknownSnapshotException(
404        "No snapshot name passed in request, can't figure out which snapshot you want to check.");
405    }
406
407    Long procId = snapshotToProcIdMap.get(expected);
408    if (procId != null) {
409      if (master.getMasterProcedureExecutor().isRunning()) {
410        return master.getMasterProcedureExecutor().isFinished(procId);
411      } else {
412        return false;
413      }
414    }
415
416    String ssString = ClientSnapshotDescriptionUtils.toString(expected);
417
418    // check to see if the sentinel exists,
419    // and if the task is complete removes it from the in-progress snapshots map.
420    SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
421
422    // stop tracking "abandoned" handlers
423    cleanupSentinels();
424
425    if (handler == null) {
426      // If there's no handler in the in-progress map, it means one of the following:
427      // - someone has already requested the snapshot state
428      // - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
429      // - the snapshot was never requested
430      // In those cases returns to the user the "done state" if the snapshots exists on disk,
431      // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
432      if (!isSnapshotCompleted(expected)) {
433        throw new UnknownSnapshotException("Snapshot " + ssString
434          + " is not currently running or one of the known completed snapshots.");
435      }
436      // was done, return true;
437      return true;
438    }
439
440    // pass on any failure we find in the sentinel
441    try {
442      handler.rethrowExceptionIfFailed();
443    } catch (ForeignException e) {
444      // Give some procedure info on an exception.
445      String status;
446      Procedure p = coordinator.getProcedure(expected.getName());
447      if (p != null) {
448        status = p.getStatus();
449      } else {
450        status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
451      }
452      throw new HBaseSnapshotException("Snapshot " + ssString + " had an error.  " + status, e,
453        ProtobufUtil.createSnapshotDesc(expected));
454    }
455
456    // check to see if we are done
457    if (handler.isFinished()) {
458      LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
459      return true;
460    } else if (LOG.isDebugEnabled()) {
461      LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
462    }
463    return false;
464  }
465
466  /**
467   * Check to see if there is a snapshot in progress with the same name or on the same table.
468   * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
469   * don't allow snapshot with the same name.
470   * @param snapshot   description of the snapshot being checked.
471   * @param checkTable check if the table is already taking a snapshot.
472   * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
473   *         table.
474   */
475  synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot, boolean checkTable) {
476    if (checkTable) {
477      TableName snapshotTable = TableName.valueOf(snapshot.getTable());
478      if (isTakingSnapshot(snapshotTable)) {
479        return true;
480      }
481    }
482    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = snapshotHandlers.entrySet().iterator();
483    while (it.hasNext()) {
484      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
485      SnapshotSentinel sentinel = entry.getValue();
486      if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
487        return true;
488      }
489    }
490    Iterator<Map.Entry<SnapshotDescription, Long>> spIt = snapshotToProcIdMap.entrySet().iterator();
491    while (spIt.hasNext()) {
492      Map.Entry<SnapshotDescription, Long> entry = spIt.next();
493      if (
494        snapshot.getName().equals(entry.getKey().getName())
495          && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
496      ) {
497        return true;
498      }
499    }
500    return false;
501  }
502
503  /**
504   * Check to see if the specified table has a snapshot in progress. Currently we have a limitation
505   * only allowing a single snapshot per table at a time.
506   * @param tableName name of the table being snapshotted.
507   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
508   */
509  public boolean isTakingSnapshot(final TableName tableName) {
510    return isTakingSnapshot(tableName, false);
511  }
512
513  public boolean isTableTakingAnySnapshot(final TableName tableName) {
514    return isTakingSnapshot(tableName, true);
515  }
516
517  /**
518   * Check to see if the specified table has a snapshot in progress. Since we introduce the
519   * SnapshotProcedure, it is a little bit different from before. For zk-coordinated snapshot, we
520   * can just consider tables in snapshotHandlers only, but for
521   * {@link org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure} and
522   * {@link org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure}, we need to
523   * consider tables in snapshotToProcIdMap also, for the snapshot procedure, we don't need to check
524   * if table in snapshot.
525   * @param tableName      name of the table being snapshotted.
526   * @param checkProcedure true if we should check tables in snapshotToProcIdMap
527   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
528   */
529  private synchronized boolean isTakingSnapshot(TableName tableName, boolean checkProcedure) {
530    SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
531    if (handler != null && !handler.isFinished()) {
532      return true;
533    }
534    if (checkProcedure) {
535      for (Map.Entry<SnapshotDescription, Long> entry : snapshotToProcIdMap.entrySet()) {
536        if (
537          TableName.valueOf(entry.getKey().getTable()).equals(tableName)
538            && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
539        ) {
540          return true;
541        }
542      }
543    }
544    return false;
545  }
546
547  /**
548   * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
549   * aren't already running a snapshot or restore on the requested table.
550   * @param snapshot description of the snapshot we want to start
551   * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
552   */
553  public synchronized void prepareWorkingDirectory(SnapshotDescription snapshot)
554    throws HBaseSnapshotException {
555    Path workingDir =
556      SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, master.getConfiguration());
557
558    try {
559      FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration());
560      // delete the working directory, since we aren't running the snapshot. Likely leftovers
561      // from a failed attempt.
562      workingDirFS.delete(workingDir, true);
563
564      // recreate the working directory for the snapshot
565      if (!workingDirFS.mkdirs(workingDir)) {
566        throw new SnapshotCreationException(
567          "Couldn't create working directory (" + workingDir + ") for snapshot",
568          ProtobufUtil.createSnapshotDesc(snapshot));
569      }
570      updateWorkingDirAclsIfRequired(workingDir, workingDirFS);
571    } catch (HBaseSnapshotException e) {
572      throw e;
573    } catch (IOException e) {
574      throw new SnapshotCreationException(
575        "Exception while checking to see if snapshot could be started.", e,
576        ProtobufUtil.createSnapshotDesc(snapshot));
577    }
578  }
579
580  /**
581   * If the parent dir of the snapshot working dir (e.g. /hbase/.hbase-snapshot) has non-empty ACLs,
582   * use them for the current working dir (e.g. /hbase/.hbase-snapshot/.tmp/{snapshot-name}) so that
583   * regardless of whether the snapshot commit phase performs atomic rename or non-atomic copy of
584   * the working dir to new snapshot dir, the ACLs are retained.
585   * @param workingDir   working dir to build the snapshot.
586   * @param workingDirFS working dir file system.
587   * @throws IOException If ACL read/modify operation fails.
588   */
589  private static void updateWorkingDirAclsIfRequired(Path workingDir, FileSystem workingDirFS)
590    throws IOException {
591    if (
592      !workingDirFS.hasPathCapability(workingDir, CommonPathCapabilities.FS_ACLS)
593        || workingDir.getParent() == null || workingDir.getParent().getParent() == null
594    ) {
595      return;
596    }
597    AclStatus snapshotWorkingParentDirStatus;
598    try {
599      snapshotWorkingParentDirStatus =
600        workingDirFS.getAclStatus(workingDir.getParent().getParent());
601    } catch (IOException e) {
602      LOG.warn("Unable to retrieve ACL status for path: {}, current working dir path: {}",
603        workingDir.getParent().getParent(), workingDir, e);
604      return;
605    }
606    List<AclEntry> snapshotWorkingParentDirAclStatusEntries =
607      snapshotWorkingParentDirStatus.getEntries();
608    if (
609      snapshotWorkingParentDirAclStatusEntries != null
610        && snapshotWorkingParentDirAclStatusEntries.size() > 0
611    ) {
612      workingDirFS.modifyAclEntries(workingDir, snapshotWorkingParentDirAclStatusEntries);
613    }
614  }
615
616  /**
617   * Take a snapshot of a disabled table.
618   * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
619   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
620   *                     directory could not be determined
621   */
622  private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) throws IOException {
623    // setup the snapshot
624    prepareWorkingDirectory(snapshot);
625
626    // set the snapshot to be a disabled snapshot, since the client doesn't know about that
627    snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
628
629    // Take the snapshot of the disabled table
630    DisabledTableSnapshotHandler handler = new DisabledTableSnapshotHandler(snapshot, master, this);
631    snapshotTable(snapshot, handler);
632  }
633
634  /**
635   * Take a snapshot of an enabled table.
636   * @param snapshot description of the snapshot to take.
637   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
638   *                     directory could not be determined
639   */
640  private synchronized void snapshotEnabledTable(SnapshotDescription snapshot) throws IOException {
641    // setup the snapshot
642    prepareWorkingDirectory(snapshot);
643
644    // Take the snapshot of the enabled table
645    EnabledTableSnapshotHandler handler = new EnabledTableSnapshotHandler(snapshot, master, this);
646    snapshotTable(snapshot, handler);
647  }
648
649  /**
650   * Take a snapshot using the specified handler. On failure the snapshot temporary working
651   * directory is removed. NOTE: prepareToTakeSnapshot() called before this one takes care of the
652   * rejecting the snapshot request if the table is busy with another snapshot/restore operation.
653   * @param snapshot the snapshot description
654   * @param handler  the snapshot handler
655   */
656  private synchronized void snapshotTable(SnapshotDescription snapshot,
657    final TakeSnapshotHandler handler) throws IOException {
658    try {
659      handler.prepare();
660      this.executorService.submit(handler);
661      this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
662    } catch (Exception e) {
663      // cleanup the working directory by trying to delete it from the fs.
664      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
665        master.getConfiguration());
666      FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration());
667      try {
668        if (!workingDirFs.delete(workingDir, true)) {
669          LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
670            + ClientSnapshotDescriptionUtils.toString(snapshot));
671        }
672      } catch (IOException e1) {
673        LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
674          + ClientSnapshotDescriptionUtils.toString(snapshot));
675      }
676      // fail the snapshot
677      throw new SnapshotCreationException("Could not build snapshot handler", e,
678        ProtobufUtil.createSnapshotDesc(snapshot));
679    }
680  }
681
682  public ReadWriteLock getTakingSnapshotLock() {
683    return this.takingSnapshotLock;
684  }
685
686  /**
687   * The snapshot operation processing as following: <br>
688   * 1. Create a Snapshot Handler, and do some initialization; <br>
689   * 2. Put the handler into snapshotHandlers <br>
690   * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
691   * and snapshotHandlers;
692   * @return true to indicate that there're some running snapshots.
693   */
694  public synchronized boolean isTakingAnySnapshot() {
695    return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0
696      || this.snapshotToProcIdMap.size() > 0;
697  }
698
699  /**
700   * Take a snapshot based on the enabled/disabled state of the table.
701   * @throws HBaseSnapshotException when a snapshot specific exception occurs.
702   * @throws IOException            when some sort of generic IO exception occurs.
703   */
704  public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
705    this.takingSnapshotLock.readLock().lock();
706    try {
707      takeSnapshotInternal(snapshot);
708    } finally {
709      this.takingSnapshotLock.readLock().unlock();
710    }
711  }
712
713  public long takeSnapshot(SnapshotDescription snapshot, long nonceGroup, long nonce)
714    throws IOException {
715    this.takingSnapshotLock.readLock().lock();
716    try {
717      return submitSnapshotProcedure(snapshot, nonceGroup, nonce);
718    } finally {
719      this.takingSnapshotLock.readLock().unlock();
720    }
721  }
722
723  private synchronized long submitSnapshotProcedure(SnapshotDescription snapshot, long nonceGroup,
724    long nonce) throws IOException {
725    return MasterProcedureUtil
726      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(master, nonceGroup, nonce) {
727        @Override
728        protected void run() throws IOException {
729          sanityCheckBeforeSnapshot(snapshot, false);
730
731          long procId = submitProcedure(new SnapshotProcedure(
732            getMaster().getMasterProcedureExecutor().getEnvironment(), snapshot));
733
734          getMaster().getSnapshotManager().registerSnapshotProcedure(snapshot, procId);
735        }
736
737        @Override
738        protected String getDescription() {
739          return "SnapshotProcedure";
740        }
741      });
742  }
743
744  private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
745    TableDescriptor desc = sanityCheckBeforeSnapshot(snapshot, true);
746
747    // call pre coproc hook
748    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
749    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
750    if (cpHost != null) {
751      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
752      cpHost.preSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
753    }
754
755    // if the table is enabled, then have the RS run actually the snapshot work
756    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
757    if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.ENABLED)) {
758      if (LOG.isDebugEnabled()) {
759        LOG.debug("Table enabled, starting distributed snapshots for {}",
760          ClientSnapshotDescriptionUtils.toString(snapshot));
761      }
762      snapshotEnabledTable(snapshot);
763      if (LOG.isDebugEnabled()) {
764        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
765      }
766    }
767    // For disabled table, snapshot is created by the master
768    else if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.DISABLED)) {
769      if (LOG.isDebugEnabled()) {
770        LOG.debug("Table is disabled, running snapshot entirely on master for {}",
771          ClientSnapshotDescriptionUtils.toString(snapshot));
772      }
773      snapshotDisabledTable(snapshot);
774      if (LOG.isDebugEnabled()) {
775        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
776      }
777    } else {
778      LOG.error("Can't snapshot table '" + snapshot.getTable()
779        + "', isn't open or closed, we don't know what to do!");
780      TablePartiallyOpenException tpoe =
781        new TablePartiallyOpenException(snapshot.getTable() + " isn't fully open.");
782      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
783        ProtobufUtil.createSnapshotDesc(snapshot));
784    }
785
786    // call post coproc hook
787    if (cpHost != null) {
788      cpHost.postSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
789    }
790  }
791
792  /**
793   * Check if the snapshot can be taken. Currently we have some limitations, for zk-coordinated
794   * snapshot, we don't allow snapshot with same name or taking multiple snapshots of a table at the
795   * same time, for procedure-coordinated snapshot, we don't allow snapshot with same name.
796   * @param snapshot   description of the snapshot being checked.
797   * @param checkTable check if the table is already taking a snapshot. For zk-coordinated snapshot,
798   *                   we need to check if another zk-coordinated snapshot is in progress, for the
799   *                   snapshot procedure, this is unnecessary.
800   * @return the table descriptor of the table
801   */
802  private synchronized TableDescriptor sanityCheckBeforeSnapshot(SnapshotDescription snapshot,
803    boolean checkTable) throws IOException {
804    // check to see if we already completed the snapshot
805    if (isSnapshotCompleted(snapshot)) {
806      throw new SnapshotExistsException(
807        "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
808        ProtobufUtil.createSnapshotDesc(snapshot));
809    }
810    LOG.debug("No existing snapshot, attempting snapshot...");
811
812    // stop tracking "abandoned" handlers
813    cleanupSentinels();
814
815    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
816    // make sure we aren't already running a snapshot
817    if (isTakingSnapshot(snapshot, checkTable)) {
818      throw new SnapshotCreationException(
819        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
820          + " because we are already running another snapshot"
821          + " on the same table or with the same name");
822    }
823
824    // make sure we aren't running a restore on the same table
825    if (isRestoringTable(snapshotTable)) {
826      throw new SnapshotCreationException(
827        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
828          + " because we are already have a restore in progress on the same snapshot.");
829    }
830
831    // check to see if the table exists
832    TableDescriptor desc = null;
833    try {
834      desc = master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable()));
835    } catch (FileNotFoundException e) {
836      String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
837      LOG.error(msg);
838      throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
839    } catch (IOException e) {
840      throw new SnapshotCreationException(
841        "Error while geting table description for table " + snapshot.getTable(), e,
842        ProtobufUtil.createSnapshotDesc(snapshot));
843    }
844    if (desc == null) {
845      throw new SnapshotCreationException(
846        "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
847        ProtobufUtil.createSnapshotDesc(snapshot));
848    }
849    return desc;
850  }
851
852  /**
853   * Set the handler for the current snapshot
854   * <p>
855   * Exposed for TESTING
856   * @param handler handler the master should use TODO get rid of this if possible, repackaging,
857   *                modify tests.
858   */
859  public synchronized void setSnapshotHandlerForTesting(final TableName tableName,
860    final SnapshotSentinel handler) {
861    if (handler != null) {
862      this.snapshotHandlers.put(tableName, handler);
863    } else {
864      this.snapshotHandlers.remove(tableName);
865    }
866  }
867
868  /** Returns distributed commit coordinator for all running snapshots */
869  ProcedureCoordinator getCoordinator() {
870    return coordinator;
871  }
872
873  /**
874   * Check to see if the snapshot is one of the currently completed snapshots Returns true if the
875   * snapshot exists in the "completed snapshots folder".
876   * @param snapshot expected snapshot to check
877   * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
878   *         not stored
879   * @throws IOException              if the filesystem throws an unexpected exception,
880   * @throws IllegalArgumentException if snapshot name is invalid.
881   */
882  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
883    try {
884      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
885      FileSystem fs = master.getMasterFileSystem().getFileSystem();
886      // check to see if the snapshot already exists
887      return fs.exists(snapshotDir);
888    } catch (IllegalArgumentException iae) {
889      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
890    }
891  }
892
893  /**
894   * Clone the specified snapshot. The clone will fail if the destination table has a snapshot or
895   * restore in progress.
896   * @param reqSnapshot       Snapshot Descriptor from request
897   * @param tableName         table to clone
898   * @param snapshot          Snapshot Descriptor
899   * @param snapshotTableDesc Table Descriptor
900   * @param nonceKey          unique identifier to prevent duplicated RPC
901   * @return procId the ID of the clone snapshot procedure
902   */
903  private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
904    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
905    final NonceKey nonceKey, final boolean restoreAcl, final String customSFT) throws IOException {
906    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
907    TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc);
908    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
909    if (cpHost != null) {
910      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
911      cpHost.preCloneSnapshot(snapshotPOJO, htd);
912    }
913    long procId;
914    try {
915      procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl, customSFT);
916    } catch (IOException e) {
917      LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName() + " as table "
918        + tableName.getNameAsString(), e);
919      throw e;
920    }
921    LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
922
923    if (cpHost != null) {
924      cpHost.postCloneSnapshot(snapshotPOJO, htd);
925    }
926    return procId;
927  }
928
929  /**
930   * Clone the specified snapshot into a new table. The operation will fail if the destination table
931   * has a snapshot or restore in progress.
932   * @param snapshot        Snapshot Descriptor
933   * @param tableDescriptor Table Descriptor of the table to create
934   * @param nonceKey        unique identifier to prevent duplicated RPC
935   * @return procId the ID of the clone snapshot procedure
936   */
937  synchronized long cloneSnapshot(final SnapshotDescription snapshot,
938    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl,
939    final String customSFT) throws HBaseSnapshotException {
940    TableName tableName = tableDescriptor.getTableName();
941
942    // make sure we aren't running a snapshot on the same table
943    if (isTableTakingAnySnapshot(tableName)) {
944      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
945    }
946
947    // make sure we aren't running a restore on the same table
948    if (isRestoringTable(tableName)) {
949      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
950    }
951
952    try {
953      long procId = master.getMasterProcedureExecutor().submitProcedure(
954        new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
955          tableDescriptor, snapshot, restoreAcl, customSFT),
956        nonceKey);
957      this.restoreTableToProcIdMap.put(tableName, procId);
958      return procId;
959    } catch (Exception e) {
960      String msg = "Couldn't clone the snapshot="
961        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
962      LOG.error(msg, e);
963      throw new RestoreSnapshotException(msg, e);
964    }
965  }
966
967  /**
968   * Restore or Clone the specified snapshot
969   * @param nonceKey unique identifier to prevent duplicated RPC
970   */
971  public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey,
972    final boolean restoreAcl, String customSFT) throws IOException {
973    FileSystem fs = master.getMasterFileSystem().getFileSystem();
974    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
975
976    // check if the snapshot exists
977    if (!fs.exists(snapshotDir)) {
978      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
979      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(reqSnapshot));
980    }
981
982    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
983    // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
984    // information.
985    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
986    SnapshotManifest manifest =
987      SnapshotManifest.open(master.getConfiguration(), fs, snapshotDir, snapshot);
988    TableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
989    TableName tableName = TableName.valueOf(reqSnapshot.getTable());
990
991    // sanity check the new table descriptor
992    TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc);
993
994    // stop tracking "abandoned" handlers
995    cleanupSentinels();
996
997    // Verify snapshot validity
998    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
999
1000    // Execute the restore/clone operation
1001    long procId;
1002    if (master.getTableDescriptors().exists(tableName)) {
1003      procId =
1004        restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1005    } else {
1006      procId = cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey,
1007        restoreAcl, customSFT);
1008    }
1009    return procId;
1010  }
1011
1012  /**
1013   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1014   * or restore in progress.
1015   * @param reqSnapshot       Snapshot Descriptor from request
1016   * @param tableName         table to restore
1017   * @param snapshot          Snapshot Descriptor
1018   * @param snapshotTableDesc Table Descriptor
1019   * @param nonceKey          unique identifier to prevent duplicated RPC
1020   * @param restoreAcl        true to restore acl of snapshot
1021   * @return procId the ID of the restore snapshot procedure
1022   */
1023  private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
1024    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
1025    final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
1026    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
1027
1028    // have to check first if restoring the snapshot would break current SFT setup
1029    StoreFileTrackerValidationUtils.validatePreRestoreSnapshot(
1030      master.getTableDescriptors().get(tableName), snapshotTableDesc, master.getConfiguration());
1031
1032    if (
1033      master.getTableStateManager().isTableState(TableName.valueOf(snapshot.getTable()),
1034        TableState.State.ENABLED)
1035    ) {
1036      throw new UnsupportedOperationException("Table '" + TableName.valueOf(snapshot.getTable())
1037        + "' must be disabled in order to " + "perform a restore operation.");
1038    }
1039
1040    // call Coprocessor pre hook
1041    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
1042    if (cpHost != null) {
1043      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
1044      cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1045    }
1046
1047    long procId;
1048    try {
1049      procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1050    } catch (IOException e) {
1051      LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
1052        + " as table " + tableName.getNameAsString(), e);
1053      throw e;
1054    }
1055    LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
1056
1057    if (cpHost != null) {
1058      cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1059    }
1060
1061    return procId;
1062  }
1063
1064  /**
1065   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1066   * or restore in progress.
1067   * @param snapshot        Snapshot Descriptor
1068   * @param tableDescriptor Table Descriptor
1069   * @param nonceKey        unique identifier to prevent duplicated RPC
1070   * @param restoreAcl      true to restore acl of snapshot
1071   * @return procId the ID of the restore snapshot procedure
1072   */
1073  private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
1074    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
1075    throws HBaseSnapshotException {
1076    final TableName tableName = tableDescriptor.getTableName();
1077
1078    // make sure we aren't running a snapshot on the same table
1079    if (isTableTakingAnySnapshot(tableName)) {
1080      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
1081    }
1082
1083    // make sure we aren't running a restore on the same table
1084    if (isRestoringTable(tableName)) {
1085      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
1086    }
1087
1088    try {
1089      TableDescriptor oldDescriptor = master.getTableDescriptors().get(tableName);
1090      long procId = master.getMasterProcedureExecutor().submitProcedure(
1091        new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
1092          oldDescriptor, tableDescriptor, snapshot, restoreAcl),
1093        nonceKey);
1094      this.restoreTableToProcIdMap.put(tableName, procId);
1095      return procId;
1096    } catch (Exception e) {
1097      String msg = "Couldn't restore the snapshot="
1098        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
1099      LOG.error(msg, e);
1100      throw new RestoreSnapshotException(msg, e);
1101    }
1102  }
1103
1104  /**
1105   * Verify if the restore of the specified table is in progress.
1106   * @param tableName table under restore
1107   * @return <tt>true</tt> if there is a restore in progress of the specified table.
1108   */
1109  private synchronized boolean isRestoringTable(final TableName tableName) {
1110    Long procId = this.restoreTableToProcIdMap.get(tableName);
1111    if (procId == null) {
1112      return false;
1113    }
1114    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1115    if (procExec.isRunning() && !procExec.isFinished(procId)) {
1116      return true;
1117    } else {
1118      this.restoreTableToProcIdMap.remove(tableName);
1119      return false;
1120    }
1121  }
1122
1123  /**
1124   * Return the handler if it is currently live and has the same snapshot target name. The handler
1125   * is removed from the sentinels map if completed.
1126   * @param sentinels live handlers
1127   * @param snapshot  snapshot description
1128   * @return null if doesn't match, else a live handler.
1129   */
1130  private synchronized SnapshotSentinel removeSentinelIfFinished(
1131    final Map<TableName, SnapshotSentinel> sentinels, final SnapshotDescription snapshot) {
1132    if (!snapshot.hasTable()) {
1133      return null;
1134    }
1135
1136    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
1137    SnapshotSentinel h = sentinels.get(snapshotTable);
1138    if (h == null) {
1139      return null;
1140    }
1141
1142    if (!h.getSnapshot().getName().equals(snapshot.getName())) {
1143      // specified snapshot is to the one currently running
1144      return null;
1145    }
1146
1147    // Remove from the "in-progress" list once completed
1148    if (h.isFinished()) {
1149      sentinels.remove(snapshotTable);
1150    }
1151
1152    return h;
1153  }
1154
1155  /**
1156   * Removes "abandoned" snapshot/restore requests. As part of the HBaseAdmin snapshot/restore API
1157   * the operation status is checked until completed, and the in-progress maps are cleaned up when
1158   * the status of a completed task is requested. To avoid having sentinels staying around for long
1159   * time if something client side is failed, each operation tries to clean up the in-progress maps
1160   * sentinels finished from a long time.
1161   */
1162  private void cleanupSentinels() {
1163    cleanupSentinels(this.snapshotHandlers);
1164    cleanupCompletedRestoreInMap();
1165    cleanupCompletedSnapshotInMap();
1166  }
1167
1168  /**
1169   * Remove the sentinels that are marked as finished and the completion time has exceeded the
1170   * removal timeout.
1171   * @param sentinels map of sentinels to clean
1172   */
1173  private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
1174    long currentTime = EnvironmentEdgeManager.currentTime();
1175    long sentinelsCleanupTimeoutMillis =
1176      master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
1177        SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
1178    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
1179    while (it.hasNext()) {
1180      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
1181      SnapshotSentinel sentinel = entry.getValue();
1182      if (
1183        sentinel.isFinished()
1184          && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis
1185      ) {
1186        it.remove();
1187      }
1188    }
1189  }
1190
1191  /**
1192   * Remove the procedures that are marked as finished
1193   */
1194  private synchronized void cleanupCompletedRestoreInMap() {
1195    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1196    Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
1197    while (it.hasNext()) {
1198      Map.Entry<TableName, Long> entry = it.next();
1199      Long procId = entry.getValue();
1200      if (procExec.isRunning() && procExec.isFinished(procId)) {
1201        it.remove();
1202      }
1203    }
1204  }
1205
1206  /**
1207   * Remove the procedures that are marked as finished
1208   */
1209  private synchronized void cleanupCompletedSnapshotInMap() {
1210    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1211    Iterator<Map.Entry<SnapshotDescription, Long>> it = snapshotToProcIdMap.entrySet().iterator();
1212    while (it.hasNext()) {
1213      Map.Entry<SnapshotDescription, Long> entry = it.next();
1214      Long procId = entry.getValue();
1215      if (procExec.isRunning() && procExec.isFinished(procId)) {
1216        it.remove();
1217      }
1218    }
1219  }
1220
1221  //
1222  // Implementing Stoppable interface
1223  //
1224
1225  @Override
1226  public void stop(String why) {
1227    // short circuit
1228    if (this.stopped) return;
1229    // make sure we get stop
1230    this.stopped = true;
1231    // pass the stop onto take snapshot handlers
1232    for (SnapshotSentinel snapshotHandler : this.snapshotHandlers.values()) {
1233      snapshotHandler.cancel(why);
1234    }
1235    if (snapshotHandlerChoreCleanerTask != null) {
1236      snapshotHandlerChoreCleanerTask.cancel(true);
1237    }
1238    try {
1239      if (coordinator != null) {
1240        coordinator.close();
1241      }
1242    } catch (IOException e) {
1243      LOG.error("stop ProcedureCoordinator error", e);
1244    }
1245  }
1246
1247  @Override
1248  public boolean isStopped() {
1249    return this.stopped;
1250  }
1251
1252  /**
1253   * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1254   * Called at the beginning of snapshot() and restoreSnapshot() methods.
1255   * @throws UnsupportedOperationException if snapshot are not supported
1256   */
1257  public void checkSnapshotSupport() throws UnsupportedOperationException {
1258    if (!this.isSnapshotSupported) {
1259      throw new UnsupportedOperationException(
1260        "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '"
1261          + HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1262    }
1263  }
1264
  /**
   * Called at startup, to verify if snapshot operation is supported, and to avoid starting the
   * master if there're snapshots present but the cleaners needed are missing. Otherwise we can end
   * up with snapshot data loss.
   * <p>
   * Side effects: the cleaner plugin lists in {@code conf} are rewritten (see the branches below)
   * and {@code isSnapshotSupported} is set.
   * @param conf The {@link Configuration} object to use
   * @param mfs  The MasterFileSystem to use
   * @throws IOException                   in case of file-system operation failure
   * @throws UnsupportedOperationException in case cleaners are missing and there're snapshot in the
   *                                       system
   */
  private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
    throws IOException, UnsupportedOperationException {
    // Verify if snapshot is disabled by the user
    // userDisabled is true only when the property is explicitly set to a non-blank value that
    // evaluates to false; an unset or blank property does not count as "disabled by the user".
    String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
    boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
    boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);

    // Extract cleaners from conf
    Set<String> hfileCleaners = new HashSet<>();
    String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
    if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);

    Set<String> logCleaners = new HashSet<>();
    cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
    if (cleaners != null) Collections.addAll(logCleaners, cleaners);

    // check if an older version of snapshot directory was present
    // NOTE(review): oldSnapshotDir is already built under mfs.getRootDir() and is then wrapped in
    // another Path rooted at rootDir; confirm the double resolution is intended.
    Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
    FileSystem fs = mfs.getFileSystem();
    List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false);
    if (ss != null && !ss.isEmpty()) {
      // only reported in the log; startup continues
      LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
      LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
    }

    // If the user has enabled the snapshot, we force the cleaners to be present
    // otherwise we still need to check if cleaners are enabled or not and verify
    // that there're no snapshot in the .snapshot folder.
    if (snapshotEnabled) {
      // Inject snapshot cleaners, if snapshot.enable is true
      hfileCleaners.add(SnapshotHFileCleaner.class.getName());
      hfileCleaners.add(HFileLinkCleaner.class.getName());
      // If sync acl to HDFS feature is enabled, then inject the cleaner
      if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) {
        hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName());
      }

      // Set cleaners conf
      // (the log cleaner set is written back as read; nothing is added to it in this method)
      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
        hfileCleaners.toArray(new String[hfileCleaners.size()]));
      conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
        logCleaners.toArray(new String[logCleaners.size()]));
    } else {
      // There may be restore tables if snapshot is enabled and then disabled, so add
      // HFileLinkCleaner, see HBASE-26670 for more details.
      hfileCleaners.add(HFileLinkCleaner.class.getName());
      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
        hfileCleaners.toArray(new String[hfileCleaners.size()]));
      // Verify if SnapshotHFileCleaner are present
      // From here on snapshotEnabled means "the snapshot hfile cleaner is configured".
      snapshotEnabled = hfileCleaners.contains(SnapshotHFileCleaner.class.getName());

      // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
      if (snapshotEnabled) {
        LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " + "but the '"
          + HBASE_SNAPSHOT_ENABLED + "' property "
          + (userDisabled ? "is set to 'false'." : "is not set."));
      }
    }

    // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
    this.isSnapshotSupported = snapshotEnabled && !userDisabled;

    // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
    // otherwise we end up with snapshot data loss.
    if (!snapshotEnabled) {
      LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
      Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
      if (fs.exists(snapshotDir)) {
        FileStatus[] snapshots = CommonFSUtils.listStatus(fs, snapshotDir,
          new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
        // NOTE(review): presumably listStatus returns null when no directory matches the filter;
        // confirm against CommonFSUtils.
        if (snapshots != null) {
          LOG.error("Snapshots are present, but cleaners are not enabled.");
          // isSnapshotSupported is false on this path, so this call throws
          // UnsupportedOperationException.
          checkSnapshotSupport();
        }
      }
    }
  }
1352
1353  @Override
1354  public void initialize(MasterServices master, MetricsMaster metricsMaster)
1355    throws KeeperException, IOException, UnsupportedOperationException {
1356    this.master = master;
1357
1358    this.rootDir = master.getMasterFileSystem().getRootDir();
1359    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1360
1361    // get the configuration for the coordinator
1362    Configuration conf = master.getConfiguration();
1363    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1364    long timeoutMillis = Math.max(
1365      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1366        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME),
1367      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1368        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1369    int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1370
1371    // setup the default procedure coordinator
1372    String name = master.getServerName().toString();
1373    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1374    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(master.getZooKeeper(),
1375      SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1376
1377    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1378    this.executorService = master.getExecutorService();
1379    this.verifyWorkerAssigner =
1380      new WorkerAssigner(master, conf.getInt("hbase.snapshot.verify.task.max", 3),
1381        new ProcedureEvent<>("snapshot-verify-worker-assigning"));
1382    restoreUnfinishedSnapshotProcedure();
1383    restoreWorkers();
1384    resetTempDir();
1385    snapshotHandlerChoreCleanerTask =
1386      scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
1387  }
1388
1389  private void restoreUnfinishedSnapshotProcedure() {
1390    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1391      .filter(p -> p instanceof SnapshotProcedure).filter(p -> !p.isFinished())
1392      .map(p -> (SnapshotProcedure) p).forEach(p -> {
1393        registerSnapshotProcedure(p.getSnapshot(), p.getProcId());
1394        LOG.info("restore unfinished snapshot procedure {}", p);
1395      });
1396  }
1397
  /**
   * Returns the signature under which this manager is identified as a procedure handler.
   * @return the {@code ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION} constant
   */
  @Override
  public String getProcedureSignature() {
    return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
  }
1402
1403  @Override
1404  public void execProcedure(ProcedureDescription desc) throws IOException {
1405    takeSnapshot(toSnapshotDescription(desc));
1406  }
1407
  /**
   * Intentionally a no-op: this method performs no permission check itself.
   * @param desc          procedure description (unused)
   * @param accessChecker access checker (unused)
   * @param user          requesting user (unused)
   * @throws IOException never thrown by this implementation; declared by the overridden method
   */
  @Override
  public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
    throws IOException {
    // The check is done by AccessController as part of the preSnapshot coprocessor hook
    // (legacy code path).
    // In the future, when AccessController is removed for good, that check should be moved here.
  }
1414
1415  @Override
1416  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1417    return isSnapshotDone(toSnapshotDescription(desc));
1418  }
1419
1420  private SnapshotDescription toSnapshotDescription(ProcedureDescription desc) throws IOException {
1421    SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1422    if (!desc.hasInstance()) {
1423      throw new IOException("Snapshot name is not defined: " + desc.toString());
1424    }
1425    String snapshotName = desc.getInstance();
1426    List<NameStringPair> props = desc.getConfigurationList();
1427    String table = null;
1428    for (NameStringPair prop : props) {
1429      if ("table".equalsIgnoreCase(prop.getName())) {
1430        table = prop.getValue();
1431      }
1432    }
1433    if (table == null) {
1434      throw new IOException("Snapshot table is not defined: " + desc.toString());
1435    }
1436    TableName tableName = TableName.valueOf(table);
1437    builder.setTable(tableName.getNameAsString());
1438    builder.setName(snapshotName);
1439    builder.setType(SnapshotDescription.Type.FLUSH);
1440    return builder.build();
1441  }
1442
1443  public void registerSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1444    snapshotToProcIdMap.put(snapshot, procId);
1445    LOG.debug("register snapshot={}, snapshot procedure id = {}",
1446      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1447  }
1448
1449  public void unregisterSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1450    snapshotToProcIdMap.remove(snapshot, procId);
1451    LOG.debug("unregister snapshot={}, snapshot procedure id = {}",
1452      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1453  }
1454
1455  public boolean snapshotProcedureEnabled() {
1456    return master.getConfiguration().getBoolean(SNAPSHOT_PROCEDURE_ENABLED,
1457      SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
1458  }
1459
1460  public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
1461    throws ProcedureSuspendedException {
1462    Optional<ServerName> worker = verifyWorkerAssigner.acquire();
1463    if (worker.isPresent()) {
1464      LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get());
1465      return worker.get();
1466    }
1467    verifyWorkerAssigner.suspend(procedure);
1468    throw new ProcedureSuspendedException();
1469  }
1470
  /**
   * Returns a verification worker to the assigner and then calls the assigner's {@code wake}
   * with the given scheduler.
   * @param procedure verification procedure giving up the worker (used for logging only)
   * @param worker    server being released
   * @param scheduler scheduler passed to the assigner's {@code wake}
   */
  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker,
    MasterProcedureScheduler scheduler) {
    LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
    // Release first, then wake. NOTE(review): presumably so that woken procedures can
    // acquire the worker that was just freed - confirm against WorkerAssigner.
    verifyWorkerAssigner.release(worker);
    verifyWorkerAssigner.wake(scheduler);
  }
1477
1478  private void restoreWorkers() {
1479    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1480      .filter(p -> p instanceof SnapshotVerifyProcedure).map(p -> (SnapshotVerifyProcedure) p)
1481      .filter(p -> !p.isFinished()).filter(p -> p.getServerName() != null).forEach(p -> {
1482        verifyWorkerAssigner.addUsedWorker(p.getServerName());
1483        LOG.debug("{} restores used worker {}", p, p.getServerName());
1484      });
1485  }
1486
  /**
   * Delegates to the verify-worker assigner's {@code getAvailableWorker} for the given server.
   * @param serverName server to query
   * @return whatever the assigner reports for that server; boxed, so presumably it may be
   *         {@code null} for an unknown server - TODO confirm against WorkerAssigner
   */
  public Integer getAvailableWorker(ServerName serverName) {
    return verifyWorkerAssigner.getAvailableWorker(serverName);
  }
1490}