001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.Executors;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.ScheduledFuture;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.locks.ReadWriteLock;
038import java.util.concurrent.locks.ReentrantReadWriteLock;
039import java.util.stream.Collectors;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FSDataInputStream;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.hbase.HBaseInterfaceAudience;
046import org.apache.hadoop.hbase.HConstants;
047import org.apache.hadoop.hbase.ServerName;
048import org.apache.hadoop.hbase.Stoppable;
049import org.apache.hadoop.hbase.TableName;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.client.TableState;
053import org.apache.hadoop.hbase.errorhandling.ForeignException;
054import org.apache.hadoop.hbase.executor.ExecutorService;
055import org.apache.hadoop.hbase.ipc.RpcServer;
056import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
057import org.apache.hadoop.hbase.master.MasterFileSystem;
058import org.apache.hadoop.hbase.master.MasterServices;
059import org.apache.hadoop.hbase.master.MetricsMaster;
060import org.apache.hadoop.hbase.master.SnapshotSentinel;
061import org.apache.hadoop.hbase.master.WorkerAssigner;
062import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
063import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
064import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
065import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
066import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
067import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
068import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
069import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
070import org.apache.hadoop.hbase.master.procedure.SnapshotVerifyProcedure;
071import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
072import org.apache.hadoop.hbase.procedure.Procedure;
073import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
074import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
075import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
076import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
077import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
078import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
079import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
080import org.apache.hadoop.hbase.security.AccessDeniedException;
081import org.apache.hadoop.hbase.security.User;
082import org.apache.hadoop.hbase.security.access.AccessChecker;
083import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner;
084import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper;
085import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
086import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
087import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
088import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
089import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
090import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
091import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
092import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
093import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
094import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
095import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
096import org.apache.hadoop.hbase.util.CommonFSUtils;
097import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
098import org.apache.hadoop.hbase.util.NonceKey;
099import org.apache.hadoop.hbase.util.TableDescriptorChecker;
100import org.apache.yetus.audience.InterfaceAudience;
101import org.apache.yetus.audience.InterfaceStability;
102import org.apache.zookeeper.KeeperException;
103import org.slf4j.Logger;
104import org.slf4j.LoggerFactory;
105
106import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
107
108import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
109import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
113
114/**
115 * This class manages the procedure of taking and restoring snapshots. There is only one
116 * SnapshotManager for the master.
117 * <p>
118 * The class provides methods for monitoring in-progress snapshot actions.
119 * <p>
120 * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
121 * simplification in the current implementation.
122 */
123@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
124@InterfaceStability.Unstable
125public class SnapshotManager extends MasterProcedureManager implements Stoppable {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);

  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;

  /**
   * Wait time before removing a finished sentinel from the in-progress map.
   * <p>
   * NOTE: This is used as a safety auto cleanup. The snapshot and restore handlers map entries are
   * removed when a user asks if a snapshot or restore is completed. This operation is part of the
   * HBaseAdmin snapshot/restore API flow. In case something fails on the client side and the
   * snapshot/restore state is not reclaimed after a default timeout, the entry is removed from the
   * in-progress map. At this point, if the user asks for the snapshot/restore status, the result
   * will be "snapshot done" if the snapshot exists, or failed if it doesn't exist.
   */
  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
    "hbase.snapshot.sentinels.cleanup.timeoutMillis";
  /**
   * 60 * 1000 ms (one minute). Presumably the default for
   * {@link #HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS} (note the "MILLS" spelling) — confirm
   * at the point where the key is read.
   */
  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;

  /** Enable or disable snapshot support */
  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

  /**
   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for completion.
   */
  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

  /** Name of the operation to use in the controller */
  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

  /** Conf key for # of threads used by the SnapshotManager thread pool */
  public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

  /**
   * number of current operations running on the master.
   * NOTE(review): presumably this is the default thread count for
   * {@link #SNAPSHOT_POOL_THREADS_KEY} (i.e. how many snapshot operations can run concurrently on
   * the master) — verify against where the pool is created.
   */
  public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;

  /** Conf key for preserving original max file size configs */
  public static final String SNAPSHOT_MAX_FILE_SIZE_PRESERVE =
    "hbase.snapshot.max.filesize.preserve";

  /** Enable or disable snapshot procedure */
  public static final String SNAPSHOT_PROCEDURE_ENABLED = "hbase.snapshot.procedure.enabled";

  /** Presumably the default for {@link #SNAPSHOT_PROCEDURE_ENABLED} — confirm where it is read. */
  public static final boolean SNAPSHOT_PROCEDURE_ENABLED_DEFAULT = true;
169
  private boolean stopped;
  private MasterServices master; // Needed by TableEventHandlers
  private ProcedureCoordinator coordinator;

  // Is snapshot feature enabled?
  private boolean isSnapshotSupported = false;

  // Snapshot handlers map, with table name as key.
  // snapshotTable() inserts a handler for the table being snapshotted.
  // isSnapshotDone() removes the requested handler if the operation is finished.
  // NOTE(review): most accessors are synchronized on this object, but not all of them
  // (isSnapshotDone() is not); the map is a ConcurrentHashMap, presumably so that those
  // unsynchronized reads/removals remain safe.
  private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
  // Single daemon thread ("SnapshotHandlerChoreCleaner") that runs the periodic
  // cleanupSentinels() chore; e.g. the package-private constructor schedules it.
  private final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
  // Handle of the scheduled sentinel cleanup chore.
  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;

  // Restore map, with table name as key, procedure ID as value.
  // The map is always accessed and modified under the object lock using synchronized.
  // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
  //
  // TODO: just as the Apache HBase 1.x implementation, this map would not survive master
  // restart/failover. This is just a stopgap implementation until implementation of taking
  // snapshot using Procedure-V2.
  private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>();

  // SnapshotDescription -> ID of the SnapshotProcedure taking that snapshot
  // (populated through registerSnapshotProcedure(), see submitSnapshotProcedure()).
  private final ConcurrentHashMap<SnapshotDescription, Long> snapshotToProcIdMap =
    new ConcurrentHashMap<>();

  private WorkerAssigner verifyWorkerAssigner;

  // HBase root directory, taken from the MasterFileSystem.
  private Path rootDir;
  // Executor that runs zk-coordinated snapshot handlers (see snapshotTable()).
  private ExecutorService executorService;

  /**
   * Read write lock between taking snapshot and snapshot HFile cleaner. The cleaner should skip
   * checking the HFiles if any snapshot is in progress, otherwise it may clean an HFile which
   * belongs to the newly created snapshot. So the cleaner should grab the write lock first when it
   * starts to work, while taking a snapshot holds the read lock. (See HBASE-21387) The lock is
   * created in fair mode.
   */
  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
212
213  public SnapshotManager() {
214  }
215
216  /**
217   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
218   * @param master      services for the master where the manager is running
219   * @param coordinator procedure coordinator instance. exposed for testing.
220   * @param pool        HBase ExecutorServcie instance, exposed for testing.
221   */
222  @InterfaceAudience.Private
223  SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
224    ExecutorService pool, int sentinelCleanInterval)
225    throws IOException, UnsupportedOperationException {
226    this.master = master;
227
228    this.rootDir = master.getMasterFileSystem().getRootDir();
229    Configuration conf = master.getConfiguration();
230    checkSnapshotSupport(conf, master.getMasterFileSystem());
231
232    this.coordinator = coordinator;
233    this.executorService = pool;
234    resetTempDir();
235    snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
236      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
237  }
238
239  /**
240   * Gets the list of all completed snapshots.
241   * @return list of SnapshotDescriptions
242   * @throws IOException File system exception
243   */
244  public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
245    return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true);
246  }
247
248  /**
249   * Gets the list of all completed snapshots.
250   * @param snapshotDir snapshot directory
251   * @param withCpCall  Whether to call CP hooks
252   * @return list of SnapshotDescriptions
253   * @throws IOException File system exception
254   */
255  private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall)
256    throws IOException {
257    List<SnapshotDescription> snapshotDescs = new ArrayList<>();
258    // first create the snapshot root path and check to see if it exists
259    FileSystem fs = master.getMasterFileSystem().getFileSystem();
260    if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
261
262    // if there are no snapshots, return an empty list
263    if (!fs.exists(snapshotDir)) {
264      return snapshotDescs;
265    }
266
267    // ignore all the snapshots in progress
268    FileStatus[] snapshots = fs.listStatus(snapshotDir,
269      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
270    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
271    withCpCall = withCpCall && cpHost != null;
272    // loop through all the completed snapshots
273    for (FileStatus snapshot : snapshots) {
274      Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
275      // if the snapshot is bad
276      if (!fs.exists(info)) {
277        LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
278        continue;
279      }
280      FSDataInputStream in = null;
281      try {
282        in = fs.open(info);
283        SnapshotDescription desc = SnapshotDescription.parseFrom(in);
284        org.apache.hadoop.hbase.client.SnapshotDescription descPOJO =
285          (withCpCall) ? ProtobufUtil.createSnapshotDesc(desc) : null;
286        if (withCpCall) {
287          try {
288            cpHost.preListSnapshot(descPOJO);
289          } catch (AccessDeniedException e) {
290            LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
291              + "Either you should be owner of this snapshot or admin user.");
292            // Skip this and try for next snapshot
293            continue;
294          }
295        }
296        snapshotDescs.add(desc);
297
298        // call coproc post hook
299        if (withCpCall) {
300          cpHost.postListSnapshot(descPOJO);
301        }
302      } catch (IOException e) {
303        LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
304      } finally {
305        if (in != null) {
306          in.close();
307        }
308      }
309    }
310    return snapshotDescs;
311  }
312
313  /**
314   * Cleans up any zk-coordinated snapshots in the snapshot/.tmp directory that were left from
315   * failed snapshot attempts. For unfinished procedure2-coordinated snapshots, keep the working
316   * directory.
317   * @throws IOException if we can't reach the filesystem
318   */
319  private void resetTempDir() throws IOException {
320    Set<String> workingProcedureCoordinatedSnapshotNames =
321      snapshotToProcIdMap.keySet().stream().map(s -> s.getName()).collect(Collectors.toSet());
322
323    Path tmpdir =
324      SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, master.getConfiguration());
325    FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration());
326    FileStatus[] workingSnapshotDirs = CommonFSUtils.listStatus(tmpFs, tmpdir);
327    if (workingSnapshotDirs == null) {
328      return;
329    }
330    for (FileStatus workingSnapshotDir : workingSnapshotDirs) {
331      String workingSnapshotName = workingSnapshotDir.getPath().getName();
332      if (!workingProcedureCoordinatedSnapshotNames.contains(workingSnapshotName)) {
333        try {
334          if (tmpFs.delete(workingSnapshotDir.getPath(), true)) {
335            LOG.info("delete unfinished zk-coordinated snapshot working directory {}",
336              workingSnapshotDir.getPath());
337          } else {
338            LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
339              workingSnapshotDir.getPath());
340          }
341        } catch (IOException e) {
342          LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
343            workingSnapshotDir.getPath(), e);
344        }
345      } else {
346        LOG.debug("find working directory of unfinished procedure {}", workingSnapshotName);
347      }
348    }
349  }
350
351  /**
352   * Delete the specified snapshot n * @throws SnapshotDoesNotExistException If the specified
353   * snapshot does not exist.
354   * @throws IOException For filesystem IOExceptions
355   */
356  public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
357    // check to see if it is completed
358    if (!isSnapshotCompleted(snapshot)) {
359      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
360    }
361
362    String snapshotName = snapshot.getName();
363    // first create the snapshot description and check to see if it exists
364    FileSystem fs = master.getMasterFileSystem().getFileSystem();
365    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
366    // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
367    // just the "name" and it does not contains the "real" snapshot information
368    snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
369
370    // call coproc pre hook
371    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
372    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
373    if (cpHost != null) {
374      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
375      cpHost.preDeleteSnapshot(snapshotPOJO);
376    }
377
378    LOG.debug("Deleting snapshot: " + snapshotName);
379    // delete the existing snapshot
380    if (!fs.delete(snapshotDir, true)) {
381      throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
382    }
383
384    // call coproc post hook
385    if (cpHost != null) {
386      cpHost.postDeleteSnapshot(snapshotPOJO);
387    }
388
389  }
390
391  /**
392   * Check if the specified snapshot is done n * @return true if snapshot is ready to be restored,
393   * false if it is still being taken.
394   * @throws IOException              IOException if error from HDFS or RPC
395   * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
396   */
397  public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
398    // check the request to make sure it has a snapshot
399    if (expected == null) {
400      throw new UnknownSnapshotException(
401        "No snapshot name passed in request, can't figure out which snapshot you want to check.");
402    }
403
404    Long procId = snapshotToProcIdMap.get(expected);
405    if (procId != null) {
406      if (master.getMasterProcedureExecutor().isRunning()) {
407        return master.getMasterProcedureExecutor().isFinished(procId);
408      } else {
409        return false;
410      }
411    }
412
413    String ssString = ClientSnapshotDescriptionUtils.toString(expected);
414
415    // check to see if the sentinel exists,
416    // and if the task is complete removes it from the in-progress snapshots map.
417    SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
418
419    // stop tracking "abandoned" handlers
420    cleanupSentinels();
421
422    if (handler == null) {
423      // If there's no handler in the in-progress map, it means one of the following:
424      // - someone has already requested the snapshot state
425      // - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
426      // - the snapshot was never requested
427      // In those cases returns to the user the "done state" if the snapshots exists on disk,
428      // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
429      if (!isSnapshotCompleted(expected)) {
430        throw new UnknownSnapshotException("Snapshot " + ssString
431          + " is not currently running or one of the known completed snapshots.");
432      }
433      // was done, return true;
434      return true;
435    }
436
437    // pass on any failure we find in the sentinel
438    try {
439      handler.rethrowExceptionIfFailed();
440    } catch (ForeignException e) {
441      // Give some procedure info on an exception.
442      String status;
443      Procedure p = coordinator.getProcedure(expected.getName());
444      if (p != null) {
445        status = p.getStatus();
446      } else {
447        status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
448      }
449      throw new HBaseSnapshotException("Snapshot " + ssString + " had an error.  " + status, e,
450        ProtobufUtil.createSnapshotDesc(expected));
451    }
452
453    // check to see if we are done
454    if (handler.isFinished()) {
455      LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
456      return true;
457    } else if (LOG.isDebugEnabled()) {
458      LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
459    }
460    return false;
461  }
462
463  /**
464   * Check to see if there is a snapshot in progress with the same name or on the same table.
465   * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
466   * don't allow snapshot with the same name.
467   * @param snapshot   description of the snapshot being checked.
468   * @param checkTable check if the table is already taking a snapshot.
469   * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
470   *         table.
471   */
472  synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot, boolean checkTable) {
473    if (checkTable) {
474      TableName snapshotTable = TableName.valueOf(snapshot.getTable());
475      if (isTakingSnapshot(snapshotTable)) {
476        return true;
477      }
478    }
479    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = snapshotHandlers.entrySet().iterator();
480    while (it.hasNext()) {
481      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
482      SnapshotSentinel sentinel = entry.getValue();
483      if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
484        return true;
485      }
486    }
487    Iterator<Map.Entry<SnapshotDescription, Long>> spIt = snapshotToProcIdMap.entrySet().iterator();
488    while (spIt.hasNext()) {
489      Map.Entry<SnapshotDescription, Long> entry = spIt.next();
490      if (
491        snapshot.getName().equals(entry.getKey().getName())
492          && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
493      ) {
494        return true;
495      }
496    }
497    return false;
498  }
499
500  /**
501   * Check to see if the specified table has a snapshot in progress. Currently we have a limitation
502   * only allowing a single snapshot per table at a time.
503   * @param tableName name of the table being snapshotted.
504   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
505   */
506  public boolean isTakingSnapshot(final TableName tableName) {
507    return isTakingSnapshot(tableName, false);
508  }
509
510  public boolean isTableTakingAnySnapshot(final TableName tableName) {
511    return isTakingSnapshot(tableName, true);
512  }
513
514  /**
515   * Check to see if the specified table has a snapshot in progress. Since we introduce the
516   * SnapshotProcedure, it is a little bit different from before. For zk-coordinated snapshot, we
517   * can just consider tables in snapshotHandlers only, but for
518   * {@link org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure} and
519   * {@link org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure}, we need to
520   * consider tables in snapshotToProcIdMap also, for the snapshot procedure, we don't need to check
521   * if table in snapshot.
522   * @param tableName      name of the table being snapshotted.
523   * @param checkProcedure true if we should check tables in snapshotToProcIdMap
524   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
525   */
526  private synchronized boolean isTakingSnapshot(TableName tableName, boolean checkProcedure) {
527    SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
528    if (handler != null && !handler.isFinished()) {
529      return true;
530    }
531    if (checkProcedure) {
532      for (Map.Entry<SnapshotDescription, Long> entry : snapshotToProcIdMap.entrySet()) {
533        if (
534          TableName.valueOf(entry.getKey().getTable()).equals(tableName)
535            && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
536        ) {
537          return true;
538        }
539      }
540    }
541    return false;
542  }
543
544  /**
545   * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
546   * aren't already running a snapshot or restore on the requested table.
547   * @param snapshot description of the snapshot we want to start
548   * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
549   */
550  public synchronized void prepareWorkingDirectory(SnapshotDescription snapshot)
551    throws HBaseSnapshotException {
552    Path workingDir =
553      SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, master.getConfiguration());
554
555    try {
556      FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration());
557      // delete the working directory, since we aren't running the snapshot. Likely leftovers
558      // from a failed attempt.
559      workingDirFS.delete(workingDir, true);
560
561      // recreate the working directory for the snapshot
562      if (!workingDirFS.mkdirs(workingDir)) {
563        throw new SnapshotCreationException(
564          "Couldn't create working directory (" + workingDir + ") for snapshot",
565          ProtobufUtil.createSnapshotDesc(snapshot));
566      }
567    } catch (HBaseSnapshotException e) {
568      throw e;
569    } catch (IOException e) {
570      throw new SnapshotCreationException(
571        "Exception while checking to see if snapshot could be started.", e,
572        ProtobufUtil.createSnapshotDesc(snapshot));
573    }
574  }
575
576  /**
577   * Take a snapshot of a disabled table.
578   * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
579   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
580   *                     directory could not be determined
581   */
582  private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) throws IOException {
583    // setup the snapshot
584    prepareWorkingDirectory(snapshot);
585
586    // set the snapshot to be a disabled snapshot, since the client doesn't know about that
587    snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
588
589    // Take the snapshot of the disabled table
590    DisabledTableSnapshotHandler handler = new DisabledTableSnapshotHandler(snapshot, master, this);
591    snapshotTable(snapshot, handler);
592  }
593
594  /**
595   * Take a snapshot of an enabled table.
596   * @param snapshot description of the snapshot to take.
597   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
598   *                     directory could not be determined
599   */
600  private synchronized void snapshotEnabledTable(SnapshotDescription snapshot) throws IOException {
601    // setup the snapshot
602    prepareWorkingDirectory(snapshot);
603
604    // Take the snapshot of the enabled table
605    EnabledTableSnapshotHandler handler = new EnabledTableSnapshotHandler(snapshot, master, this);
606    snapshotTable(snapshot, handler);
607  }
608
609  /**
610   * Take a snapshot using the specified handler. On failure the snapshot temporary working
611   * directory is removed. NOTE: prepareToTakeSnapshot() called before this one takes care of the
612   * rejecting the snapshot request if the table is busy with another snapshot/restore operation.
613   * @param snapshot the snapshot description
614   * @param handler  the snapshot handler
615   */
616  private synchronized void snapshotTable(SnapshotDescription snapshot,
617    final TakeSnapshotHandler handler) throws IOException {
618    try {
619      handler.prepare();
620      this.executorService.submit(handler);
621      this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
622    } catch (Exception e) {
623      // cleanup the working directory by trying to delete it from the fs.
624      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
625        master.getConfiguration());
626      FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration());
627      try {
628        if (!workingDirFs.delete(workingDir, true)) {
629          LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
630            + ClientSnapshotDescriptionUtils.toString(snapshot));
631        }
632      } catch (IOException e1) {
633        LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
634          + ClientSnapshotDescriptionUtils.toString(snapshot));
635      }
636      // fail the snapshot
637      throw new SnapshotCreationException("Could not build snapshot handler", e,
638        ProtobufUtil.createSnapshotDesc(snapshot));
639    }
640  }
641
642  public ReadWriteLock getTakingSnapshotLock() {
643    return this.takingSnapshotLock;
644  }
645
646  /**
647   * The snapshot operation processing as following: <br>
648   * 1. Create a Snapshot Handler, and do some initialization; <br>
649   * 2. Put the handler into snapshotHandlers <br>
650   * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
651   * and snapshotHandlers;
652   * @return true to indicate that there're some running snapshots.
653   */
654  public synchronized boolean isTakingAnySnapshot() {
655    return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0
656      || this.snapshotToProcIdMap.size() > 0;
657  }
658
659  /**
660   * Take a snapshot based on the enabled/disabled state of the table. n * @throws
661   * HBaseSnapshotException when a snapshot specific exception occurs.
662   * @throws IOException when some sort of generic IO exception occurs.
663   */
664  public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
665    this.takingSnapshotLock.readLock().lock();
666    try {
667      takeSnapshotInternal(snapshot);
668    } finally {
669      this.takingSnapshotLock.readLock().unlock();
670    }
671  }
672
673  public synchronized long takeSnapshot(SnapshotDescription snapshot, long nonceGroup, long nonce)
674    throws IOException {
675    this.takingSnapshotLock.readLock().lock();
676    try {
677      return submitSnapshotProcedure(snapshot, nonceGroup, nonce);
678    } finally {
679      this.takingSnapshotLock.readLock().unlock();
680    }
681  }
682
683  private long submitSnapshotProcedure(SnapshotDescription snapshot, long nonceGroup, long nonce)
684    throws IOException {
685    return MasterProcedureUtil
686      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(master, nonceGroup, nonce) {
687        @Override
688        protected void run() throws IOException {
689          sanityCheckBeforeSnapshot(snapshot, false);
690
691          long procId = submitProcedure(new SnapshotProcedure(
692            getMaster().getMasterProcedureExecutor().getEnvironment(), snapshot));
693
694          getMaster().getSnapshotManager().registerSnapshotProcedure(snapshot, procId);
695        }
696
697        @Override
698        protected String getDescription() {
699          return "SnapshotProcedure";
700        }
701      });
702  }
703
704  private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
705    TableDescriptor desc = sanityCheckBeforeSnapshot(snapshot, true);
706
707    // call pre coproc hook
708    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
709    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
710    if (cpHost != null) {
711      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
712      cpHost.preSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
713    }
714
715    // if the table is enabled, then have the RS run actually the snapshot work
716    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
717    if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.ENABLED)) {
718      if (LOG.isDebugEnabled()) {
719        LOG.debug("Table enabled, starting distributed snapshots for {}",
720          ClientSnapshotDescriptionUtils.toString(snapshot));
721      }
722      snapshotEnabledTable(snapshot);
723      if (LOG.isDebugEnabled()) {
724        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
725      }
726    }
727    // For disabled table, snapshot is created by the master
728    else if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.DISABLED)) {
729      if (LOG.isDebugEnabled()) {
730        LOG.debug("Table is disabled, running snapshot entirely on master for {}",
731          ClientSnapshotDescriptionUtils.toString(snapshot));
732      }
733      snapshotDisabledTable(snapshot);
734      if (LOG.isDebugEnabled()) {
735        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
736      }
737    } else {
738      LOG.error("Can't snapshot table '" + snapshot.getTable()
739        + "', isn't open or closed, we don't know what to do!");
740      TablePartiallyOpenException tpoe =
741        new TablePartiallyOpenException(snapshot.getTable() + " isn't fully open.");
742      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
743        ProtobufUtil.createSnapshotDesc(snapshot));
744    }
745
746    // call post coproc hook
747    if (cpHost != null) {
748      cpHost.postSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
749    }
750  }
751
752  /**
753   * Check if the snapshot can be taken. Currently we have some limitations, for zk-coordinated
754   * snapshot, we don't allow snapshot with same name or taking multiple snapshots of a table at the
755   * same time, for procedure-coordinated snapshot, we don't allow snapshot with same name.
756   * @param snapshot   description of the snapshot being checked.
757   * @param checkTable check if the table is already taking a snapshot. For zk-coordinated snapshot,
758   *                   we need to check if another zk-coordinated snapshot is in progress, for the
759   *                   snapshot procedure, this is unnecessary.
760   * @return the table descriptor of the table
761   */
762  private synchronized TableDescriptor sanityCheckBeforeSnapshot(SnapshotDescription snapshot,
763    boolean checkTable) throws IOException {
764    // check to see if we already completed the snapshot
765    if (isSnapshotCompleted(snapshot)) {
766      throw new SnapshotExistsException(
767        "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
768        ProtobufUtil.createSnapshotDesc(snapshot));
769    }
770    LOG.debug("No existing snapshot, attempting snapshot...");
771
772    // stop tracking "abandoned" handlers
773    cleanupSentinels();
774
775    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
776    // make sure we aren't already running a snapshot
777    if (isTakingSnapshot(snapshot, checkTable)) {
778      throw new SnapshotCreationException(
779        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
780          + " because we are already running another snapshot"
781          + " on the same table or with the same name");
782    }
783
784    // make sure we aren't running a restore on the same table
785    if (isRestoringTable(snapshotTable)) {
786      throw new SnapshotCreationException(
787        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
788          + " because we are already have a restore in progress on the same snapshot.");
789    }
790
791    // check to see if the table exists
792    TableDescriptor desc = null;
793    try {
794      desc = master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable()));
795    } catch (FileNotFoundException e) {
796      String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
797      LOG.error(msg);
798      throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
799    } catch (IOException e) {
800      throw new SnapshotCreationException(
801        "Error while geting table description for table " + snapshot.getTable(), e,
802        ProtobufUtil.createSnapshotDesc(snapshot));
803    }
804    if (desc == null) {
805      throw new SnapshotCreationException(
806        "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
807        ProtobufUtil.createSnapshotDesc(snapshot));
808    }
809    return desc;
810  }
811
812  /**
813   * Set the handler for the current snapshot
814   * <p>
815   * Exposed for TESTING n * @param handler handler the master should use TODO get rid of this if
816   * possible, repackaging, modify tests.
817   */
818  public synchronized void setSnapshotHandlerForTesting(final TableName tableName,
819    final SnapshotSentinel handler) {
820    if (handler != null) {
821      this.snapshotHandlers.put(tableName, handler);
822    } else {
823      this.snapshotHandlers.remove(tableName);
824    }
825  }
826
827  /**
828   * @return distributed commit coordinator for all running snapshots
829   */
830  ProcedureCoordinator getCoordinator() {
831    return coordinator;
832  }
833
834  /**
835   * Check to see if the snapshot is one of the currently completed snapshots Returns true if the
836   * snapshot exists in the "completed snapshots folder".
837   * @param snapshot expected snapshot to check
838   * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
839   *         not stored
840   * @throws IOException              if the filesystem throws an unexpected exception,
841   * @throws IllegalArgumentException if snapshot name is invalid.
842   */
843  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
844    try {
845      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
846      FileSystem fs = master.getMasterFileSystem().getFileSystem();
847      // check to see if the snapshot already exists
848      return fs.exists(snapshotDir);
849    } catch (IllegalArgumentException iae) {
850      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
851    }
852  }
853
854  /**
855   * Clone the specified snapshot. The clone will fail if the destination table has a snapshot or
856   * restore in progress.
857   * @param reqSnapshot       Snapshot Descriptor from request
858   * @param tableName         table to clone
859   * @param snapshot          Snapshot Descriptor
860   * @param snapshotTableDesc Table Descriptor
861   * @param nonceKey          unique identifier to prevent duplicated RPC
862   * @return procId the ID of the clone snapshot procedure n
863   */
864  private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
865    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
866    final NonceKey nonceKey, final boolean restoreAcl, final String customSFT) throws IOException {
867    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
868    TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc);
869    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
870    if (cpHost != null) {
871      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
872      cpHost.preCloneSnapshot(snapshotPOJO, htd);
873    }
874    long procId;
875    try {
876      procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl, customSFT);
877    } catch (IOException e) {
878      LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName() + " as table "
879        + tableName.getNameAsString(), e);
880      throw e;
881    }
882    LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
883
884    if (cpHost != null) {
885      cpHost.postCloneSnapshot(snapshotPOJO, htd);
886    }
887    return procId;
888  }
889
890  /**
891   * Clone the specified snapshot into a new table. The operation will fail if the destination table
892   * has a snapshot or restore in progress.
893   * @param snapshot        Snapshot Descriptor
894   * @param tableDescriptor Table Descriptor of the table to create
895   * @param nonceKey        unique identifier to prevent duplicated RPC
896   * @return procId the ID of the clone snapshot procedure
897   */
898  synchronized long cloneSnapshot(final SnapshotDescription snapshot,
899    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl,
900    final String customSFT) throws HBaseSnapshotException {
901    TableName tableName = tableDescriptor.getTableName();
902
903    // make sure we aren't running a snapshot on the same table
904    if (isTableTakingAnySnapshot(tableName)) {
905      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
906    }
907
908    // make sure we aren't running a restore on the same table
909    if (isRestoringTable(tableName)) {
910      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
911    }
912
913    try {
914      long procId = master.getMasterProcedureExecutor().submitProcedure(
915        new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
916          tableDescriptor, snapshot, restoreAcl, customSFT),
917        nonceKey);
918      this.restoreTableToProcIdMap.put(tableName, procId);
919      return procId;
920    } catch (Exception e) {
921      String msg = "Couldn't clone the snapshot="
922        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
923      LOG.error(msg, e);
924      throw new RestoreSnapshotException(msg, e);
925    }
926  }
927
928  /**
929   * Restore or Clone the specified snapshot n * @param nonceKey unique identifier to prevent
930   * duplicated RPC n
931   */
932  public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey,
933    final boolean restoreAcl, String customSFT) throws IOException {
934    FileSystem fs = master.getMasterFileSystem().getFileSystem();
935    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
936
937    // check if the snapshot exists
938    if (!fs.exists(snapshotDir)) {
939      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
940      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(reqSnapshot));
941    }
942
943    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
944    // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
945    // information.
946    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
947    SnapshotManifest manifest =
948      SnapshotManifest.open(master.getConfiguration(), fs, snapshotDir, snapshot);
949    TableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
950    TableName tableName = TableName.valueOf(reqSnapshot.getTable());
951
952    // sanity check the new table descriptor
953    TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc);
954
955    // stop tracking "abandoned" handlers
956    cleanupSentinels();
957
958    // Verify snapshot validity
959    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
960
961    // Execute the restore/clone operation
962    long procId;
963    if (master.getTableDescriptors().exists(tableName)) {
964      procId =
965        restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl);
966    } else {
967      procId = cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey,
968        restoreAcl, customSFT);
969    }
970    return procId;
971  }
972
973  /**
974   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
975   * or restore in progress.
976   * @param reqSnapshot       Snapshot Descriptor from request
977   * @param tableName         table to restore
978   * @param snapshot          Snapshot Descriptor
979   * @param snapshotTableDesc Table Descriptor
980   * @param nonceKey          unique identifier to prevent duplicated RPC
981   * @param restoreAcl        true to restore acl of snapshot
982   * @return procId the ID of the restore snapshot procedure n
983   */
984  private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
985    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
986    final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
987    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
988
989    // have to check first if restoring the snapshot would break current SFT setup
990    StoreFileTrackerValidationUtils.validatePreRestoreSnapshot(
991      master.getTableDescriptors().get(tableName), snapshotTableDesc, master.getConfiguration());
992
993    if (
994      master.getTableStateManager().isTableState(TableName.valueOf(snapshot.getTable()),
995        TableState.State.ENABLED)
996    ) {
997      throw new UnsupportedOperationException("Table '" + TableName.valueOf(snapshot.getTable())
998        + "' must be disabled in order to " + "perform a restore operation.");
999    }
1000
1001    // call Coprocessor pre hook
1002    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
1003    if (cpHost != null) {
1004      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
1005      cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1006    }
1007
1008    long procId;
1009    try {
1010      procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1011    } catch (IOException e) {
1012      LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
1013        + " as table " + tableName.getNameAsString(), e);
1014      throw e;
1015    }
1016    LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
1017
1018    if (cpHost != null) {
1019      cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1020    }
1021
1022    return procId;
1023  }
1024
1025  /**
1026   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1027   * or restore in progress.
1028   * @param snapshot        Snapshot Descriptor
1029   * @param tableDescriptor Table Descriptor
1030   * @param nonceKey        unique identifier to prevent duplicated RPC
1031   * @param restoreAcl      true to restore acl of snapshot
1032   * @return procId the ID of the restore snapshot procedure
1033   */
1034  private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
1035    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
1036    throws HBaseSnapshotException {
1037    final TableName tableName = tableDescriptor.getTableName();
1038
1039    // make sure we aren't running a snapshot on the same table
1040    if (isTableTakingAnySnapshot(tableName)) {
1041      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
1042    }
1043
1044    // make sure we aren't running a restore on the same table
1045    if (isRestoringTable(tableName)) {
1046      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
1047    }
1048
1049    try {
1050      long procId = master.getMasterProcedureExecutor().submitProcedure(
1051        new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
1052          tableDescriptor, snapshot, restoreAcl),
1053        nonceKey);
1054      this.restoreTableToProcIdMap.put(tableName, procId);
1055      return procId;
1056    } catch (Exception e) {
1057      String msg = "Couldn't restore the snapshot="
1058        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
1059      LOG.error(msg, e);
1060      throw new RestoreSnapshotException(msg, e);
1061    }
1062  }
1063
1064  /**
1065   * Verify if the restore of the specified table is in progress.
1066   * @param tableName table under restore
1067   * @return <tt>true</tt> if there is a restore in progress of the specified table.
1068   */
1069  private synchronized boolean isRestoringTable(final TableName tableName) {
1070    Long procId = this.restoreTableToProcIdMap.get(tableName);
1071    if (procId == null) {
1072      return false;
1073    }
1074    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1075    if (procExec.isRunning() && !procExec.isFinished(procId)) {
1076      return true;
1077    } else {
1078      this.restoreTableToProcIdMap.remove(tableName);
1079      return false;
1080    }
1081  }
1082
1083  /**
1084   * Return the handler if it is currently live and has the same snapshot target name. The handler
1085   * is removed from the sentinels map if completed.
1086   * @param sentinels live handlers
1087   * @param snapshot  snapshot description
1088   * @return null if doesn't match, else a live handler.
1089   */
1090  private synchronized SnapshotSentinel removeSentinelIfFinished(
1091    final Map<TableName, SnapshotSentinel> sentinels, final SnapshotDescription snapshot) {
1092    if (!snapshot.hasTable()) {
1093      return null;
1094    }
1095
1096    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
1097    SnapshotSentinel h = sentinels.get(snapshotTable);
1098    if (h == null) {
1099      return null;
1100    }
1101
1102    if (!h.getSnapshot().getName().equals(snapshot.getName())) {
1103      // specified snapshot is to the one currently running
1104      return null;
1105    }
1106
1107    // Remove from the "in-progress" list once completed
1108    if (h.isFinished()) {
1109      sentinels.remove(snapshotTable);
1110    }
1111
1112    return h;
1113  }
1114
1115  /**
1116   * Removes "abandoned" snapshot/restore requests. As part of the HBaseAdmin snapshot/restore API
1117   * the operation status is checked until completed, and the in-progress maps are cleaned up when
1118   * the status of a completed task is requested. To avoid having sentinels staying around for long
1119   * time if something client side is failed, each operation tries to clean up the in-progress maps
1120   * sentinels finished from a long time.
1121   */
1122  private void cleanupSentinels() {
1123    cleanupSentinels(this.snapshotHandlers);
1124    cleanupCompletedRestoreInMap();
1125    cleanupCompletedSnapshotInMap();
1126  }
1127
1128  /**
1129   * Remove the sentinels that are marked as finished and the completion time has exceeded the
1130   * removal timeout.
1131   * @param sentinels map of sentinels to clean
1132   */
1133  private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
1134    long currentTime = EnvironmentEdgeManager.currentTime();
1135    long sentinelsCleanupTimeoutMillis =
1136      master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
1137        SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
1138    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
1139    while (it.hasNext()) {
1140      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
1141      SnapshotSentinel sentinel = entry.getValue();
1142      if (
1143        sentinel.isFinished()
1144          && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis
1145      ) {
1146        it.remove();
1147      }
1148    }
1149  }
1150
1151  /**
1152   * Remove the procedures that are marked as finished
1153   */
1154  private synchronized void cleanupCompletedRestoreInMap() {
1155    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1156    Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
1157    while (it.hasNext()) {
1158      Map.Entry<TableName, Long> entry = it.next();
1159      Long procId = entry.getValue();
1160      if (procExec.isRunning() && procExec.isFinished(procId)) {
1161        it.remove();
1162      }
1163    }
1164  }
1165
1166  /**
1167   * Remove the procedures that are marked as finished
1168   */
1169  private synchronized void cleanupCompletedSnapshotInMap() {
1170    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1171    Iterator<Map.Entry<SnapshotDescription, Long>> it = snapshotToProcIdMap.entrySet().iterator();
1172    while (it.hasNext()) {
1173      Map.Entry<SnapshotDescription, Long> entry = it.next();
1174      Long procId = entry.getValue();
1175      if (procExec.isRunning() && procExec.isFinished(procId)) {
1176        it.remove();
1177      }
1178    }
1179  }
1180
1181  //
1182  // Implementing Stoppable interface
1183  //
1184
1185  @Override
1186  public void stop(String why) {
1187    // short circuit
1188    if (this.stopped) return;
1189    // make sure we get stop
1190    this.stopped = true;
1191    // pass the stop onto take snapshot handlers
1192    for (SnapshotSentinel snapshotHandler : this.snapshotHandlers.values()) {
1193      snapshotHandler.cancel(why);
1194    }
1195    if (snapshotHandlerChoreCleanerTask != null) {
1196      snapshotHandlerChoreCleanerTask.cancel(true);
1197    }
1198    try {
1199      if (coordinator != null) {
1200        coordinator.close();
1201      }
1202    } catch (IOException e) {
1203      LOG.error("stop ProcedureCoordinator error", e);
1204    }
1205  }
1206
1207  @Override
1208  public boolean isStopped() {
1209    return this.stopped;
1210  }
1211
1212  /**
1213   * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1214   * Called at the beginning of snapshot() and restoreSnapshot() methods.
1215   * @throws UnsupportedOperationException if snapshot are not supported
1216   */
1217  public void checkSnapshotSupport() throws UnsupportedOperationException {
1218    if (!this.isSnapshotSupported) {
1219      throw new UnsupportedOperationException(
1220        "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '"
1221          + HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1222    }
1223  }
1224
1225  /**
1226   * Called at startup, to verify if snapshot operation is supported, and to avoid starting the
1227   * master if there're snapshots present but the cleaners needed are missing. Otherwise we can end
1228   * up with snapshot data loss.
1229   * @param conf The {@link Configuration} object to use
1230   * @param mfs  The MasterFileSystem to use
1231   * @throws IOException                   in case of file-system operation failure
1232   * @throws UnsupportedOperationException in case cleaners are missing and there're snapshot in the
1233   *                                       system
1234   */
1235  private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1236    throws IOException, UnsupportedOperationException {
1237    // Verify if snapshot is disabled by the user
1238    String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1239    boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1240    boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1241
1242    // Extract cleaners from conf
1243    Set<String> hfileCleaners = new HashSet<>();
1244    String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1245    if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1246
1247    Set<String> logCleaners = new HashSet<>();
1248    cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1249    if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1250
1251    // check if an older version of snapshot directory was present
1252    Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1253    FileSystem fs = mfs.getFileSystem();
1254    List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false);
1255    if (ss != null && !ss.isEmpty()) {
1256      LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1257      LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1258    }
1259
1260    // If the user has enabled the snapshot, we force the cleaners to be present
1261    // otherwise we still need to check if cleaners are enabled or not and verify
1262    // that there're no snapshot in the .snapshot folder.
1263    if (snapshotEnabled) {
1264      // Inject snapshot cleaners, if snapshot.enable is true
1265      hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1266      hfileCleaners.add(HFileLinkCleaner.class.getName());
1267      // If sync acl to HDFS feature is enabled, then inject the cleaner
1268      if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) {
1269        hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName());
1270      }
1271
1272      // Set cleaners conf
1273      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1274        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1275      conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1276        logCleaners.toArray(new String[logCleaners.size()]));
1277    } else {
1278      // There may be restore tables if snapshot is enabled and then disabled, so add
1279      // HFileLinkCleaner, see HBASE-26670 for more details.
1280      hfileCleaners.add(HFileLinkCleaner.class.getName());
1281      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1282        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1283      // Verify if SnapshotHFileCleaner are present
1284      snapshotEnabled = hfileCleaners.contains(SnapshotHFileCleaner.class.getName());
1285
1286      // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1287      if (snapshotEnabled) {
1288        LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " + "but the '"
1289          + HBASE_SNAPSHOT_ENABLED + "' property "
1290          + (userDisabled ? "is set to 'false'." : "is not set."));
1291      }
1292    }
1293
1294    // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1295    this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1296
1297    // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1298    // otherwise we end up with snapshot data loss.
1299    if (!snapshotEnabled) {
1300      LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1301      Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1302      if (fs.exists(snapshotDir)) {
1303        FileStatus[] snapshots = CommonFSUtils.listStatus(fs, snapshotDir,
1304          new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1305        if (snapshots != null) {
1306          LOG.error("Snapshots are present, but cleaners are not enabled.");
1307          checkSnapshotSupport();
1308        }
1309      }
1310    }
1311  }
1312
1313  @Override
1314  public void initialize(MasterServices master, MetricsMaster metricsMaster)
1315    throws KeeperException, IOException, UnsupportedOperationException {
1316    this.master = master;
1317
1318    this.rootDir = master.getMasterFileSystem().getRootDir();
1319    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1320
1321    // get the configuration for the coordinator
1322    Configuration conf = master.getConfiguration();
1323    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1324    long timeoutMillis = Math.max(
1325      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1326        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME),
1327      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1328        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1329    int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1330
1331    // setup the default procedure coordinator
1332    String name = master.getServerName().toString();
1333    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1334    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(master.getZooKeeper(),
1335      SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1336
1337    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1338    this.executorService = master.getExecutorService();
1339    this.verifyWorkerAssigner =
1340      new WorkerAssigner(master, conf.getInt("hbase.snapshot.verify.task.max", 3),
1341        new ProcedureEvent<>("snapshot-verify-worker-assigning"));
1342    restoreUnfinishedSnapshotProcedure();
1343    restoreWorkers();
1344    resetTempDir();
1345    snapshotHandlerChoreCleanerTask =
1346      scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
1347  }
1348
1349  private void restoreUnfinishedSnapshotProcedure() {
1350    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1351      .filter(p -> p instanceof SnapshotProcedure).filter(p -> !p.isFinished())
1352      .map(p -> (SnapshotProcedure) p).forEach(p -> {
1353        registerSnapshotProcedure(p.getSnapshot(), p.getProcId());
1354        LOG.info("restore unfinished snapshot procedure {}", p);
1355      });
1356  }
1357
1358  @Override
1359  public String getProcedureSignature() {
1360    return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1361  }
1362
1363  @Override
1364  public void execProcedure(ProcedureDescription desc) throws IOException {
1365    takeSnapshot(toSnapshotDescription(desc));
1366  }
1367
  /**
   * Intentionally a no-op: no permission check is performed here.
   * @param desc          procedure description (unused)
   * @param accessChecker access checker (unused)
   * @param user          requesting user (unused)
   * @throws IOException never thrown by this implementation
   */
  @Override
  public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
    throws IOException {
    // The check is done by AccessController as part of the preSnapshot coprocessor hook (legacy
    // code path). In the future, when AC is removed for good, that check should be moved here.
  }
1374
1375  @Override
1376  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1377    return isSnapshotDone(toSnapshotDescription(desc));
1378  }
1379
1380  private SnapshotDescription toSnapshotDescription(ProcedureDescription desc) throws IOException {
1381    SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1382    if (!desc.hasInstance()) {
1383      throw new IOException("Snapshot name is not defined: " + desc.toString());
1384    }
1385    String snapshotName = desc.getInstance();
1386    List<NameStringPair> props = desc.getConfigurationList();
1387    String table = null;
1388    for (NameStringPair prop : props) {
1389      if ("table".equalsIgnoreCase(prop.getName())) {
1390        table = prop.getValue();
1391      }
1392    }
1393    if (table == null) {
1394      throw new IOException("Snapshot table is not defined: " + desc.toString());
1395    }
1396    TableName tableName = TableName.valueOf(table);
1397    builder.setTable(tableName.getNameAsString());
1398    builder.setName(snapshotName);
1399    builder.setType(SnapshotDescription.Type.FLUSH);
1400    return builder.build();
1401  }
1402
1403  public void registerSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1404    snapshotToProcIdMap.put(snapshot, procId);
1405    LOG.debug("register snapshot={}, snapshot procedure id = {}",
1406      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1407  }
1408
1409  public void unregisterSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1410    snapshotToProcIdMap.remove(snapshot, procId);
1411    LOG.debug("unregister snapshot={}, snapshot procedure id = {}",
1412      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1413  }
1414
1415  public boolean snapshotProcedureEnabled() {
1416    return master.getConfiguration().getBoolean(SNAPSHOT_PROCEDURE_ENABLED,
1417      SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
1418  }
1419
1420  public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
1421    throws ProcedureSuspendedException {
1422    Optional<ServerName> worker = verifyWorkerAssigner.acquire();
1423    if (worker.isPresent()) {
1424      LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get());
1425      return worker.get();
1426    }
1427    verifyWorkerAssigner.suspend(procedure);
1428    throw new ProcedureSuspendedException();
1429  }
1430
1431  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker,
1432    MasterProcedureScheduler scheduler) {
1433    LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
1434    verifyWorkerAssigner.release(worker);
1435    verifyWorkerAssigner.wake(scheduler);
1436  }
1437
1438  private void restoreWorkers() {
1439    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1440      .filter(p -> p instanceof SnapshotVerifyProcedure).map(p -> (SnapshotVerifyProcedure) p)
1441      .filter(p -> !p.isFinished()).filter(p -> p.getServerName() != null).forEach(p -> {
1442        verifyWorkerAssigner.addUsedWorker(p.getServerName());
1443        LOG.debug("{} restores used worker {}", p, p.getServerName());
1444      });
1445  }
1446
1447  public Integer getAvailableWorker(ServerName serverName) {
1448    return verifyWorkerAssigner.getAvailableWorker(serverName);
1449  }
1450}