001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.ScheduledFuture;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.locks.ReadWriteLock;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038import java.util.stream.Collectors;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.CommonPathCapabilities;
041import org.apache.hadoop.fs.FSDataInputStream;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.Path;
045import org.apache.hadoop.fs.permission.AclEntry;
046import org.apache.hadoop.fs.permission.AclStatus;
047import org.apache.hadoop.hbase.HBaseInterfaceAudience;
048import org.apache.hadoop.hbase.HConstants;
049import org.apache.hadoop.hbase.ServerName;
050import org.apache.hadoop.hbase.Stoppable;
051import org.apache.hadoop.hbase.TableName;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.client.TableState;
055import org.apache.hadoop.hbase.errorhandling.ForeignException;
056import org.apache.hadoop.hbase.executor.ExecutorService;
057import org.apache.hadoop.hbase.ipc.RpcServer;
058import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
059import org.apache.hadoop.hbase.master.MasterFileSystem;
060import org.apache.hadoop.hbase.master.MasterServices;
061import org.apache.hadoop.hbase.master.MetricsMaster;
062import org.apache.hadoop.hbase.master.SnapshotSentinel;
063import org.apache.hadoop.hbase.master.WorkerAssigner;
064import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
065import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
066import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
067import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
068import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
069import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
070import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
071import org.apache.hadoop.hbase.master.procedure.SnapshotVerifyProcedure;
072import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
073import org.apache.hadoop.hbase.procedure.Procedure;
074import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
075import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
076import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
077import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
078import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
079import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
080import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
081import org.apache.hadoop.hbase.security.AccessDeniedException;
082import org.apache.hadoop.hbase.security.User;
083import org.apache.hadoop.hbase.security.access.AccessChecker;
084import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner;
085import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper;
086import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
087import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
088import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
089import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
090import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
091import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
092import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
093import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
094import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
095import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
096import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
097import org.apache.hadoop.hbase.util.CommonFSUtils;
098import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
099import org.apache.hadoop.hbase.util.NonceKey;
100import org.apache.hadoop.hbase.util.TableDescriptorChecker;
101import org.apache.yetus.audience.InterfaceAudience;
102import org.apache.yetus.audience.InterfaceStability;
103import org.apache.zookeeper.KeeperException;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
108
109import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
110import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
111import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
114
115/**
116 * This class manages the procedure of taking and restoring snapshots. There is only one
117 * SnapshotManager for the master.
118 * <p>
119 * The class provides methods for monitoring in-progress snapshot actions.
120 * <p>
121 * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
122 * simplification in the current implementation.
123 */
124@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
125@InterfaceStability.Unstable
126public class SnapshotManager extends MasterProcedureManager implements Stoppable {
127  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
128
129  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
130  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
131
132  /**
133   * Wait time before removing a finished sentinel from the in-progress map NOTE: This is used as a
134   * safety auto cleanup. The snapshot and restore handlers map entries are removed when a user asks
135   * if a snapshot or restore is completed. This operation is part of the HBaseAdmin
136   * snapshot/restore API flow. In case something fails on the client side and the snapshot/restore
137   * state is not reclaimed after a default timeout, the entry is removed from the in-progress map.
138   * At this point, if the user asks for the snapshot/restore status, the result will be snapshot
139   * done if exists or failed if it doesn't exists.
140   */
141  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
142    "hbase.snapshot.sentinels.cleanup.timeoutMillis";
143  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;
144
145  /** Enable or disable snapshot support */
146  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
147
148  /**
149   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for completion.
150   */
151  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
152
153  /** Name of the operation to use in the controller */
154  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
155
156  /** Conf key for # of threads used by the SnapshotManager thread pool */
157  public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
158
159  /** number of current operations running on the master */
160  public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
161
162  /** Conf key for preserving original max file size configs */
163  public static final String SNAPSHOT_MAX_FILE_SIZE_PRESERVE =
164    "hbase.snapshot.max.filesize.preserve";
165
166  /** Enable or disable snapshot procedure */
167  public static final String SNAPSHOT_PROCEDURE_ENABLED = "hbase.snapshot.procedure.enabled";
168
169  public static final boolean SNAPSHOT_PROCEDURE_ENABLED_DEFAULT = true;
170
171  private boolean stopped;
172  private MasterServices master; // Needed by TableEventHandlers
173  private ProcedureCoordinator coordinator;
174
175  // Is snapshot feature enabled?
176  private boolean isSnapshotSupported = false;
177
178  // Snapshot handlers map, with table name as key.
179  // The map is always accessed and modified under the object lock using synchronized.
180  // snapshotTable() will insert an Handler in the table.
181  // isSnapshotDone() will remove the handler requested if the operation is finished.
182  private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
183  private final ScheduledExecutorService scheduleThreadPool =
184    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
185      .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
186  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;
187
188  // Restore map, with table name as key, procedure ID as value.
189  // The map is always accessed and modified under the object lock using synchronized.
190  // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
191  //
192  // TODO: just as the Apache HBase 1.x implementation, this map would not survive master
193  // restart/failover. This is just a stopgap implementation until implementation of taking
194  // snapshot using Procedure-V2.
195  private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>();
196
197  // SnapshotDescription -> SnapshotProcId
198  private final ConcurrentHashMap<SnapshotDescription, Long> snapshotToProcIdMap =
199    new ConcurrentHashMap<>();
200
201  private WorkerAssigner verifyWorkerAssigner;
202
203  private Path rootDir;
204  private ExecutorService executorService;
205
206  /**
207   * Read write lock between taking snapshot and snapshot HFile cleaner. The cleaner should skip to
208   * check the HFiles if any snapshot is in progress, otherwise it may clean a HFile which would
209   * belongs to the newly creating snapshot. So we should grab the write lock first when cleaner
210   * start to work. (See HBASE-21387)
211   */
212  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
213
214  public SnapshotManager() {
215  }
216
217  /**
218   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
219   * @param master      services for the master where the manager is running
220   * @param coordinator procedure coordinator instance. exposed for testing.
221   * @param pool        HBase ExecutorServcie instance, exposed for testing.
222   */
223  @InterfaceAudience.Private
224  SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
225    ExecutorService pool, int sentinelCleanInterval)
226    throws IOException, UnsupportedOperationException {
227    this.master = master;
228
229    this.rootDir = master.getMasterFileSystem().getRootDir();
230    Configuration conf = master.getConfiguration();
231    checkSnapshotSupport(conf, master.getMasterFileSystem());
232
233    this.coordinator = coordinator;
234    this.executorService = pool;
235    resetTempDir();
236    snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
237      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
238  }
239
240  /**
241   * Gets the list of all completed snapshots.
242   * @return list of SnapshotDescriptions
243   * @throws IOException File system exception
244   */
245  public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
246    return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true);
247  }
248
249  /**
250   * Gets the list of all completed snapshots.
251   * @param snapshotDir snapshot directory
252   * @param withCpCall  Whether to call CP hooks
253   * @return list of SnapshotDescriptions
254   * @throws IOException File system exception
255   */
256  private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall)
257    throws IOException {
258    List<SnapshotDescription> snapshotDescs = new ArrayList<>();
259    // first create the snapshot root path and check to see if it exists
260    FileSystem fs = master.getMasterFileSystem().getFileSystem();
261    if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
262
263    // if there are no snapshots, return an empty list
264    if (!fs.exists(snapshotDir)) {
265      return snapshotDescs;
266    }
267
268    // ignore all the snapshots in progress
269    FileStatus[] snapshots = fs.listStatus(snapshotDir,
270      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
271    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
272    withCpCall = withCpCall && cpHost != null;
273    // loop through all the completed snapshots
274    for (FileStatus snapshot : snapshots) {
275      Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
276      // if the snapshot is bad
277      if (!fs.exists(info)) {
278        LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
279        continue;
280      }
281      FSDataInputStream in = null;
282      try {
283        in = fs.open(info);
284        SnapshotDescription desc = SnapshotDescription.parseFrom(in);
285        org.apache.hadoop.hbase.client.SnapshotDescription descPOJO =
286          (withCpCall) ? ProtobufUtil.createSnapshotDesc(desc) : null;
287        if (withCpCall) {
288          try {
289            cpHost.preListSnapshot(descPOJO);
290          } catch (AccessDeniedException e) {
291            LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
292              + "Either you should be owner of this snapshot or admin user.");
293            // Skip this and try for next snapshot
294            continue;
295          }
296        }
297        snapshotDescs.add(desc);
298
299        // call coproc post hook
300        if (withCpCall) {
301          cpHost.postListSnapshot(descPOJO);
302        }
303      } catch (IOException e) {
304        LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
305      } finally {
306        if (in != null) {
307          in.close();
308        }
309      }
310    }
311    return snapshotDescs;
312  }
313
314  /**
315   * Cleans up any zk-coordinated snapshots in the snapshot/.tmp directory that were left from
316   * failed snapshot attempts. For unfinished procedure2-coordinated snapshots, keep the working
317   * directory.
318   * @throws IOException if we can't reach the filesystem
319   */
320  private void resetTempDir() throws IOException {
321    Set<String> workingProcedureCoordinatedSnapshotNames =
322      snapshotToProcIdMap.keySet().stream().map(s -> s.getName()).collect(Collectors.toSet());
323
324    Path tmpdir =
325      SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, master.getConfiguration());
326    FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration());
327    FileStatus[] workingSnapshotDirs = CommonFSUtils.listStatus(tmpFs, tmpdir);
328    if (workingSnapshotDirs == null) {
329      return;
330    }
331    for (FileStatus workingSnapshotDir : workingSnapshotDirs) {
332      String workingSnapshotName = workingSnapshotDir.getPath().getName();
333      if (!workingProcedureCoordinatedSnapshotNames.contains(workingSnapshotName)) {
334        try {
335          if (tmpFs.delete(workingSnapshotDir.getPath(), true)) {
336            LOG.info("delete unfinished zk-coordinated snapshot working directory {}",
337              workingSnapshotDir.getPath());
338          } else {
339            LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
340              workingSnapshotDir.getPath());
341          }
342        } catch (IOException e) {
343          LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
344            workingSnapshotDir.getPath(), e);
345        }
346      } else {
347        LOG.debug("find working directory of unfinished procedure {}", workingSnapshotName);
348      }
349    }
350  }
351
352  /**
353   * Delete the specified snapshot
354   * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
355   * @throws IOException                   For filesystem IOExceptions
356   */
357  public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
358    // check to see if it is completed
359    if (!isSnapshotCompleted(snapshot)) {
360      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
361    }
362
363    String snapshotName = snapshot.getName();
364    // first create the snapshot description and check to see if it exists
365    FileSystem fs = master.getMasterFileSystem().getFileSystem();
366    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
367    // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
368    // just the "name" and it does not contains the "real" snapshot information
369    snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
370
371    // call coproc pre hook
372    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
373    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
374    if (cpHost != null) {
375      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
376      cpHost.preDeleteSnapshot(snapshotPOJO);
377    }
378
379    LOG.debug("Deleting snapshot: " + snapshotName);
380    // delete the existing snapshot
381    if (!fs.delete(snapshotDir, true)) {
382      throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
383    }
384
385    // call coproc post hook
386    if (cpHost != null) {
387      cpHost.postDeleteSnapshot(snapshotPOJO);
388    }
389
390  }
391
392  /**
393   * Check if the specified snapshot is done
394   * @return true if snapshot is ready to be restored, false if it is still being taken.
395   * @throws IOException              IOException if error from HDFS or RPC
396   * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
397   */
398  public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
399    // check the request to make sure it has a snapshot
400    if (expected == null) {
401      throw new UnknownSnapshotException(
402        "No snapshot name passed in request, can't figure out which snapshot you want to check.");
403    }
404
405    Long procId = snapshotToProcIdMap.get(expected);
406    if (procId != null) {
407      if (master.getMasterProcedureExecutor().isRunning()) {
408        return master.getMasterProcedureExecutor().isFinished(procId);
409      } else {
410        return false;
411      }
412    }
413
414    String ssString = ClientSnapshotDescriptionUtils.toString(expected);
415
416    // check to see if the sentinel exists,
417    // and if the task is complete removes it from the in-progress snapshots map.
418    SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
419
420    // stop tracking "abandoned" handlers
421    cleanupSentinels();
422
423    if (handler == null) {
424      // If there's no handler in the in-progress map, it means one of the following:
425      // - someone has already requested the snapshot state
426      // - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
427      // - the snapshot was never requested
428      // In those cases returns to the user the "done state" if the snapshots exists on disk,
429      // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
430      if (!isSnapshotCompleted(expected)) {
431        throw new UnknownSnapshotException("Snapshot " + ssString
432          + " is not currently running or one of the known completed snapshots.");
433      }
434      // was done, return true;
435      return true;
436    }
437
438    // pass on any failure we find in the sentinel
439    try {
440      handler.rethrowExceptionIfFailed();
441    } catch (ForeignException e) {
442      // Give some procedure info on an exception.
443      String status;
444      Procedure p = coordinator.getProcedure(expected.getName());
445      if (p != null) {
446        status = p.getStatus();
447      } else {
448        status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
449      }
450      throw new HBaseSnapshotException("Snapshot " + ssString + " had an error.  " + status, e,
451        ProtobufUtil.createSnapshotDesc(expected));
452    }
453
454    // check to see if we are done
455    if (handler.isFinished()) {
456      LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
457      return true;
458    } else if (LOG.isDebugEnabled()) {
459      LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
460    }
461    return false;
462  }
463
464  /**
465   * Check to see if there is a snapshot in progress with the same name or on the same table.
466   * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
467   * don't allow snapshot with the same name.
468   * @param snapshot   description of the snapshot being checked.
469   * @param checkTable check if the table is already taking a snapshot.
470   * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
471   *         table.
472   */
473  synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot, boolean checkTable) {
474    if (checkTable) {
475      TableName snapshotTable = TableName.valueOf(snapshot.getTable());
476      if (isTakingSnapshot(snapshotTable)) {
477        return true;
478      }
479    }
480    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = snapshotHandlers.entrySet().iterator();
481    while (it.hasNext()) {
482      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
483      SnapshotSentinel sentinel = entry.getValue();
484      if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
485        return true;
486      }
487    }
488    Iterator<Map.Entry<SnapshotDescription, Long>> spIt = snapshotToProcIdMap.entrySet().iterator();
489    while (spIt.hasNext()) {
490      Map.Entry<SnapshotDescription, Long> entry = spIt.next();
491      if (
492        snapshot.getName().equals(entry.getKey().getName())
493          && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
494      ) {
495        return true;
496      }
497    }
498    return false;
499  }
500
501  /**
502   * Check to see if the specified table has a snapshot in progress. Currently we have a limitation
503   * only allowing a single snapshot per table at a time.
504   * @param tableName name of the table being snapshotted.
505   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
506   */
507  public boolean isTakingSnapshot(final TableName tableName) {
508    return isTakingSnapshot(tableName, false);
509  }
510
511  public boolean isTableTakingAnySnapshot(final TableName tableName) {
512    return isTakingSnapshot(tableName, true);
513  }
514
515  /**
516   * Check to see if the specified table has a snapshot in progress. Since we introduce the
517   * SnapshotProcedure, it is a little bit different from before. For zk-coordinated snapshot, we
518   * can just consider tables in snapshotHandlers only, but for
519   * {@link org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure} and
520   * {@link org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure}, we need to
521   * consider tables in snapshotToProcIdMap also, for the snapshot procedure, we don't need to check
522   * if table in snapshot.
523   * @param tableName      name of the table being snapshotted.
524   * @param checkProcedure true if we should check tables in snapshotToProcIdMap
525   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
526   */
527  private synchronized boolean isTakingSnapshot(TableName tableName, boolean checkProcedure) {
528    SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
529    if (handler != null && !handler.isFinished()) {
530      return true;
531    }
532    if (checkProcedure) {
533      for (Map.Entry<SnapshotDescription, Long> entry : snapshotToProcIdMap.entrySet()) {
534        if (
535          TableName.valueOf(entry.getKey().getTable()).equals(tableName)
536            && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
537        ) {
538          return true;
539        }
540      }
541    }
542    return false;
543  }
544
545  /**
546   * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
547   * aren't already running a snapshot or restore on the requested table.
548   * @param snapshot description of the snapshot we want to start
549   * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
550   */
551  public synchronized void prepareWorkingDirectory(SnapshotDescription snapshot)
552    throws HBaseSnapshotException {
553    Path workingDir =
554      SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, master.getConfiguration());
555
556    try {
557      FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration());
558      // delete the working directory, since we aren't running the snapshot. Likely leftovers
559      // from a failed attempt.
560      workingDirFS.delete(workingDir, true);
561
562      // recreate the working directory for the snapshot
563      if (!workingDirFS.mkdirs(workingDir)) {
564        throw new SnapshotCreationException(
565          "Couldn't create working directory (" + workingDir + ") for snapshot",
566          ProtobufUtil.createSnapshotDesc(snapshot));
567      }
568      updateWorkingDirAclsIfRequired(workingDir, workingDirFS);
569    } catch (HBaseSnapshotException e) {
570      throw e;
571    } catch (IOException e) {
572      throw new SnapshotCreationException(
573        "Exception while checking to see if snapshot could be started.", e,
574        ProtobufUtil.createSnapshotDesc(snapshot));
575    }
576  }
577
578  /**
579   * If the parent dir of the snapshot working dir (e.g. /hbase/.hbase-snapshot) has non-empty ACLs,
580   * use them for the current working dir (e.g. /hbase/.hbase-snapshot/.tmp/{snapshot-name}) so that
581   * regardless of whether the snapshot commit phase performs atomic rename or non-atomic copy of
582   * the working dir to new snapshot dir, the ACLs are retained.
583   * @param workingDir   working dir to build the snapshot.
584   * @param workingDirFS working dir file system.
585   * @throws IOException If ACL read/modify operation fails.
586   */
587  private static void updateWorkingDirAclsIfRequired(Path workingDir, FileSystem workingDirFS)
588    throws IOException {
589    if (
590      !workingDirFS.hasPathCapability(workingDir, CommonPathCapabilities.FS_ACLS)
591        || workingDir.getParent() == null || workingDir.getParent().getParent() == null
592    ) {
593      return;
594    }
595    AclStatus snapshotWorkingParentDirStatus;
596    try {
597      snapshotWorkingParentDirStatus =
598        workingDirFS.getAclStatus(workingDir.getParent().getParent());
599    } catch (IOException e) {
600      LOG.warn("Unable to retrieve ACL status for path: {}, current working dir path: {}",
601        workingDir.getParent().getParent(), workingDir, e);
602      return;
603    }
604    List<AclEntry> snapshotWorkingParentDirAclStatusEntries =
605      snapshotWorkingParentDirStatus.getEntries();
606    if (
607      snapshotWorkingParentDirAclStatusEntries != null
608        && snapshotWorkingParentDirAclStatusEntries.size() > 0
609    ) {
610      workingDirFS.modifyAclEntries(workingDir, snapshotWorkingParentDirAclStatusEntries);
611    }
612  }
613
614  /**
615   * Take a snapshot of a disabled table.
616   * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
617   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
618   *                     directory could not be determined
619   */
620  private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) throws IOException {
621    // setup the snapshot
622    prepareWorkingDirectory(snapshot);
623
624    // set the snapshot to be a disabled snapshot, since the client doesn't know about that
625    snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
626
627    // Take the snapshot of the disabled table
628    DisabledTableSnapshotHandler handler = new DisabledTableSnapshotHandler(snapshot, master, this);
629    snapshotTable(snapshot, handler);
630  }
631
632  /**
633   * Take a snapshot of an enabled table.
634   * @param snapshot description of the snapshot to take.
635   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
636   *                     directory could not be determined
637   */
638  private synchronized void snapshotEnabledTable(SnapshotDescription snapshot) throws IOException {
639    // setup the snapshot
640    prepareWorkingDirectory(snapshot);
641
642    // Take the snapshot of the enabled table
643    EnabledTableSnapshotHandler handler = new EnabledTableSnapshotHandler(snapshot, master, this);
644    snapshotTable(snapshot, handler);
645  }
646
647  /**
648   * Take a snapshot using the specified handler. On failure the snapshot temporary working
649   * directory is removed. NOTE: prepareToTakeSnapshot() called before this one takes care of the
650   * rejecting the snapshot request if the table is busy with another snapshot/restore operation.
651   * @param snapshot the snapshot description
652   * @param handler  the snapshot handler
653   */
654  private synchronized void snapshotTable(SnapshotDescription snapshot,
655    final TakeSnapshotHandler handler) throws IOException {
656    try {
657      handler.prepare();
658      this.executorService.submit(handler);
659      this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
660    } catch (Exception e) {
661      // cleanup the working directory by trying to delete it from the fs.
662      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
663        master.getConfiguration());
664      FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration());
665      try {
666        if (!workingDirFs.delete(workingDir, true)) {
667          LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
668            + ClientSnapshotDescriptionUtils.toString(snapshot));
669        }
670      } catch (IOException e1) {
671        LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
672          + ClientSnapshotDescriptionUtils.toString(snapshot));
673      }
674      // fail the snapshot
675      throw new SnapshotCreationException("Could not build snapshot handler", e,
676        ProtobufUtil.createSnapshotDesc(snapshot));
677    }
678  }
679
680  public ReadWriteLock getTakingSnapshotLock() {
681    return this.takingSnapshotLock;
682  }
683
684  /**
685   * The snapshot operation processing as following: <br>
686   * 1. Create a Snapshot Handler, and do some initialization; <br>
687   * 2. Put the handler into snapshotHandlers <br>
688   * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
689   * and snapshotHandlers;
690   * @return true to indicate that there're some running snapshots.
691   */
692  public synchronized boolean isTakingAnySnapshot() {
693    return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0
694      || this.snapshotToProcIdMap.size() > 0;
695  }
696
697  /**
698   * Take a snapshot based on the enabled/disabled state of the table.
699   * @throws HBaseSnapshotException when a snapshot specific exception occurs.
700   * @throws IOException            when some sort of generic IO exception occurs.
701   */
702  public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
703    this.takingSnapshotLock.readLock().lock();
704    try {
705      takeSnapshotInternal(snapshot);
706    } finally {
707      this.takingSnapshotLock.readLock().unlock();
708    }
709  }
710
711  public long takeSnapshot(SnapshotDescription snapshot, long nonceGroup, long nonce)
712    throws IOException {
713    this.takingSnapshotLock.readLock().lock();
714    try {
715      return submitSnapshotProcedure(snapshot, nonceGroup, nonce);
716    } finally {
717      this.takingSnapshotLock.readLock().unlock();
718    }
719  }
720
721  private synchronized long submitSnapshotProcedure(SnapshotDescription snapshot, long nonceGroup,
722    long nonce) throws IOException {
723    return MasterProcedureUtil
724      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(master, nonceGroup, nonce) {
725        @Override
726        protected void run() throws IOException {
727          TableDescriptor tableDescriptor = sanityCheckBeforeSnapshot(snapshot, false);
728          MasterCoprocessorHost cpHost = getMaster().getMasterCoprocessorHost();
729          User user = RpcServer.getRequestUser().orElse(null);
730          org.apache.hadoop.hbase.client.SnapshotDescription snapshotDesc =
731            ProtobufUtil.createSnapshotDesc(snapshot);
732
733          if (cpHost != null) {
734            cpHost.preSnapshot(snapshotDesc, tableDescriptor, user);
735          }
736
737          long procId = submitProcedure(new SnapshotProcedure(
738            getMaster().getMasterProcedureExecutor().getEnvironment(), snapshot));
739
740          getMaster().getSnapshotManager().registerSnapshotProcedure(snapshot, procId);
741
742          if (cpHost != null) {
743            cpHost.postSnapshot(snapshotDesc, tableDescriptor, user);
744          }
745        }
746
747        @Override
748        protected String getDescription() {
749          return "SnapshotProcedure";
750        }
751      });
752  }
753
754  private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
755    TableDescriptor desc = sanityCheckBeforeSnapshot(snapshot, true);
756
757    // call pre coproc hook
758    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
759    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
760    if (cpHost != null) {
761      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
762      cpHost.preSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
763    }
764
765    // if the table is enabled, then have the RS run actually the snapshot work
766    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
767    if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.ENABLED)) {
768      if (LOG.isDebugEnabled()) {
769        LOG.debug("Table enabled, starting distributed snapshots for {}",
770          ClientSnapshotDescriptionUtils.toString(snapshot));
771      }
772      snapshotEnabledTable(snapshot);
773      if (LOG.isDebugEnabled()) {
774        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
775      }
776    }
777    // For disabled table, snapshot is created by the master
778    else if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.DISABLED)) {
779      if (LOG.isDebugEnabled()) {
780        LOG.debug("Table is disabled, running snapshot entirely on master for {}",
781          ClientSnapshotDescriptionUtils.toString(snapshot));
782      }
783      snapshotDisabledTable(snapshot);
784      if (LOG.isDebugEnabled()) {
785        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
786      }
787    } else {
788      LOG.error("Can't snapshot table '" + snapshot.getTable()
789        + "', isn't open or closed, we don't know what to do!");
790      TablePartiallyOpenException tpoe =
791        new TablePartiallyOpenException(snapshot.getTable() + " isn't fully open.");
792      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
793        ProtobufUtil.createSnapshotDesc(snapshot));
794    }
795
796    // call post coproc hook
797    if (cpHost != null) {
798      cpHost.postSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
799    }
800  }
801
802  /**
803   * Check if the snapshot can be taken. Currently we have some limitations, for zk-coordinated
804   * snapshot, we don't allow snapshot with same name or taking multiple snapshots of a table at the
805   * same time, for procedure-coordinated snapshot, we don't allow snapshot with same name.
806   * @param snapshot   description of the snapshot being checked.
807   * @param checkTable check if the table is already taking a snapshot. For zk-coordinated snapshot,
808   *                   we need to check if another zk-coordinated snapshot is in progress, for the
809   *                   snapshot procedure, this is unnecessary.
810   * @return the table descriptor of the table
811   */
812  private synchronized TableDescriptor sanityCheckBeforeSnapshot(SnapshotDescription snapshot,
813    boolean checkTable) throws IOException {
814    // check to see if we already completed the snapshot
815    if (isSnapshotCompleted(snapshot)) {
816      throw new SnapshotExistsException(
817        "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
818        ProtobufUtil.createSnapshotDesc(snapshot));
819    }
820    LOG.debug("No existing snapshot, attempting snapshot...");
821
822    // stop tracking "abandoned" handlers
823    cleanupSentinels();
824
825    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
826    // make sure we aren't already running a snapshot
827    if (isTakingSnapshot(snapshot, checkTable)) {
828      throw new SnapshotCreationException(
829        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
830          + " because we are already running another snapshot"
831          + " on the same table or with the same name");
832    }
833
834    // make sure we aren't running a restore on the same table
835    if (isRestoringTable(snapshotTable)) {
836      throw new SnapshotCreationException(
837        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
838          + " because we are already have a restore in progress on the same snapshot.");
839    }
840
841    // check to see if the table exists
842    TableDescriptor desc = null;
843    try {
844      desc = master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable()));
845    } catch (FileNotFoundException e) {
846      String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
847      LOG.error(msg);
848      throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
849    } catch (IOException e) {
850      throw new SnapshotCreationException(
851        "Error while geting table description for table " + snapshot.getTable(), e,
852        ProtobufUtil.createSnapshotDesc(snapshot));
853    }
854    if (desc == null) {
855      throw new SnapshotCreationException(
856        "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
857        ProtobufUtil.createSnapshotDesc(snapshot));
858    }
859    return desc;
860  }
861
862  /**
863   * Set the handler for the current snapshot
864   * <p>
865   * Exposed for TESTING
866   * @param handler handler the master should use TODO get rid of this if possible, repackaging,
867   *                modify tests.
868   */
869  public synchronized void setSnapshotHandlerForTesting(final TableName tableName,
870    final SnapshotSentinel handler) {
871    if (handler != null) {
872      this.snapshotHandlers.put(tableName, handler);
873    } else {
874      this.snapshotHandlers.remove(tableName);
875    }
876  }
877
878  /** Returns distributed commit coordinator for all running snapshots */
879  ProcedureCoordinator getCoordinator() {
880    return coordinator;
881  }
882
883  /**
884   * Check to see if the snapshot is one of the currently completed snapshots Returns true if the
885   * snapshot exists in the "completed snapshots folder".
886   * @param snapshot expected snapshot to check
887   * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
888   *         not stored
889   * @throws IOException              if the filesystem throws an unexpected exception,
890   * @throws IllegalArgumentException if snapshot name is invalid.
891   */
892  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
893    try {
894      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
895      FileSystem fs = master.getMasterFileSystem().getFileSystem();
896      // check to see if the snapshot already exists
897      return fs.exists(snapshotDir);
898    } catch (IllegalArgumentException iae) {
899      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
900    }
901  }
902
903  /**
904   * Clone the specified snapshot. The clone will fail if the destination table has a snapshot or
905   * restore in progress.
906   * @param reqSnapshot       Snapshot Descriptor from request
907   * @param tableName         table to clone
908   * @param snapshot          Snapshot Descriptor
909   * @param snapshotTableDesc Table Descriptor
910   * @param nonceKey          unique identifier to prevent duplicated RPC
911   * @return procId the ID of the clone snapshot procedure
912   */
913  private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
914    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
915    final NonceKey nonceKey, final boolean restoreAcl, final String customSFT) throws IOException {
916    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
917    TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc);
918    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
919    if (cpHost != null) {
920      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
921      cpHost.preCloneSnapshot(snapshotPOJO, htd);
922    }
923    long procId;
924    try {
925      procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl, customSFT);
926    } catch (IOException e) {
927      LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName() + " as table "
928        + tableName.getNameAsString(), e);
929      throw e;
930    }
931    LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
932
933    if (cpHost != null) {
934      cpHost.postCloneSnapshot(snapshotPOJO, htd);
935    }
936    return procId;
937  }
938
939  /**
940   * Clone the specified snapshot into a new table. The operation will fail if the destination table
941   * has a snapshot or restore in progress.
942   * @param snapshot        Snapshot Descriptor
943   * @param tableDescriptor Table Descriptor of the table to create
944   * @param nonceKey        unique identifier to prevent duplicated RPC
945   * @return procId the ID of the clone snapshot procedure
946   */
947  synchronized long cloneSnapshot(final SnapshotDescription snapshot,
948    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl,
949    final String customSFT) throws HBaseSnapshotException {
950    TableName tableName = tableDescriptor.getTableName();
951
952    // make sure we aren't running a snapshot on the same table
953    if (isTableTakingAnySnapshot(tableName)) {
954      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
955    }
956
957    // make sure we aren't running a restore on the same table
958    if (isRestoringTable(tableName)) {
959      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
960    }
961
962    try {
963      long procId = master.getMasterProcedureExecutor().submitProcedure(
964        new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
965          tableDescriptor, snapshot, restoreAcl, customSFT),
966        nonceKey);
967      this.restoreTableToProcIdMap.put(tableName, procId);
968      return procId;
969    } catch (Exception e) {
970      String msg = "Couldn't clone the snapshot="
971        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
972      LOG.error(msg, e);
973      throw new RestoreSnapshotException(msg, e);
974    }
975  }
976
977  /**
978   * Restore or Clone the specified snapshot
979   * @param nonceKey unique identifier to prevent duplicated RPC
980   */
981  public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey,
982    final boolean restoreAcl, String customSFT) throws IOException {
983    FileSystem fs = master.getMasterFileSystem().getFileSystem();
984    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
985
986    // check if the snapshot exists
987    if (!fs.exists(snapshotDir)) {
988      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
989      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(reqSnapshot));
990    }
991
992    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
993    // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
994    // information.
995    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
996    SnapshotManifest manifest =
997      SnapshotManifest.open(master.getConfiguration(), fs, snapshotDir, snapshot);
998    TableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
999    TableName tableName = TableName.valueOf(reqSnapshot.getTable());
1000
1001    // sanity check the new table descriptor
1002    TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc);
1003
1004    // stop tracking "abandoned" handlers
1005    cleanupSentinels();
1006
1007    // Verify snapshot validity
1008    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
1009
1010    // Execute the restore/clone operation
1011    long procId;
1012    if (master.getTableDescriptors().exists(tableName)) {
1013      procId =
1014        restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1015    } else {
1016      procId = cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey,
1017        restoreAcl, customSFT);
1018    }
1019    return procId;
1020  }
1021
1022  /**
1023   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1024   * or restore in progress.
1025   * @param reqSnapshot       Snapshot Descriptor from request
1026   * @param tableName         table to restore
1027   * @param snapshot          Snapshot Descriptor
1028   * @param snapshotTableDesc Table Descriptor
1029   * @param nonceKey          unique identifier to prevent duplicated RPC
1030   * @param restoreAcl        true to restore acl of snapshot
1031   * @return procId the ID of the restore snapshot procedure
1032   */
1033  private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
1034    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
1035    final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
1036    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
1037
1038    // have to check first if restoring the snapshot would break current SFT setup
1039    StoreFileTrackerValidationUtils.validatePreRestoreSnapshot(
1040      master.getTableDescriptors().get(tableName), snapshotTableDesc, master.getConfiguration());
1041
1042    if (
1043      master.getTableStateManager().isTableState(TableName.valueOf(snapshot.getTable()),
1044        TableState.State.ENABLED)
1045    ) {
1046      throw new UnsupportedOperationException("Table '" + TableName.valueOf(snapshot.getTable())
1047        + "' must be disabled in order to " + "perform a restore operation.");
1048    }
1049
1050    // call Coprocessor pre hook
1051    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
1052    if (cpHost != null) {
1053      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
1054      cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1055    }
1056
1057    long procId;
1058    try {
1059      procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1060    } catch (IOException e) {
1061      LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
1062        + " as table " + tableName.getNameAsString(), e);
1063      throw e;
1064    }
1065    LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
1066
1067    if (cpHost != null) {
1068      cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1069    }
1070
1071    return procId;
1072  }
1073
1074  /**
1075   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1076   * or restore in progress.
1077   * @param snapshot        Snapshot Descriptor
1078   * @param tableDescriptor Table Descriptor
1079   * @param nonceKey        unique identifier to prevent duplicated RPC
1080   * @param restoreAcl      true to restore acl of snapshot
1081   * @return procId the ID of the restore snapshot procedure
1082   */
1083  private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
1084    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
1085    throws HBaseSnapshotException {
1086    final TableName tableName = tableDescriptor.getTableName();
1087
1088    // make sure we aren't running a snapshot on the same table
1089    if (isTableTakingAnySnapshot(tableName)) {
1090      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
1091    }
1092
1093    // make sure we aren't running a restore on the same table
1094    if (isRestoringTable(tableName)) {
1095      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
1096    }
1097
1098    try {
1099      TableDescriptor oldDescriptor = master.getTableDescriptors().get(tableName);
1100      long procId = master.getMasterProcedureExecutor().submitProcedure(
1101        new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
1102          oldDescriptor, tableDescriptor, snapshot, restoreAcl),
1103        nonceKey);
1104      this.restoreTableToProcIdMap.put(tableName, procId);
1105      return procId;
1106    } catch (Exception e) {
1107      String msg = "Couldn't restore the snapshot="
1108        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
1109      LOG.error(msg, e);
1110      throw new RestoreSnapshotException(msg, e);
1111    }
1112  }
1113
1114  /**
1115   * Verify if the restore of the specified table is in progress.
1116   * @param tableName table under restore
1117   * @return <tt>true</tt> if there is a restore in progress of the specified table.
1118   */
1119  private synchronized boolean isRestoringTable(final TableName tableName) {
1120    Long procId = this.restoreTableToProcIdMap.get(tableName);
1121    if (procId == null) {
1122      return false;
1123    }
1124    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1125    if (procExec.isRunning() && !procExec.isFinished(procId)) {
1126      return true;
1127    } else {
1128      this.restoreTableToProcIdMap.remove(tableName);
1129      return false;
1130    }
1131  }
1132
1133  /**
1134   * Return the handler if it is currently live and has the same snapshot target name. The handler
1135   * is removed from the sentinels map if completed.
1136   * @param sentinels live handlers
1137   * @param snapshot  snapshot description
1138   * @return null if doesn't match, else a live handler.
1139   */
1140  private synchronized SnapshotSentinel removeSentinelIfFinished(
1141    final Map<TableName, SnapshotSentinel> sentinels, final SnapshotDescription snapshot) {
1142    if (!snapshot.hasTable()) {
1143      return null;
1144    }
1145
1146    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
1147    SnapshotSentinel h = sentinels.get(snapshotTable);
1148    if (h == null) {
1149      return null;
1150    }
1151
1152    if (!h.getSnapshot().getName().equals(snapshot.getName())) {
1153      // specified snapshot is to the one currently running
1154      return null;
1155    }
1156
1157    // Remove from the "in-progress" list once completed
1158    if (h.isFinished()) {
1159      sentinels.remove(snapshotTable);
1160    }
1161
1162    return h;
1163  }
1164
1165  /**
1166   * Removes "abandoned" snapshot/restore requests. As part of the HBaseAdmin snapshot/restore API
1167   * the operation status is checked until completed, and the in-progress maps are cleaned up when
1168   * the status of a completed task is requested. To avoid having sentinels staying around for long
1169   * time if something client side is failed, each operation tries to clean up the in-progress maps
1170   * sentinels finished from a long time.
1171   */
1172  private void cleanupSentinels() {
1173    cleanupSentinels(this.snapshotHandlers);
1174    cleanupCompletedRestoreInMap();
1175    cleanupCompletedSnapshotInMap();
1176  }
1177
1178  /**
1179   * Remove the sentinels that are marked as finished and the completion time has exceeded the
1180   * removal timeout.
1181   * @param sentinels map of sentinels to clean
1182   */
1183  private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
1184    long currentTime = EnvironmentEdgeManager.currentTime();
1185    long sentinelsCleanupTimeoutMillis =
1186      master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
1187        SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
1188    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
1189    while (it.hasNext()) {
1190      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
1191      SnapshotSentinel sentinel = entry.getValue();
1192      if (
1193        sentinel.isFinished()
1194          && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis
1195      ) {
1196        it.remove();
1197      }
1198    }
1199  }
1200
1201  /**
1202   * Remove the procedures that are marked as finished
1203   */
1204  private synchronized void cleanupCompletedRestoreInMap() {
1205    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1206    Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
1207    while (it.hasNext()) {
1208      Map.Entry<TableName, Long> entry = it.next();
1209      Long procId = entry.getValue();
1210      if (procExec.isRunning() && procExec.isFinished(procId)) {
1211        it.remove();
1212      }
1213    }
1214  }
1215
1216  /**
1217   * Remove the procedures that are marked as finished
1218   */
1219  private synchronized void cleanupCompletedSnapshotInMap() {
1220    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1221    Iterator<Map.Entry<SnapshotDescription, Long>> it = snapshotToProcIdMap.entrySet().iterator();
1222    while (it.hasNext()) {
1223      Map.Entry<SnapshotDescription, Long> entry = it.next();
1224      Long procId = entry.getValue();
1225      if (procExec.isRunning() && procExec.isFinished(procId)) {
1226        it.remove();
1227      }
1228    }
1229  }
1230
1231  //
1232  // Implementing Stoppable interface
1233  //
1234
1235  @Override
1236  public void stop(String why) {
1237    // short circuit
1238    if (this.stopped) return;
1239    // make sure we get stop
1240    this.stopped = true;
1241    // pass the stop onto take snapshot handlers
1242    for (SnapshotSentinel snapshotHandler : this.snapshotHandlers.values()) {
1243      snapshotHandler.cancel(why);
1244    }
1245    if (snapshotHandlerChoreCleanerTask != null) {
1246      snapshotHandlerChoreCleanerTask.cancel(true);
1247    }
1248    try {
1249      if (coordinator != null) {
1250        coordinator.close();
1251      }
1252    } catch (IOException e) {
1253      LOG.error("stop ProcedureCoordinator error", e);
1254    }
1255  }
1256
1257  @Override
1258  public boolean isStopped() {
1259    return this.stopped;
1260  }
1261
1262  /**
1263   * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1264   * Called at the beginning of snapshot() and restoreSnapshot() methods.
1265   * @throws UnsupportedOperationException if snapshot are not supported
1266   */
1267  public void checkSnapshotSupport() throws UnsupportedOperationException {
1268    if (!this.isSnapshotSupported) {
1269      throw new UnsupportedOperationException(
1270        "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '"
1271          + HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1272    }
1273  }
1274
1275  /**
1276   * Called at startup, to verify if snapshot operation is supported, and to avoid starting the
1277   * master if there're snapshots present but the cleaners needed are missing. Otherwise we can end
1278   * up with snapshot data loss.
1279   * @param conf The {@link Configuration} object to use
1280   * @param mfs  The MasterFileSystem to use
1281   * @throws IOException                   in case of file-system operation failure
1282   * @throws UnsupportedOperationException in case cleaners are missing and there're snapshot in the
1283   *                                       system
1284   */
1285  private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1286    throws IOException, UnsupportedOperationException {
1287    // Verify if snapshot is disabled by the user
1288    String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1289    boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1290    boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1291
1292    // Extract cleaners from conf
1293    Set<String> hfileCleaners = new HashSet<>();
1294    String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1295    if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1296
1297    Set<String> logCleaners = new HashSet<>();
1298    cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1299    if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1300
1301    // check if an older version of snapshot directory was present
1302    Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1303    FileSystem fs = mfs.getFileSystem();
1304    List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false);
1305    if (ss != null && !ss.isEmpty()) {
1306      LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1307      LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1308    }
1309
1310    // If the user has enabled the snapshot, we force the cleaners to be present
1311    // otherwise we still need to check if cleaners are enabled or not and verify
1312    // that there're no snapshot in the .snapshot folder.
1313    if (snapshotEnabled) {
1314      // Inject snapshot cleaners, if snapshot.enable is true
1315      hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1316      hfileCleaners.add(HFileLinkCleaner.class.getName());
1317      // If sync acl to HDFS feature is enabled, then inject the cleaner
1318      if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) {
1319        hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName());
1320      }
1321
1322      // Set cleaners conf
1323      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1324        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1325      conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1326        logCleaners.toArray(new String[logCleaners.size()]));
1327    } else {
1328      // There may be restore tables if snapshot is enabled and then disabled, so add
1329      // HFileLinkCleaner, see HBASE-26670 for more details.
1330      hfileCleaners.add(HFileLinkCleaner.class.getName());
1331      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1332        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1333      // Verify if SnapshotHFileCleaner are present
1334      snapshotEnabled = hfileCleaners.contains(SnapshotHFileCleaner.class.getName());
1335
1336      // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1337      if (snapshotEnabled) {
1338        LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " + "but the '"
1339          + HBASE_SNAPSHOT_ENABLED + "' property "
1340          + (userDisabled ? "is set to 'false'." : "is not set."));
1341      }
1342    }
1343
1344    // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1345    this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1346
1347    // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1348    // otherwise we end up with snapshot data loss.
1349    if (!snapshotEnabled) {
1350      LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1351      Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1352      if (fs.exists(snapshotDir)) {
1353        FileStatus[] snapshots = CommonFSUtils.listStatus(fs, snapshotDir,
1354          new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1355        if (snapshots != null) {
1356          LOG.error("Snapshots are present, but cleaners are not enabled.");
1357          checkSnapshotSupport();
1358        }
1359      }
1360    }
1361  }
1362
1363  @Override
1364  public void initialize(MasterServices master, MetricsMaster metricsMaster)
1365    throws KeeperException, IOException, UnsupportedOperationException {
1366    this.master = master;
1367
1368    this.rootDir = master.getMasterFileSystem().getRootDir();
1369    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1370
1371    // get the configuration for the coordinator
1372    Configuration conf = master.getConfiguration();
1373    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1374    long timeoutMillis = Math.max(
1375      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1376        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME),
1377      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1378        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1379    int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1380
1381    // setup the default procedure coordinator
1382    String name = master.getServerName().toString();
1383    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1384    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(master.getZooKeeper(),
1385      SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1386
1387    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1388    this.executorService = master.getExecutorService();
1389    this.verifyWorkerAssigner =
1390      new WorkerAssigner(master, conf.getInt("hbase.snapshot.verify.task.max", 3),
1391        new ProcedureEvent<>("snapshot-verify-worker-assigning"));
1392    restoreUnfinishedSnapshotProcedure();
1393    restoreWorkers();
1394    resetTempDir();
1395    snapshotHandlerChoreCleanerTask =
1396      scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
1397  }
1398
1399  private void restoreUnfinishedSnapshotProcedure() {
1400    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1401      .filter(p -> p instanceof SnapshotProcedure).filter(p -> !p.isFinished())
1402      .map(p -> (SnapshotProcedure) p).forEach(p -> {
1403        registerSnapshotProcedure(p.getSnapshot(), p.getProcId());
1404        LOG.info("restore unfinished snapshot procedure {}", p);
1405      });
1406  }
1407
1408  @Override
1409  public String getProcedureSignature() {
1410    return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1411  }
1412
1413  @Override
1414  public void execProcedure(ProcedureDescription desc) throws IOException {
1415    takeSnapshot(toSnapshotDescription(desc));
1416  }
1417
1418  @Override
1419  public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
1420    throws IOException {
1421    // Done by AccessController as part of preSnapshot coprocessor hook (legacy code path).
1422    // In future, when we AC is removed for good, that check should be moved here.
1423  }
1424
1425  @Override
1426  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1427    return isSnapshotDone(toSnapshotDescription(desc));
1428  }
1429
1430  private SnapshotDescription toSnapshotDescription(ProcedureDescription desc) throws IOException {
1431    SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1432    if (!desc.hasInstance()) {
1433      throw new IOException("Snapshot name is not defined: " + desc.toString());
1434    }
1435    String snapshotName = desc.getInstance();
1436    List<NameStringPair> props = desc.getConfigurationList();
1437    String table = null;
1438    for (NameStringPair prop : props) {
1439      if ("table".equalsIgnoreCase(prop.getName())) {
1440        table = prop.getValue();
1441      }
1442    }
1443    if (table == null) {
1444      throw new IOException("Snapshot table is not defined: " + desc.toString());
1445    }
1446    TableName tableName = TableName.valueOf(table);
1447    builder.setTable(tableName.getNameAsString());
1448    builder.setName(snapshotName);
1449    builder.setType(SnapshotDescription.Type.FLUSH);
1450    return builder.build();
1451  }
1452
1453  public void registerSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1454    snapshotToProcIdMap.put(snapshot, procId);
1455    LOG.debug("register snapshot={}, snapshot procedure id = {}",
1456      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1457  }
1458
1459  public void unregisterSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1460    snapshotToProcIdMap.remove(snapshot, procId);
1461    LOG.debug("unregister snapshot={}, snapshot procedure id = {}",
1462      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1463  }
1464
1465  public boolean snapshotProcedureEnabled() {
1466    return master.getConfiguration().getBoolean(SNAPSHOT_PROCEDURE_ENABLED,
1467      SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
1468  }
1469
1470  public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
1471    throws ProcedureSuspendedException {
1472    ServerName worker = verifyWorkerAssigner.acquire(procedure);
1473    LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker);
1474    return worker;
1475  }
1476
1477  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) {
1478    LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
1479    verifyWorkerAssigner.release(worker);
1480  }
1481
1482  private void restoreWorkers() {
1483    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1484      .filter(p -> p instanceof SnapshotVerifyProcedure).map(p -> (SnapshotVerifyProcedure) p)
1485      .filter(p -> !p.isFinished()).filter(p -> p.getServerName() != null).forEach(p -> {
1486        verifyWorkerAssigner.addUsedWorker(p.getServerName());
1487        LOG.debug("{} restores used worker {}", p, p.getServerName());
1488      });
1489  }
1490
1491  public Integer getAvailableWorker(ServerName serverName) {
1492    return verifyWorkerAssigner.getAvailableWorker(serverName);
1493  }
1494}