001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.ScheduledFuture;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.locks.ReadWriteLock;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038import java.util.stream.Collectors;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.CommonPathCapabilities;
041import org.apache.hadoop.fs.FSDataInputStream;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.fs.permission.AclEntry;
046import org.apache.hadoop.fs.permission.AclStatus;
047import org.apache.hadoop.hbase.HBaseInterfaceAudience;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.Stoppable;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.client.TableState;
055import org.apache.hadoop.hbase.errorhandling.ForeignException;
056import org.apache.hadoop.hbase.executor.ExecutorService;
057import org.apache.hadoop.hbase.ipc.RpcServer;
058import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
059import org.apache.hadoop.hbase.master.MasterFileSystem;
060import org.apache.hadoop.hbase.master.MasterServices;
061import org.apache.hadoop.hbase.master.MetricsMaster;
062import org.apache.hadoop.hbase.master.SnapshotSentinel;
063import org.apache.hadoop.hbase.master.WorkerAssigner;
064import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
065import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
066import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
067import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
068import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
069import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
070import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
071import org.apache.hadoop.hbase.master.procedure.SnapshotVerifyProcedure;
072import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
073import org.apache.hadoop.hbase.procedure.Procedure;
074import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
075import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
076import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
077import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
078import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
079import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
080import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
081import org.apache.hadoop.hbase.security.AccessDeniedException;
082import org.apache.hadoop.hbase.security.User;
083import org.apache.hadoop.hbase.security.access.AccessChecker;
084import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner;
085import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper;
086import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
087import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
088import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
089import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
090import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
091import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
092import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
093import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
094import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
095import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
096import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
097import org.apache.hadoop.hbase.util.CommonFSUtils;
098import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
099import org.apache.hadoop.hbase.util.NonceKey;
100import org.apache.hadoop.hbase.util.TableDescriptorChecker;
101import org.apache.yetus.audience.InterfaceAudience;
102import org.apache.yetus.audience.InterfaceStability;
103import org.apache.zookeeper.KeeperException;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
108
109import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
114
115/**
116 * This class manages the procedure of taking and restoring snapshots. There is only one
117 * SnapshotManager for the master.
118 * <p>
119 * The class provides methods for monitoring in-progress snapshot actions.
120 * <p>
121 * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
122 * simplification in the current implementation.
123 */
124@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
125@InterfaceStability.Unstable
126public class SnapshotManager extends MasterProcedureManager implements Stoppable {
127  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
128
129  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
130  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
131
132  /**
133   * Wait time before removing a finished sentinel from the in-progress map NOTE: This is used as a
134   * safety auto cleanup. The snapshot and restore handlers map entries are removed when a user asks
135   * if a snapshot or restore is completed. This operation is part of the HBaseAdmin
136   * snapshot/restore API flow. In case something fails on the client side and the snapshot/restore
137   * state is not reclaimed after a default timeout, the entry is removed from the in-progress map.
138   * At this point, if the user asks for the snapshot/restore status, the result will be snapshot
139   * done if it exists or failed if it doesn't exist.
140   */
141  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
142    "hbase.snapshot.sentinels.cleanup.timeoutMillis";
143  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;
144
145  /** Enable or disable snapshot support */
146  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
147
148  /**
149   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for completion.
150   */
151  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
152
153  /** Name of the operation to use in the controller */
154  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
155
156  /** Conf key for # of threads used by the SnapshotManager thread pool */
157  public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
158
159  /** Default number of threads used by the SnapshotManager thread pool */
160  public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
161
162  /** Conf key for preserving original max file size configs */
163  public static final String SNAPSHOT_MAX_FILE_SIZE_PRESERVE =
164    "hbase.snapshot.max.filesize.preserve";
165
166  /** Enable or disable snapshot procedure */
167  public static final String SNAPSHOT_PROCEDURE_ENABLED = "hbase.snapshot.procedure.enabled";
168
169  public static final boolean SNAPSHOT_PROCEDURE_ENABLED_DEFAULT = true;
170
171  private boolean stopped;
172  private MasterServices master; // Needed by TableEventHandlers
173  private ProcedureCoordinator coordinator;
174
175  // Is snapshot feature enabled?
176  private boolean isSnapshotSupported = false;
177
178  // Snapshot handlers map, with table name as key.
179  // The map is always accessed and modified under the object lock using synchronized.
180  // snapshotTable() will insert an Handler in the table.
181  // isSnapshotDone() will remove the handler requested if the operation is finished.
182  private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
183  private final ScheduledExecutorService scheduleThreadPool =
184    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
185      .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
186  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;
187
188  // Restore map, with table name as key, procedure ID as value.
189  // The map is always accessed and modified under the object lock using synchronized.
190  // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
191  //
192  // TODO: just as the Apache HBase 1.x implementation, this map would not survive master
193  // restart/failover. This is just a stopgap implementation until implementation of taking
194  // snapshot using Procedure-V2.
195  private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>();
196
197  // SnapshotDescription -> SnapshotProcId
198  private final ConcurrentHashMap<SnapshotDescription, Long> snapshotToProcIdMap =
199    new ConcurrentHashMap<>();
200
201  private WorkerAssigner verifyWorkerAssigner;
202
203  private Path rootDir;
204  private ExecutorService executorService;
205
206  /**
207   * Read write lock between taking snapshot and snapshot HFile cleaner. The cleaner should skip to
208   * check the HFiles if any snapshot is in progress, otherwise it may clean a HFile which would
209   * belongs to the newly creating snapshot. So we should grab the write lock first when cleaner
210   * start to work. (See HBASE-21387)
211   */
212  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
213
/**
 * Creates a manager without any collaborators. Nothing is configured here: {@code master},
 * {@code coordinator}, {@code executorService} and {@code rootDir} keep their default values.
 */
public SnapshotManager() {
  super();
}
216
217  /**
218   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
219   * @param master      services for the master where the manager is running
220   * @param coordinator procedure coordinator instance. exposed for testing.
221   * @param pool        HBase ExecutorServcie instance, exposed for testing.
222   */
223  @InterfaceAudience.Private
224  SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
225    ExecutorService pool, int sentinelCleanInterval)
226    throws IOException, UnsupportedOperationException {
227    this.master = master;
228
229    this.rootDir = master.getMasterFileSystem().getRootDir();
230    Configuration conf = master.getConfiguration();
231    checkSnapshotSupport(conf, master.getMasterFileSystem());
232
233    this.coordinator = coordinator;
234    this.executorService = pool;
235    resetTempDir();
236    snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
237      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
238  }
239
240  /**
241   * Gets the list of all completed snapshots.
242   * @return list of SnapshotDescriptions
243   * @throws IOException File system exception
244   */
245  public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
246    return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true);
247  }
248
249  /**
250   * Gets the list of all completed snapshots.
251   * @param snapshotDir snapshot directory
252   * @param withCpCall  Whether to call CP hooks
253   * @return list of SnapshotDescriptions
254   * @throws IOException File system exception
255   */
256  private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall)
257    throws IOException {
258    List<SnapshotDescription> snapshotDescs = new ArrayList<>();
259    // first create the snapshot root path and check to see if it exists
260    FileSystem fs = master.getMasterFileSystem().getFileSystem();
261    if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
262
263    // if there are no snapshots, return an empty list
264    if (!fs.exists(snapshotDir)) {
265      return snapshotDescs;
266    }
267
268    // ignore all the snapshots in progress
269    FileStatus[] snapshots = fs.listStatus(snapshotDir,
270      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
271    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
272    withCpCall = withCpCall && cpHost != null;
273    // loop through all the completed snapshots
274    for (FileStatus snapshot : snapshots) {
275      Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
276      // if the snapshot is bad
277      if (!fs.exists(info)) {
278        LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
279        continue;
280      }
281      FSDataInputStream in = null;
282      try {
283        in = fs.open(info);
284        SnapshotDescription desc = SnapshotDescription.parseFrom(in);
285        org.apache.hadoop.hbase.client.SnapshotDescription descPOJO =
286          (withCpCall) ? ProtobufUtil.createSnapshotDesc(desc) : null;
287        if (withCpCall) {
288          try {
289            cpHost.preListSnapshot(descPOJO);
290          } catch (AccessDeniedException e) {
291            LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
292              + "Either you should be owner of this snapshot or admin user.");
293            // Skip this and try for next snapshot
294            continue;
295          }
296        }
297        snapshotDescs.add(desc);
298
299        // call coproc post hook
300        if (withCpCall) {
301          cpHost.postListSnapshot(descPOJO);
302        }
303      } catch (IOException e) {
304        LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
305      } finally {
306        if (in != null) {
307          in.close();
308        }
309      }
310    }
311    return snapshotDescs;
312  }
313
314  /**
315   * Cleans up any zk-coordinated snapshots in the snapshot/.tmp directory that were left from
316   * failed snapshot attempts. For unfinished procedure2-coordinated snapshots, keep the working
317   * directory.
318   * @throws IOException if we can't reach the filesystem
319   */
320  private void resetTempDir() throws IOException {
321    Set<String> workingProcedureCoordinatedSnapshotNames =
322      snapshotToProcIdMap.keySet().stream().map(s -> s.getName()).collect(Collectors.toSet());
323
324    Path tmpdir =
325      SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, master.getConfiguration());
326    FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration());
327    FileStatus[] workingSnapshotDirs = CommonFSUtils.listStatus(tmpFs, tmpdir);
328    if (workingSnapshotDirs == null) {
329      return;
330    }
331    for (FileStatus workingSnapshotDir : workingSnapshotDirs) {
332      String workingSnapshotName = workingSnapshotDir.getPath().getName();
333      if (!workingProcedureCoordinatedSnapshotNames.contains(workingSnapshotName)) {
334        try {
335          if (tmpFs.delete(workingSnapshotDir.getPath(), true)) {
336            LOG.info("delete unfinished zk-coordinated snapshot working directory {}",
337              workingSnapshotDir.getPath());
338          } else {
339            LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
340              workingSnapshotDir.getPath());
341          }
342        } catch (IOException e) {
343          LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
344            workingSnapshotDir.getPath(), e);
345        }
346      } else {
347        LOG.debug("find working directory of unfinished procedure {}", workingSnapshotName);
348      }
349    }
350  }
351
352  /**
353   * Delete the specified snapshot
354   * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
355   * @throws IOException                   For filesystem IOExceptions
356   */
357  public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
358    // check to see if it is completed
359    if (!isSnapshotCompleted(snapshot)) {
360      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
361    }
362
363    String snapshotName = snapshot.getName();
364    // first create the snapshot description and check to see if it exists
365    FileSystem fs = master.getMasterFileSystem().getFileSystem();
366    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
367    // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
368    // just the "name" and it does not contains the "real" snapshot information
369    snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
370
371    // call coproc pre hook
372    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
373    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
374    if (cpHost != null) {
375      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
376      cpHost.preDeleteSnapshot(snapshotPOJO);
377    }
378
379    LOG.debug("Deleting snapshot: " + snapshotName);
380    // delete the existing snapshot
381    if (!fs.delete(snapshotDir, true)) {
382      throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
383    }
384
385    // call coproc post hook
386    if (cpHost != null) {
387      cpHost.postDeleteSnapshot(snapshotPOJO);
388    }
389
390  }
391
392  /**
393   * Check if the specified snapshot is done
394   * @return true if snapshot is ready to be restored, false if it is still being taken.
395   * @throws IOException              IOException if error from HDFS or RPC
396   * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
397   */
398  public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
399    // check the request to make sure it has a snapshot
400    if (expected == null) {
401      throw new UnknownSnapshotException(
402        "No snapshot name passed in request, can't figure out which snapshot you want to check.");
403    }
404
405    Long procId = snapshotToProcIdMap.get(expected);
406    if (procId != null) {
407      if (master.getMasterProcedureExecutor().isRunning()) {
408        return master.getMasterProcedureExecutor().isFinished(procId);
409      } else {
410        return false;
411      }
412    }
413
414    String ssString = ClientSnapshotDescriptionUtils.toString(expected);
415
416    // check to see if the sentinel exists,
417    // and if the task is complete removes it from the in-progress snapshots map.
418    SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
419
420    // stop tracking "abandoned" handlers
421    cleanupSentinels();
422
423    if (handler == null) {
424      // If there's no handler in the in-progress map, it means one of the following:
425      // - someone has already requested the snapshot state
426      // - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
427      // - the snapshot was never requested
428      // In those cases returns to the user the "done state" if the snapshots exists on disk,
429      // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
430      if (!isSnapshotCompleted(expected)) {
431        throw new UnknownSnapshotException("Snapshot " + ssString
432          + " is not currently running or one of the known completed snapshots.");
433      }
434      // was done, return true;
435      return true;
436    }
437
438    // pass on any failure we find in the sentinel
439    try {
440      handler.rethrowExceptionIfFailed();
441    } catch (ForeignException e) {
442      // Give some procedure info on an exception.
443      String status;
444      Procedure p = coordinator.getProcedure(expected.getName());
445      if (p != null) {
446        status = p.getStatus();
447      } else {
448        status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
449      }
450      throw new HBaseSnapshotException("Snapshot " + ssString + " had an error.  " + status, e,
451        ProtobufUtil.createSnapshotDesc(expected));
452    }
453
454    // check to see if we are done
455    if (handler.isFinished()) {
456      LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
457      return true;
458    } else if (LOG.isDebugEnabled()) {
459      LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
460    }
461    return false;
462  }
463
464  /**
465   * Check to see if there is a snapshot in progress with the same name or on the same table.
466   * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
467   * don't allow snapshot with the same name.
468   * @param snapshot   description of the snapshot being checked.
469   * @param checkTable check if the table is already taking a snapshot.
470   * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
471   *         table.
472   */
473  synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot, boolean checkTable) {
474    if (checkTable) {
475      TableName snapshotTable = TableName.valueOf(snapshot.getTable());
476      if (isTakingSnapshot(snapshotTable)) {
477        return true;
478      }
479    }
480    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = snapshotHandlers.entrySet().iterator();
481    while (it.hasNext()) {
482      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
483      SnapshotSentinel sentinel = entry.getValue();
484      if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
485        return true;
486      }
487    }
488    Iterator<Map.Entry<SnapshotDescription, Long>> spIt = snapshotToProcIdMap.entrySet().iterator();
489    while (spIt.hasNext()) {
490      Map.Entry<SnapshotDescription, Long> entry = spIt.next();
491      if (
492        snapshot.getName().equals(entry.getKey().getName())
493          && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
494      ) {
495        return true;
496      }
497    }
498    return false;
499  }
500
501  /**
502   * Check to see if the specified table has a snapshot in progress. Currently we have a limitation
503   * only allowing a single snapshot per table at a time.
504   * @param tableName name of the table being snapshotted.
505   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
506   */
507  public boolean isTakingSnapshot(final TableName tableName) {
508    return isTakingSnapshot(tableName, false);
509  }
510
511  public boolean isTableTakingAnySnapshot(final TableName tableName) {
512    return isTakingSnapshot(tableName, true);
513  }
514
515  /**
516   * Check to see if the specified table has a snapshot in progress. Since we introduce the
517   * SnapshotProcedure, it is a little bit different from before. For zk-coordinated snapshot, we
518   * can just consider tables in snapshotHandlers only, but for
519   * {@link org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure} and
520   * {@link org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure}, we need to
521   * consider tables in snapshotToProcIdMap also, for the snapshot procedure, we don't need to check
522   * if table in snapshot.
523   * @param tableName      name of the table being snapshotted.
524   * @param checkProcedure true if we should check tables in snapshotToProcIdMap
525   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
526   */
527  private synchronized boolean isTakingSnapshot(TableName tableName, boolean checkProcedure) {
528    SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
529    if (handler != null && !handler.isFinished()) {
530      return true;
531    }
532    if (checkProcedure) {
533      for (Map.Entry<SnapshotDescription, Long> entry : snapshotToProcIdMap.entrySet()) {
534        if (
535          TableName.valueOf(entry.getKey().getTable()).equals(tableName)
536            && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
537        ) {
538          return true;
539        }
540      }
541    }
542    return false;
543  }
544
545  /**
546   * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
547   * aren't already running a snapshot or restore on the requested table.
548   * @param snapshot description of the snapshot we want to start
549   * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
550   */
551  public synchronized void prepareWorkingDirectory(SnapshotDescription snapshot)
552    throws HBaseSnapshotException {
553    Path workingDir =
554      SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, master.getConfiguration());
555
556    try {
557      FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration());
558      // delete the working directory, since we aren't running the snapshot. Likely leftovers
559      // from a failed attempt.
560      workingDirFS.delete(workingDir, true);
561
562      // recreate the working directory for the snapshot
563      if (!workingDirFS.mkdirs(workingDir)) {
564        throw new SnapshotCreationException(
565          "Couldn't create working directory (" + workingDir + ") for snapshot",
566          ProtobufUtil.createSnapshotDesc(snapshot));
567      }
568      updateWorkingDirAclsIfRequired(workingDir, workingDirFS);
569    } catch (HBaseSnapshotException e) {
570      throw e;
571    } catch (IOException e) {
572      throw new SnapshotCreationException(
573        "Exception while checking to see if snapshot could be started.", e,
574        ProtobufUtil.createSnapshotDesc(snapshot));
575    }
576  }
577
578  /**
579   * If the parent dir of the snapshot working dir (e.g. /hbase/.hbase-snapshot) has non-empty ACLs,
580   * use them for the current working dir (e.g. /hbase/.hbase-snapshot/.tmp/{snapshot-name}) so that
581   * regardless of whether the snapshot commit phase performs atomic rename or non-atomic copy of
582   * the working dir to new snapshot dir, the ACLs are retained.
583   * @param workingDir   working dir to build the snapshot.
584   * @param workingDirFS working dir file system.
585   * @throws IOException If ACL read/modify operation fails.
586   */
587  private static void updateWorkingDirAclsIfRequired(Path workingDir, FileSystem workingDirFS)
588    throws IOException {
589    if (
590      !workingDirFS.hasPathCapability(workingDir, CommonPathCapabilities.FS_ACLS)
591        || workingDir.getParent() == null || workingDir.getParent().getParent() == null
592    ) {
593      return;
594    }
595    AclStatus snapshotWorkingParentDirStatus;
596    try {
597      snapshotWorkingParentDirStatus =
598        workingDirFS.getAclStatus(workingDir.getParent().getParent());
599    } catch (IOException e) {
600      LOG.warn("Unable to retrieve ACL status for path: {}, current working dir path: {}",
601        workingDir.getParent().getParent(), workingDir, e);
602      return;
603    }
604    List<AclEntry> snapshotWorkingParentDirAclStatusEntries =
605      snapshotWorkingParentDirStatus.getEntries();
606    if (
607      snapshotWorkingParentDirAclStatusEntries != null
608        && snapshotWorkingParentDirAclStatusEntries.size() > 0
609    ) {
610      workingDirFS.modifyAclEntries(workingDir, snapshotWorkingParentDirAclStatusEntries);
611    }
612  }
613
614  /**
615   * Take a snapshot of a disabled table.
616   * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
617   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
618   *                     directory could not be determined
619   */
620  private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) throws IOException {
621    // setup the snapshot
622    prepareWorkingDirectory(snapshot);
623
624    // set the snapshot to be a disabled snapshot, since the client doesn't know about that
625    snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
626
627    // Take the snapshot of the disabled table
628    DisabledTableSnapshotHandler handler = new DisabledTableSnapshotHandler(snapshot, master, this);
629    snapshotTable(snapshot, handler);
630  }
631
632  /**
633   * Take a snapshot of an enabled table.
634   * @param snapshot description of the snapshot to take.
635   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
636   *                     directory could not be determined
637   */
638  private synchronized void snapshotEnabledTable(SnapshotDescription snapshot) throws IOException {
639    // setup the snapshot
640    prepareWorkingDirectory(snapshot);
641
642    // Take the snapshot of the enabled table
643    EnabledTableSnapshotHandler handler = new EnabledTableSnapshotHandler(snapshot, master, this);
644    snapshotTable(snapshot, handler);
645  }
646
647  /**
648   * Take a snapshot using the specified handler. On failure the snapshot temporary working
649   * directory is removed. NOTE: prepareToTakeSnapshot() called before this one takes care of the
650   * rejecting the snapshot request if the table is busy with another snapshot/restore operation.
651   * @param snapshot the snapshot description
652   * @param handler  the snapshot handler
653   */
654  private synchronized void snapshotTable(SnapshotDescription snapshot,
655    final TakeSnapshotHandler handler) throws IOException {
656    try {
657      handler.prepare();
658      this.executorService.submit(handler);
659      this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
660    } catch (Exception e) {
661      // cleanup the working directory by trying to delete it from the fs.
662      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
663        master.getConfiguration());
664      FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration());
665      try {
666        if (!workingDirFs.delete(workingDir, true)) {
667          LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
668            + ClientSnapshotDescriptionUtils.toString(snapshot));
669        }
670      } catch (IOException e1) {
671        LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
672          + ClientSnapshotDescriptionUtils.toString(snapshot));
673      }
674      // fail the snapshot
675      throw new SnapshotCreationException("Could not build snapshot handler", e,
676        ProtobufUtil.createSnapshotDesc(snapshot));
677    }
678  }
679
680  public ReadWriteLock getTakingSnapshotLock() {
681    return this.takingSnapshotLock;
682  }
683
684  /**
685   * The snapshot operation processing as following: <br>
686   * 1. Create a Snapshot Handler, and do some initialization; <br>
687   * 2. Put the handler into snapshotHandlers <br>
688   * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
689   * and snapshotHandlers;
690   * @return true to indicate that there're some running snapshots.
691   */
692  public synchronized boolean isTakingAnySnapshot() {
693    return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0
694      || this.snapshotToProcIdMap.size() > 0;
695  }
696
697  /**
698   * Take a snapshot based on the enabled/disabled state of the table.
699   * @throws HBaseSnapshotException when a snapshot specific exception occurs.
700   * @throws IOException            when some sort of generic IO exception occurs.
701   */
702  public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
703    this.takingSnapshotLock.readLock().lock();
704    try {
705      takeSnapshotInternal(snapshot);
706    } finally {
707      this.takingSnapshotLock.readLock().unlock();
708    }
709  }
710
711  public long takeSnapshot(SnapshotDescription snapshot, long nonceGroup, long nonce)
712    throws IOException {
713    this.takingSnapshotLock.readLock().lock();
714    try {
715      return submitSnapshotProcedure(snapshot, nonceGroup, nonce);
716    } finally {
717      this.takingSnapshotLock.readLock().unlock();
718    }
719  }
720
721  private synchronized long submitSnapshotProcedure(SnapshotDescription snapshot, long nonceGroup,
722    long nonce) throws IOException {
723    return MasterProcedureUtil
724      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(master, nonceGroup, nonce) {
725        @Override
726        protected void run() throws IOException {
727          TableDescriptor tableDescriptor =
728            master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable()));
729          MasterCoprocessorHost cpHost = getMaster().getMasterCoprocessorHost();
730          User user = RpcServer.getRequestUser().orElse(null);
731          org.apache.hadoop.hbase.client.SnapshotDescription snapshotDesc =
732            ProtobufUtil.createSnapshotDesc(snapshot);
733
734          if (cpHost != null) {
735            cpHost.preSnapshot(snapshotDesc, tableDescriptor, user);
736          }
737
738          sanityCheckBeforeSnapshot(snapshot, false);
739
740          long procId = submitProcedure(new SnapshotProcedure(
741            getMaster().getMasterProcedureExecutor().getEnvironment(), snapshot));
742
743          getMaster().getSnapshotManager().registerSnapshotProcedure(snapshot, procId);
744
745          if (cpHost != null) {
746            cpHost.postSnapshot(snapshotDesc, tableDescriptor, user);
747          }
748        }
749
750        @Override
751        protected String getDescription() {
752          return "SnapshotProcedure";
753        }
754      });
755  }
756
757  private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
758    TableDescriptor desc = sanityCheckBeforeSnapshot(snapshot, true);
759
760    // call pre coproc hook
761    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
762    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
763    if (cpHost != null) {
764      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
765      cpHost.preSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
766    }
767
768    // if the table is enabled, then have the RS run actually the snapshot work
769    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
770    if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.ENABLED)) {
771      if (LOG.isDebugEnabled()) {
772        LOG.debug("Table enabled, starting distributed snapshots for {}",
773          ClientSnapshotDescriptionUtils.toString(snapshot));
774      }
775      snapshotEnabledTable(snapshot);
776      if (LOG.isDebugEnabled()) {
777        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
778      }
779    }
780    // For disabled table, snapshot is created by the master
781    else if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.DISABLED)) {
782      if (LOG.isDebugEnabled()) {
783        LOG.debug("Table is disabled, running snapshot entirely on master for {}",
784          ClientSnapshotDescriptionUtils.toString(snapshot));
785      }
786      snapshotDisabledTable(snapshot);
787      if (LOG.isDebugEnabled()) {
788        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
789      }
790    } else {
791      LOG.error("Can't snapshot table '" + snapshot.getTable()
792        + "', isn't open or closed, we don't know what to do!");
793      TablePartiallyOpenException tpoe =
794        new TablePartiallyOpenException(snapshot.getTable() + " isn't fully open.");
795      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
796        ProtobufUtil.createSnapshotDesc(snapshot));
797    }
798
799    // call post coproc hook
800    if (cpHost != null) {
801      cpHost.postSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
802    }
803  }
804
805  /**
806   * Check if the snapshot can be taken. Currently we have some limitations, for zk-coordinated
807   * snapshot, we don't allow snapshot with same name or taking multiple snapshots of a table at the
808   * same time, for procedure-coordinated snapshot, we don't allow snapshot with same name.
809   * @param snapshot   description of the snapshot being checked.
810   * @param checkTable check if the table is already taking a snapshot. For zk-coordinated snapshot,
811   *                   we need to check if another zk-coordinated snapshot is in progress, for the
812   *                   snapshot procedure, this is unnecessary.
813   * @return the table descriptor of the table
814   */
815  private synchronized TableDescriptor sanityCheckBeforeSnapshot(SnapshotDescription snapshot,
816    boolean checkTable) throws IOException {
817    // check to see if we already completed the snapshot
818    if (isSnapshotCompleted(snapshot)) {
819      throw new SnapshotExistsException(
820        "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
821        ProtobufUtil.createSnapshotDesc(snapshot));
822    }
823    LOG.debug("No existing snapshot, attempting snapshot...");
824
825    // stop tracking "abandoned" handlers
826    cleanupSentinels();
827
828    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
829    // make sure we aren't already running a snapshot
830    if (isTakingSnapshot(snapshot, checkTable)) {
831      throw new SnapshotCreationException(
832        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
833          + " because we are already running another snapshot"
834          + " on the same table or with the same name");
835    }
836
837    // make sure we aren't running a restore on the same table
838    if (isRestoringTable(snapshotTable)) {
839      throw new SnapshotCreationException(
840        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
841          + " because we are already have a restore in progress on the same snapshot.");
842    }
843
844    // check to see if the table exists
845    TableDescriptor desc = null;
846    try {
847      desc = master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable()));
848    } catch (FileNotFoundException e) {
849      String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
850      LOG.error(msg);
851      throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
852    } catch (IOException e) {
853      throw new SnapshotCreationException(
854        "Error while geting table description for table " + snapshot.getTable(), e,
855        ProtobufUtil.createSnapshotDesc(snapshot));
856    }
857    if (desc == null) {
858      throw new SnapshotCreationException(
859        "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
860        ProtobufUtil.createSnapshotDesc(snapshot));
861    }
862    return desc;
863  }
864
865  /**
866   * Set the handler for the current snapshot
867   * <p>
868   * Exposed for TESTING
869   * @param handler handler the master should use TODO get rid of this if possible, repackaging,
870   *                modify tests.
871   */
872  public synchronized void setSnapshotHandlerForTesting(final TableName tableName,
873    final SnapshotSentinel handler) {
874    if (handler != null) {
875      this.snapshotHandlers.put(tableName, handler);
876    } else {
877      this.snapshotHandlers.remove(tableName);
878    }
879  }
880
881  /** Returns distributed commit coordinator for all running snapshots */
882  ProcedureCoordinator getCoordinator() {
883    return coordinator;
884  }
885
886  /**
887   * Check to see if the snapshot is one of the currently completed snapshots Returns true if the
888   * snapshot exists in the "completed snapshots folder".
889   * @param snapshot expected snapshot to check
890   * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
891   *         not stored
892   * @throws IOException              if the filesystem throws an unexpected exception,
893   * @throws IllegalArgumentException if snapshot name is invalid.
894   */
895  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
896    try {
897      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
898      FileSystem fs = master.getMasterFileSystem().getFileSystem();
899      // check to see if the snapshot already exists
900      return fs.exists(snapshotDir);
901    } catch (IllegalArgumentException iae) {
902      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
903    }
904  }
905
906  /**
907   * Clone the specified snapshot. The clone will fail if the destination table has a snapshot or
908   * restore in progress.
909   * @param reqSnapshot       Snapshot Descriptor from request
910   * @param tableName         table to clone
911   * @param snapshot          Snapshot Descriptor
912   * @param snapshotTableDesc Table Descriptor
913   * @param nonceKey          unique identifier to prevent duplicated RPC
914   * @return procId the ID of the clone snapshot procedure
915   */
916  private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
917    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
918    final NonceKey nonceKey, final boolean restoreAcl, final String customSFT) throws IOException {
919    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
920    TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc);
921    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
922    if (cpHost != null) {
923      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
924      cpHost.preCloneSnapshot(snapshotPOJO, htd);
925    }
926    long procId;
927    try {
928      procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl, customSFT);
929    } catch (IOException e) {
930      LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName() + " as table "
931        + tableName.getNameAsString(), e);
932      throw e;
933    }
934    LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
935
936    if (cpHost != null) {
937      cpHost.postCloneSnapshot(snapshotPOJO, htd);
938    }
939    return procId;
940  }
941
942  /**
943   * Clone the specified snapshot into a new table. The operation will fail if the destination table
944   * has a snapshot or restore in progress.
945   * @param snapshot        Snapshot Descriptor
946   * @param tableDescriptor Table Descriptor of the table to create
947   * @param nonceKey        unique identifier to prevent duplicated RPC
948   * @return procId the ID of the clone snapshot procedure
949   */
950  synchronized long cloneSnapshot(final SnapshotDescription snapshot,
951    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl,
952    final String customSFT) throws HBaseSnapshotException {
953    TableName tableName = tableDescriptor.getTableName();
954
955    // make sure we aren't running a snapshot on the same table
956    if (isTableTakingAnySnapshot(tableName)) {
957      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
958    }
959
960    // make sure we aren't running a restore on the same table
961    if (isRestoringTable(tableName)) {
962      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
963    }
964
965    try {
966      long procId = master.getMasterProcedureExecutor().submitProcedure(
967        new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
968          tableDescriptor, snapshot, restoreAcl, customSFT),
969        nonceKey);
970      this.restoreTableToProcIdMap.put(tableName, procId);
971      return procId;
972    } catch (Exception e) {
973      String msg = "Couldn't clone the snapshot="
974        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
975      LOG.error(msg, e);
976      throw new RestoreSnapshotException(msg, e);
977    }
978  }
979
980  /**
981   * Restore or Clone the specified snapshot
982   * @param nonceKey unique identifier to prevent duplicated RPC
983   */
984  public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey,
985    final boolean restoreAcl, String customSFT) throws IOException {
986    FileSystem fs = master.getMasterFileSystem().getFileSystem();
987    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
988
989    // check if the snapshot exists
990    if (!fs.exists(snapshotDir)) {
991      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
992      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(reqSnapshot));
993    }
994
995    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
996    // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
997    // information.
998    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
999    SnapshotManifest manifest =
1000      SnapshotManifest.open(master.getConfiguration(), fs, snapshotDir, snapshot);
1001    TableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
1002    TableName tableName = TableName.valueOf(reqSnapshot.getTable());
1003
1004    // sanity check the new table descriptor
1005    TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc);
1006
1007    // stop tracking "abandoned" handlers
1008    cleanupSentinels();
1009
1010    // Verify snapshot validity
1011    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
1012
1013    // Execute the restore/clone operation
1014    long procId;
1015    if (master.getTableDescriptors().exists(tableName)) {
1016      procId =
1017        restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1018    } else {
1019      procId = cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey,
1020        restoreAcl, customSFT);
1021    }
1022    return procId;
1023  }
1024
1025  /**
1026   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1027   * or restore in progress.
1028   * @param reqSnapshot       Snapshot Descriptor from request
1029   * @param tableName         table to restore
1030   * @param snapshot          Snapshot Descriptor
1031   * @param snapshotTableDesc Table Descriptor
1032   * @param nonceKey          unique identifier to prevent duplicated RPC
1033   * @param restoreAcl        true to restore acl of snapshot
1034   * @return procId the ID of the restore snapshot procedure
1035   */
1036  private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
1037    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
1038    final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
1039    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
1040
1041    // have to check first if restoring the snapshot would break current SFT setup
1042    StoreFileTrackerValidationUtils.validatePreRestoreSnapshot(
1043      master.getTableDescriptors().get(tableName), snapshotTableDesc, master.getConfiguration());
1044
1045    if (
1046      master.getTableStateManager().isTableState(TableName.valueOf(snapshot.getTable()),
1047        TableState.State.ENABLED)
1048    ) {
1049      throw new UnsupportedOperationException("Table '" + TableName.valueOf(snapshot.getTable())
1050        + "' must be disabled in order to " + "perform a restore operation.");
1051    }
1052
1053    // call Coprocessor pre hook
1054    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
1055    if (cpHost != null) {
1056      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
1057      cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1058    }
1059
1060    long procId;
1061    try {
1062      procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1063    } catch (IOException e) {
1064      LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
1065        + " as table " + tableName.getNameAsString(), e);
1066      throw e;
1067    }
1068    LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
1069
1070    if (cpHost != null) {
1071      cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1072    }
1073
1074    return procId;
1075  }
1076
1077  /**
1078   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1079   * or restore in progress.
1080   * @param snapshot        Snapshot Descriptor
1081   * @param tableDescriptor Table Descriptor
1082   * @param nonceKey        unique identifier to prevent duplicated RPC
1083   * @param restoreAcl      true to restore acl of snapshot
1084   * @return procId the ID of the restore snapshot procedure
1085   */
1086  private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
1087    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
1088    throws HBaseSnapshotException {
1089    final TableName tableName = tableDescriptor.getTableName();
1090
1091    // make sure we aren't running a snapshot on the same table
1092    if (isTableTakingAnySnapshot(tableName)) {
1093      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
1094    }
1095
1096    // make sure we aren't running a restore on the same table
1097    if (isRestoringTable(tableName)) {
1098      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
1099    }
1100
1101    try {
1102      TableDescriptor oldDescriptor = master.getTableDescriptors().get(tableName);
1103      long procId = master.getMasterProcedureExecutor().submitProcedure(
1104        new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
1105          oldDescriptor, tableDescriptor, snapshot, restoreAcl),
1106        nonceKey);
1107      this.restoreTableToProcIdMap.put(tableName, procId);
1108      return procId;
1109    } catch (Exception e) {
1110      String msg = "Couldn't restore the snapshot="
1111        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
1112      LOG.error(msg, e);
1113      throw new RestoreSnapshotException(msg, e);
1114    }
1115  }
1116
1117  /**
1118   * Verify if the restore of the specified table is in progress.
1119   * @param tableName table under restore
1120   * @return <tt>true</tt> if there is a restore in progress of the specified table.
1121   */
1122  private synchronized boolean isRestoringTable(final TableName tableName) {
1123    Long procId = this.restoreTableToProcIdMap.get(tableName);
1124    if (procId == null) {
1125      return false;
1126    }
1127    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1128    if (procExec.isRunning() && !procExec.isFinished(procId)) {
1129      return true;
1130    } else {
1131      this.restoreTableToProcIdMap.remove(tableName);
1132      return false;
1133    }
1134  }
1135
1136  /**
1137   * Return the handler if it is currently live and has the same snapshot target name. The handler
1138   * is removed from the sentinels map if completed.
1139   * @param sentinels live handlers
1140   * @param snapshot  snapshot description
1141   * @return null if doesn't match, else a live handler.
1142   */
1143  private synchronized SnapshotSentinel removeSentinelIfFinished(
1144    final Map<TableName, SnapshotSentinel> sentinels, final SnapshotDescription snapshot) {
1145    if (!snapshot.hasTable()) {
1146      return null;
1147    }
1148
1149    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
1150    SnapshotSentinel h = sentinels.get(snapshotTable);
1151    if (h == null) {
1152      return null;
1153    }
1154
1155    if (!h.getSnapshot().getName().equals(snapshot.getName())) {
1156      // specified snapshot is to the one currently running
1157      return null;
1158    }
1159
1160    // Remove from the "in-progress" list once completed
1161    if (h.isFinished()) {
1162      sentinels.remove(snapshotTable);
1163    }
1164
1165    return h;
1166  }
1167
1168  /**
1169   * Removes "abandoned" snapshot/restore requests. As part of the HBaseAdmin snapshot/restore API
1170   * the operation status is checked until completed, and the in-progress maps are cleaned up when
1171   * the status of a completed task is requested. To avoid having sentinels staying around for long
1172   * time if something client side is failed, each operation tries to clean up the in-progress maps
1173   * sentinels finished from a long time.
1174   */
1175  private void cleanupSentinels() {
1176    cleanupSentinels(this.snapshotHandlers);
1177    cleanupCompletedRestoreInMap();
1178    cleanupCompletedSnapshotInMap();
1179  }
1180
1181  /**
1182   * Remove the sentinels that are marked as finished and the completion time has exceeded the
1183   * removal timeout.
1184   * @param sentinels map of sentinels to clean
1185   */
1186  private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
1187    long currentTime = EnvironmentEdgeManager.currentTime();
1188    long sentinelsCleanupTimeoutMillis =
1189      master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
1190        SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
1191    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
1192    while (it.hasNext()) {
1193      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
1194      SnapshotSentinel sentinel = entry.getValue();
1195      if (
1196        sentinel.isFinished()
1197          && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis
1198      ) {
1199        it.remove();
1200      }
1201    }
1202  }
1203
1204  /**
1205   * Remove the procedures that are marked as finished
1206   */
1207  private synchronized void cleanupCompletedRestoreInMap() {
1208    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1209    Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
1210    while (it.hasNext()) {
1211      Map.Entry<TableName, Long> entry = it.next();
1212      Long procId = entry.getValue();
1213      if (procExec.isRunning() && procExec.isFinished(procId)) {
1214        it.remove();
1215      }
1216    }
1217  }
1218
1219  /**
1220   * Remove the procedures that are marked as finished
1221   */
1222  private synchronized void cleanupCompletedSnapshotInMap() {
1223    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1224    Iterator<Map.Entry<SnapshotDescription, Long>> it = snapshotToProcIdMap.entrySet().iterator();
1225    while (it.hasNext()) {
1226      Map.Entry<SnapshotDescription, Long> entry = it.next();
1227      Long procId = entry.getValue();
1228      if (procExec.isRunning() && procExec.isFinished(procId)) {
1229        it.remove();
1230      }
1231    }
1232  }
1233
1234  //
1235  // Implementing Stoppable interface
1236  //
1237
1238  @Override
1239  public void stop(String why) {
1240    // short circuit
1241    if (this.stopped) return;
1242    // make sure we get stop
1243    this.stopped = true;
1244    // pass the stop onto take snapshot handlers
1245    for (SnapshotSentinel snapshotHandler : this.snapshotHandlers.values()) {
1246      snapshotHandler.cancel(why);
1247    }
1248    if (snapshotHandlerChoreCleanerTask != null) {
1249      snapshotHandlerChoreCleanerTask.cancel(true);
1250    }
1251    try {
1252      if (coordinator != null) {
1253        coordinator.close();
1254      }
1255    } catch (IOException e) {
1256      LOG.error("stop ProcedureCoordinator error", e);
1257    }
1258  }
1259
1260  @Override
1261  public boolean isStopped() {
1262    return this.stopped;
1263  }
1264
1265  /**
1266   * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1267   * Called at the beginning of snapshot() and restoreSnapshot() methods.
1268   * @throws UnsupportedOperationException if snapshot are not supported
1269   */
1270  public void checkSnapshotSupport() throws UnsupportedOperationException {
1271    if (!this.isSnapshotSupported) {
1272      throw new UnsupportedOperationException(
1273        "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '"
1274          + HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1275    }
1276  }
1277
1278  /**
1279   * Called at startup, to verify if snapshot operation is supported, and to avoid starting the
1280   * master if there're snapshots present but the cleaners needed are missing. Otherwise we can end
1281   * up with snapshot data loss.
1282   * @param conf The {@link Configuration} object to use
1283   * @param mfs  The MasterFileSystem to use
1284   * @throws IOException                   in case of file-system operation failure
1285   * @throws UnsupportedOperationException in case cleaners are missing and there're snapshot in the
1286   *                                       system
1287   */
1288  private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1289    throws IOException, UnsupportedOperationException {
1290    // Verify if snapshot is disabled by the user
1291    String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1292    boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1293    boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1294
1295    // Extract cleaners from conf
1296    Set<String> hfileCleaners = new HashSet<>();
1297    String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1298    if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1299
1300    Set<String> logCleaners = new HashSet<>();
1301    cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1302    if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1303
1304    // check if an older version of snapshot directory was present
1305    Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1306    FileSystem fs = mfs.getFileSystem();
1307    List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false);
1308    if (ss != null && !ss.isEmpty()) {
1309      LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1310      LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1311    }
1312
1313    // If the user has enabled the snapshot, we force the cleaners to be present
1314    // otherwise we still need to check if cleaners are enabled or not and verify
1315    // that there're no snapshot in the .snapshot folder.
1316    if (snapshotEnabled) {
1317      // Inject snapshot cleaners, if snapshot.enable is true
1318      hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1319      hfileCleaners.add(HFileLinkCleaner.class.getName());
1320      // If sync acl to HDFS feature is enabled, then inject the cleaner
1321      if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) {
1322        hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName());
1323      }
1324
1325      // Set cleaners conf
1326      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1327        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1328      conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1329        logCleaners.toArray(new String[logCleaners.size()]));
1330    } else {
1331      // There may be restore tables if snapshot is enabled and then disabled, so add
1332      // HFileLinkCleaner, see HBASE-26670 for more details.
1333      hfileCleaners.add(HFileLinkCleaner.class.getName());
1334      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1335        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1336      // Verify if SnapshotHFileCleaner are present
1337      snapshotEnabled = hfileCleaners.contains(SnapshotHFileCleaner.class.getName());
1338
1339      // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1340      if (snapshotEnabled) {
1341        LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " + "but the '"
1342          + HBASE_SNAPSHOT_ENABLED + "' property "
1343          + (userDisabled ? "is set to 'false'." : "is not set."));
1344      }
1345    }
1346
1347    // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1348    this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1349
1350    // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1351    // otherwise we end up with snapshot data loss.
1352    if (!snapshotEnabled) {
1353      LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1354      Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1355      if (fs.exists(snapshotDir)) {
1356        FileStatus[] snapshots = CommonFSUtils.listStatus(fs, snapshotDir,
1357          new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1358        if (snapshots != null) {
1359          LOG.error("Snapshots are present, but cleaners are not enabled.");
1360          checkSnapshotSupport();
1361        }
1362      }
1363    }
1364  }
1365
1366  @Override
1367  public void initialize(MasterServices master, MetricsMaster metricsMaster)
1368    throws KeeperException, IOException, UnsupportedOperationException {
1369    this.master = master;
1370
1371    this.rootDir = master.getMasterFileSystem().getRootDir();
1372    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1373
1374    // get the configuration for the coordinator
1375    Configuration conf = master.getConfiguration();
1376    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1377    long timeoutMillis = Math.max(
1378      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1379        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME),
1380      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1381        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1382    int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1383
1384    // setup the default procedure coordinator
1385    String name = master.getServerName().toString();
1386    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1387    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(master.getZooKeeper(),
1388      SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1389
1390    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1391    this.executorService = master.getExecutorService();
1392    this.verifyWorkerAssigner =
1393      new WorkerAssigner(master, conf.getInt("hbase.snapshot.verify.task.max", 3),
1394        new ProcedureEvent<>("snapshot-verify-worker-assigning"));
1395    restoreUnfinishedSnapshotProcedure();
1396    restoreWorkers();
1397    resetTempDir();
1398    snapshotHandlerChoreCleanerTask =
1399      scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
1400  }
1401
1402  private void restoreUnfinishedSnapshotProcedure() {
1403    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1404      .filter(p -> p instanceof SnapshotProcedure).filter(p -> !p.isFinished())
1405      .map(p -> (SnapshotProcedure) p).forEach(p -> {
1406        registerSnapshotProcedure(p.getSnapshot(), p.getProcId());
1407        LOG.info("restore unfinished snapshot procedure {}", p);
1408      });
1409  }
1410
1411  @Override
1412  public String getProcedureSignature() {
1413    return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1414  }
1415
1416  @Override
1417  public void execProcedure(ProcedureDescription desc) throws IOException {
1418    takeSnapshot(toSnapshotDescription(desc));
1419  }
1420
  /**
   * Intentionally performs no permission check; the method body is empty (see the note inside
   * for where the check currently happens).
   */
  @Override
  public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
    throws IOException {
    // Done by AccessController as part of preSnapshot coprocessor hook (legacy code path).
    // In future, when AC is removed for good, that check should be moved here.
  }
1427
1428  @Override
1429  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1430    return isSnapshotDone(toSnapshotDescription(desc));
1431  }
1432
1433  private SnapshotDescription toSnapshotDescription(ProcedureDescription desc) throws IOException {
1434    SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1435    if (!desc.hasInstance()) {
1436      throw new IOException("Snapshot name is not defined: " + desc.toString());
1437    }
1438    String snapshotName = desc.getInstance();
1439    List<NameStringPair> props = desc.getConfigurationList();
1440    String table = null;
1441    for (NameStringPair prop : props) {
1442      if ("table".equalsIgnoreCase(prop.getName())) {
1443        table = prop.getValue();
1444      }
1445    }
1446    if (table == null) {
1447      throw new IOException("Snapshot table is not defined: " + desc.toString());
1448    }
1449    TableName tableName = TableName.valueOf(table);
1450    builder.setTable(tableName.getNameAsString());
1451    builder.setName(snapshotName);
1452    builder.setType(SnapshotDescription.Type.FLUSH);
1453    return builder.build();
1454  }
1455
1456  public void registerSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1457    snapshotToProcIdMap.put(snapshot, procId);
1458    LOG.debug("register snapshot={}, snapshot procedure id = {}",
1459      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1460  }
1461
1462  public void unregisterSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1463    snapshotToProcIdMap.remove(snapshot, procId);
1464    LOG.debug("unregister snapshot={}, snapshot procedure id = {}",
1465      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1466  }
1467
1468  public boolean snapshotProcedureEnabled() {
1469    return master.getConfiguration().getBoolean(SNAPSHOT_PROCEDURE_ENABLED,
1470      SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
1471  }
1472
1473  public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
1474    throws ProcedureSuspendedException {
1475    ServerName worker = verifyWorkerAssigner.acquire(procedure);
1476    LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker);
1477    return worker;
1478  }
1479
1480  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) {
1481    LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
1482    verifyWorkerAssigner.release(worker);
1483  }
1484
1485  private void restoreWorkers() {
1486    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1487      .filter(p -> p instanceof SnapshotVerifyProcedure).map(p -> (SnapshotVerifyProcedure) p)
1488      .filter(p -> !p.isFinished()).filter(p -> p.getServerName() != null).forEach(p -> {
1489        verifyWorkerAssigner.addUsedWorker(p.getServerName());
1490        LOG.debug("{} restores used worker {}", p, p.getServerName());
1491      });
1492  }
1493
1494  public Integer getAvailableWorker(ServerName serverName) {
1495    return verifyWorkerAssigner.getAvailableWorker(serverName);
1496  }
1497}