001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Optional;
030import java.util.Set;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.Executors;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.ScheduledFuture;
035import java.util.concurrent.ThreadPoolExecutor;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.locks.ReadWriteLock;
038import java.util.concurrent.locks.ReentrantReadWriteLock;
039import java.util.stream.Collectors;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.CommonPathCapabilities;
042import org.apache.hadoop.fs.FSDataInputStream;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.fs.permission.AclEntry;
047import org.apache.hadoop.fs.permission.AclStatus;
048import org.apache.hadoop.hbase.HBaseInterfaceAudience;
049import org.apache.hadoop.hbase.HConstants;
050import org.apache.hadoop.hbase.ServerName;
051import org.apache.hadoop.hbase.Stoppable;
052import org.apache.hadoop.hbase.TableName;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
055import org.apache.hadoop.hbase.client.TableState;
056import org.apache.hadoop.hbase.errorhandling.ForeignException;
057import org.apache.hadoop.hbase.executor.ExecutorService;
058import org.apache.hadoop.hbase.ipc.RpcServer;
059import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
060import org.apache.hadoop.hbase.master.MasterFileSystem;
061import org.apache.hadoop.hbase.master.MasterServices;
062import org.apache.hadoop.hbase.master.MetricsMaster;
063import org.apache.hadoop.hbase.master.SnapshotSentinel;
064import org.apache.hadoop.hbase.master.WorkerAssigner;
065import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
066import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
067import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
068import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
069import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
070import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
071import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
072import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
073import org.apache.hadoop.hbase.master.procedure.SnapshotVerifyProcedure;
074import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
075import org.apache.hadoop.hbase.procedure.Procedure;
076import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
077import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
078import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
079import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
080import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
081import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
082import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils;
083import org.apache.hadoop.hbase.security.AccessDeniedException;
084import org.apache.hadoop.hbase.security.User;
085import org.apache.hadoop.hbase.security.access.AccessChecker;
086import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner;
087import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper;
088import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
089import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
090import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
091import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
092import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
093import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
094import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
095import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
096import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
097import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
098import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
099import org.apache.hadoop.hbase.util.CommonFSUtils;
100import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
101import org.apache.hadoop.hbase.util.NonceKey;
102import org.apache.hadoop.hbase.util.TableDescriptorChecker;
103import org.apache.yetus.audience.InterfaceAudience;
104import org.apache.yetus.audience.InterfaceStability;
105import org.apache.zookeeper.KeeperException;
106import org.slf4j.Logger;
107import org.slf4j.LoggerFactory;
108
109import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
110
111import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
112import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
113import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
114import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
115import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
116
117/**
118 * This class manages the procedure of taking and restoring snapshots. There is only one
119 * SnapshotManager for the master.
120 * <p>
121 * The class provides methods for monitoring in-progress snapshot actions.
122 * <p>
 * Note: Currently only one snapshot can be in progress on a given table at a time. This is a
 * simplification in the current implementation.
125 */
126@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
127@InterfaceStability.Unstable
128public class SnapshotManager extends MasterProcedureManager implements Stoppable {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);

  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;

  /**
   * Wait time before removing a finished sentinel from the in-progress map. NOTE: This is used as
   * a safety auto cleanup. The snapshot and restore handlers map entries are removed when a user
   * asks if a snapshot or restore is completed. This operation is part of the HBaseAdmin
   * snapshot/restore API flow. In case something fails on the client side and the snapshot/restore
   * state is not reclaimed after a default timeout, the entry is removed from the in-progress map.
   * At this point, if the user asks for the snapshot/restore status, the result will be "snapshot
   * done" if it exists or "failed" if it doesn't exist.
   */
  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
    "hbase.snapshot.sentinels.cleanup.timeoutMillis";
  /**
   * Presumably the default for {@link #HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS}:
   * 60 * 1000 ms, i.e. 60 seconds.
   */
  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;

  /** Enable or disable snapshot support */
  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

  /**
   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for completion.
   */
  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

  /** Name of the operation to use in the controller */
  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

  /** Conf key for # of threads used by the SnapshotManager thread pool */
  public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

  /**
   * Presumably the default for {@link #SNAPSHOT_POOL_THREADS_KEY}, i.e. the number of threads in
   * the SnapshotManager thread pool (how many snapshot operations can run on the master at once).
   */
  public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;

  /** Conf key for preserving original max file size configs */
  public static final String SNAPSHOT_MAX_FILE_SIZE_PRESERVE =
    "hbase.snapshot.max.filesize.preserve";

  /** Enable or disable snapshot procedure */
  public static final String SNAPSHOT_PROCEDURE_ENABLED = "hbase.snapshot.procedure.enabled";

  /** Presumably the default for {@link #SNAPSHOT_PROCEDURE_ENABLED}. */
  public static final boolean SNAPSHOT_PROCEDURE_ENABLED_DEFAULT = true;

  // Stoppable state flag; presumably set by stop() outside this view — TODO confirm.
  private boolean stopped;
  private MasterServices master; // Needed by TableEventHandlers
  // Coordinator for zk-coordinated snapshots; used to report procedure status on failures.
  private ProcedureCoordinator coordinator;

  // Is snapshot feature enabled?
  private boolean isSnapshotSupported = false;

  // Snapshot handlers map (zk-coordinated snapshots), with table name as key.
  // This is a ConcurrentHashMap: it is mostly modified from synchronized methods, but some callers
  // (e.g. isSnapshotDone(), which is not synchronized and removes finished handlers) access it
  // without holding the object lock.
  // snapshotTable() will insert a handler into the map.
  // isSnapshotDone() will remove the handler requested if the operation is finished.
  private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
  // Single daemon thread used to periodically run cleanupSentinels().
  private final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
  // Handle of the scheduled cleanupSentinels() task.
  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;

  // Restore map, with table name as key, procedure ID as value.
  // The map is always accessed and modified under the object lock using synchronized.
  // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
  //
  // TODO: just as the Apache HBase 1.x implementation, this map would not survive master
  // restart/failover. This is just a stopgap implementation until implementation of taking
  // snapshot using Procedure-V2.
  private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>();

  // Procedure-coordinated snapshots (SnapshotProcedure): SnapshotDescription -> procedure id.
  private final ConcurrentHashMap<SnapshotDescription, Long> snapshotToProcIdMap =
    new ConcurrentHashMap<>();

  private WorkerAssigner verifyWorkerAssigner;

  // HBase root directory, taken from MasterFileSystem#getRootDir().
  private Path rootDir;
  // Executor that runs TakeSnapshotHandler instances (see snapshotTable()).
  private ExecutorService executorService;

  /**
   * Read write lock between taking snapshot and snapshot HFile cleaner. The cleaner should skip
   * checking the HFiles if any snapshot is in progress, otherwise it may clean an HFile which
   * belongs to the snapshot being created. So the cleaner should grab the write lock first when it
   * starts to work. (See HBASE-21387.) The lock is created with the fair ordering policy.
   */
  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
215
216  public SnapshotManager() {
217  }
218
219  /**
220   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
221   * @param master      services for the master where the manager is running
222   * @param coordinator procedure coordinator instance. exposed for testing.
223   * @param pool        HBase ExecutorServcie instance, exposed for testing.
224   */
225  @InterfaceAudience.Private
226  SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
227    ExecutorService pool, int sentinelCleanInterval)
228    throws IOException, UnsupportedOperationException {
229    this.master = master;
230
231    this.rootDir = master.getMasterFileSystem().getRootDir();
232    Configuration conf = master.getConfiguration();
233    checkSnapshotSupport(conf, master.getMasterFileSystem());
234
235    this.coordinator = coordinator;
236    this.executorService = pool;
237    resetTempDir();
238    snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
239      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
240  }
241
242  /**
243   * Gets the list of all completed snapshots.
244   * @return list of SnapshotDescriptions
245   * @throws IOException File system exception
246   */
247  public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
248    return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true);
249  }
250
251  /**
252   * Gets the list of all completed snapshots.
253   * @param snapshotDir snapshot directory
254   * @param withCpCall  Whether to call CP hooks
255   * @return list of SnapshotDescriptions
256   * @throws IOException File system exception
257   */
258  private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall)
259    throws IOException {
260    List<SnapshotDescription> snapshotDescs = new ArrayList<>();
261    // first create the snapshot root path and check to see if it exists
262    FileSystem fs = master.getMasterFileSystem().getFileSystem();
263    if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
264
265    // if there are no snapshots, return an empty list
266    if (!fs.exists(snapshotDir)) {
267      return snapshotDescs;
268    }
269
270    // ignore all the snapshots in progress
271    FileStatus[] snapshots = fs.listStatus(snapshotDir,
272      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
273    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
274    withCpCall = withCpCall && cpHost != null;
275    // loop through all the completed snapshots
276    for (FileStatus snapshot : snapshots) {
277      Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
278      // if the snapshot is bad
279      if (!fs.exists(info)) {
280        LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
281        continue;
282      }
283      FSDataInputStream in = null;
284      try {
285        in = fs.open(info);
286        SnapshotDescription desc = SnapshotDescription.parseFrom(in);
287        org.apache.hadoop.hbase.client.SnapshotDescription descPOJO =
288          (withCpCall) ? ProtobufUtil.createSnapshotDesc(desc) : null;
289        if (withCpCall) {
290          try {
291            cpHost.preListSnapshot(descPOJO);
292          } catch (AccessDeniedException e) {
293            LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
294              + "Either you should be owner of this snapshot or admin user.");
295            // Skip this and try for next snapshot
296            continue;
297          }
298        }
299        snapshotDescs.add(desc);
300
301        // call coproc post hook
302        if (withCpCall) {
303          cpHost.postListSnapshot(descPOJO);
304        }
305      } catch (IOException e) {
306        LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
307      } finally {
308        if (in != null) {
309          in.close();
310        }
311      }
312    }
313    return snapshotDescs;
314  }
315
316  /**
317   * Cleans up any zk-coordinated snapshots in the snapshot/.tmp directory that were left from
318   * failed snapshot attempts. For unfinished procedure2-coordinated snapshots, keep the working
319   * directory.
320   * @throws IOException if we can't reach the filesystem
321   */
322  private void resetTempDir() throws IOException {
323    Set<String> workingProcedureCoordinatedSnapshotNames =
324      snapshotToProcIdMap.keySet().stream().map(s -> s.getName()).collect(Collectors.toSet());
325
326    Path tmpdir =
327      SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, master.getConfiguration());
328    FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration());
329    FileStatus[] workingSnapshotDirs = CommonFSUtils.listStatus(tmpFs, tmpdir);
330    if (workingSnapshotDirs == null) {
331      return;
332    }
333    for (FileStatus workingSnapshotDir : workingSnapshotDirs) {
334      String workingSnapshotName = workingSnapshotDir.getPath().getName();
335      if (!workingProcedureCoordinatedSnapshotNames.contains(workingSnapshotName)) {
336        try {
337          if (tmpFs.delete(workingSnapshotDir.getPath(), true)) {
338            LOG.info("delete unfinished zk-coordinated snapshot working directory {}",
339              workingSnapshotDir.getPath());
340          } else {
341            LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
342              workingSnapshotDir.getPath());
343          }
344        } catch (IOException e) {
345          LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}",
346            workingSnapshotDir.getPath(), e);
347        }
348      } else {
349        LOG.debug("find working directory of unfinished procedure {}", workingSnapshotName);
350      }
351    }
352  }
353
354  /**
355   * Delete the specified snapshot
356   * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
357   * @throws IOException                   For filesystem IOExceptions
358   */
359  public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
360    // check to see if it is completed
361    if (!isSnapshotCompleted(snapshot)) {
362      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
363    }
364
365    String snapshotName = snapshot.getName();
366    // first create the snapshot description and check to see if it exists
367    FileSystem fs = master.getMasterFileSystem().getFileSystem();
368    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
369    // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
370    // just the "name" and it does not contains the "real" snapshot information
371    snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
372
373    // call coproc pre hook
374    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
375    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
376    if (cpHost != null) {
377      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
378      cpHost.preDeleteSnapshot(snapshotPOJO);
379    }
380
381    LOG.debug("Deleting snapshot: " + snapshotName);
382    // delete the existing snapshot
383    if (!fs.delete(snapshotDir, true)) {
384      throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
385    }
386
387    // call coproc post hook
388    if (cpHost != null) {
389      cpHost.postDeleteSnapshot(snapshotPOJO);
390    }
391
392  }
393
394  /**
395   * Check if the specified snapshot is done
396   * @return true if snapshot is ready to be restored, false if it is still being taken.
397   * @throws IOException              IOException if error from HDFS or RPC
398   * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
399   */
400  public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
401    // check the request to make sure it has a snapshot
402    if (expected == null) {
403      throw new UnknownSnapshotException(
404        "No snapshot name passed in request, can't figure out which snapshot you want to check.");
405    }
406
407    Long procId = snapshotToProcIdMap.get(expected);
408    if (procId != null) {
409      if (master.getMasterProcedureExecutor().isRunning()) {
410        return master.getMasterProcedureExecutor().isFinished(procId);
411      } else {
412        return false;
413      }
414    }
415
416    String ssString = ClientSnapshotDescriptionUtils.toString(expected);
417
418    // check to see if the sentinel exists,
419    // and if the task is complete removes it from the in-progress snapshots map.
420    SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
421
422    // stop tracking "abandoned" handlers
423    cleanupSentinels();
424
425    if (handler == null) {
426      // If there's no handler in the in-progress map, it means one of the following:
427      // - someone has already requested the snapshot state
428      // - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
429      // - the snapshot was never requested
430      // In those cases returns to the user the "done state" if the snapshots exists on disk,
431      // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
432      if (!isSnapshotCompleted(expected)) {
433        throw new UnknownSnapshotException("Snapshot " + ssString
434          + " is not currently running or one of the known completed snapshots.");
435      }
436      // was done, return true;
437      return true;
438    }
439
440    // pass on any failure we find in the sentinel
441    try {
442      handler.rethrowExceptionIfFailed();
443    } catch (ForeignException e) {
444      // Give some procedure info on an exception.
445      String status;
446      Procedure p = coordinator.getProcedure(expected.getName());
447      if (p != null) {
448        status = p.getStatus();
449      } else {
450        status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
451      }
452      throw new HBaseSnapshotException("Snapshot " + ssString + " had an error.  " + status, e,
453        ProtobufUtil.createSnapshotDesc(expected));
454    }
455
456    // check to see if we are done
457    if (handler.isFinished()) {
458      LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
459      return true;
460    } else if (LOG.isDebugEnabled()) {
461      LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
462    }
463    return false;
464  }
465
466  /**
467   * Check to see if there is a snapshot in progress with the same name or on the same table.
468   * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
469   * don't allow snapshot with the same name.
470   * @param snapshot   description of the snapshot being checked.
471   * @param checkTable check if the table is already taking a snapshot.
472   * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
473   *         table.
474   */
475  synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot, boolean checkTable) {
476    if (checkTable) {
477      TableName snapshotTable = TableName.valueOf(snapshot.getTable());
478      if (isTakingSnapshot(snapshotTable)) {
479        return true;
480      }
481    }
482    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = snapshotHandlers.entrySet().iterator();
483    while (it.hasNext()) {
484      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
485      SnapshotSentinel sentinel = entry.getValue();
486      if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
487        return true;
488      }
489    }
490    Iterator<Map.Entry<SnapshotDescription, Long>> spIt = snapshotToProcIdMap.entrySet().iterator();
491    while (spIt.hasNext()) {
492      Map.Entry<SnapshotDescription, Long> entry = spIt.next();
493      if (
494        snapshot.getName().equals(entry.getKey().getName())
495          && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
496      ) {
497        return true;
498      }
499    }
500    return false;
501  }
502
503  /**
504   * Check to see if the specified table has a snapshot in progress. Currently we have a limitation
505   * only allowing a single snapshot per table at a time.
506   * @param tableName name of the table being snapshotted.
507   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
508   */
509  public boolean isTakingSnapshot(final TableName tableName) {
510    return isTakingSnapshot(tableName, false);
511  }
512
513  public boolean isTableTakingAnySnapshot(final TableName tableName) {
514    return isTakingSnapshot(tableName, true);
515  }
516
517  /**
518   * Check to see if the specified table has a snapshot in progress. Since we introduce the
519   * SnapshotProcedure, it is a little bit different from before. For zk-coordinated snapshot, we
520   * can just consider tables in snapshotHandlers only, but for
521   * {@link org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure} and
522   * {@link org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure}, we need to
523   * consider tables in snapshotToProcIdMap also, for the snapshot procedure, we don't need to check
524   * if table in snapshot.
525   * @param tableName      name of the table being snapshotted.
526   * @param checkProcedure true if we should check tables in snapshotToProcIdMap
527   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
528   */
529  private synchronized boolean isTakingSnapshot(TableName tableName, boolean checkProcedure) {
530    SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
531    if (handler != null && !handler.isFinished()) {
532      return true;
533    }
534    if (checkProcedure) {
535      for (Map.Entry<SnapshotDescription, Long> entry : snapshotToProcIdMap.entrySet()) {
536        if (
537          TableName.valueOf(entry.getKey().getTable()).equals(tableName)
538            && !master.getMasterProcedureExecutor().isFinished(entry.getValue())
539        ) {
540          return true;
541        }
542      }
543    }
544    return false;
545  }
546
547  /**
548   * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
549   * aren't already running a snapshot or restore on the requested table.
550   * @param snapshot description of the snapshot we want to start
551   * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
552   */
553  public synchronized void prepareWorkingDirectory(SnapshotDescription snapshot)
554    throws HBaseSnapshotException {
555    Path workingDir =
556      SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, master.getConfiguration());
557
558    try {
559      FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration());
560      // delete the working directory, since we aren't running the snapshot. Likely leftovers
561      // from a failed attempt.
562      workingDirFS.delete(workingDir, true);
563
564      // recreate the working directory for the snapshot
565      if (!workingDirFS.mkdirs(workingDir)) {
566        throw new SnapshotCreationException(
567          "Couldn't create working directory (" + workingDir + ") for snapshot",
568          ProtobufUtil.createSnapshotDesc(snapshot));
569      }
570      updateWorkingDirAclsIfRequired(workingDir, workingDirFS);
571    } catch (HBaseSnapshotException e) {
572      throw e;
573    } catch (IOException e) {
574      throw new SnapshotCreationException(
575        "Exception while checking to see if snapshot could be started.", e,
576        ProtobufUtil.createSnapshotDesc(snapshot));
577    }
578  }
579
580  /**
581   * If the parent dir of the snapshot working dir (e.g. /hbase/.hbase-snapshot) has non-empty ACLs,
582   * use them for the current working dir (e.g. /hbase/.hbase-snapshot/.tmp/{snapshot-name}) so that
583   * regardless of whether the snapshot commit phase performs atomic rename or non-atomic copy of
584   * the working dir to new snapshot dir, the ACLs are retained.
585   * @param workingDir   working dir to build the snapshot.
586   * @param workingDirFS working dir file system.
587   * @throws IOException If ACL read/modify operation fails.
588   */
589  private static void updateWorkingDirAclsIfRequired(Path workingDir, FileSystem workingDirFS)
590    throws IOException {
591    if (
592      !workingDirFS.hasPathCapability(workingDir, CommonPathCapabilities.FS_ACLS)
593        || workingDir.getParent() == null || workingDir.getParent().getParent() == null
594    ) {
595      return;
596    }
597    AclStatus snapshotWorkingParentDirStatus;
598    try {
599      snapshotWorkingParentDirStatus =
600        workingDirFS.getAclStatus(workingDir.getParent().getParent());
601    } catch (IOException e) {
602      LOG.warn("Unable to retrieve ACL status for path: {}, current working dir path: {}",
603        workingDir.getParent().getParent(), workingDir, e);
604      return;
605    }
606    List<AclEntry> snapshotWorkingParentDirAclStatusEntries =
607      snapshotWorkingParentDirStatus.getEntries();
608    if (
609      snapshotWorkingParentDirAclStatusEntries != null
610        && snapshotWorkingParentDirAclStatusEntries.size() > 0
611    ) {
612      workingDirFS.modifyAclEntries(workingDir, snapshotWorkingParentDirAclStatusEntries);
613    }
614  }
615
616  /**
617   * Take a snapshot of a disabled table.
618   * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
619   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
620   *                     directory could not be determined
621   */
622  private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) throws IOException {
623    // setup the snapshot
624    prepareWorkingDirectory(snapshot);
625
626    // set the snapshot to be a disabled snapshot, since the client doesn't know about that
627    snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
628
629    // Take the snapshot of the disabled table
630    DisabledTableSnapshotHandler handler = new DisabledTableSnapshotHandler(snapshot, master, this);
631    snapshotTable(snapshot, handler);
632  }
633
634  /**
635   * Take a snapshot of an enabled table.
636   * @param snapshot description of the snapshot to take.
637   * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary
638   *                     directory could not be determined
639   */
640  private synchronized void snapshotEnabledTable(SnapshotDescription snapshot) throws IOException {
641    // setup the snapshot
642    prepareWorkingDirectory(snapshot);
643
644    // Take the snapshot of the enabled table
645    EnabledTableSnapshotHandler handler = new EnabledTableSnapshotHandler(snapshot, master, this);
646    snapshotTable(snapshot, handler);
647  }
648
649  /**
650   * Take a snapshot using the specified handler. On failure the snapshot temporary working
651   * directory is removed. NOTE: prepareToTakeSnapshot() called before this one takes care of the
652   * rejecting the snapshot request if the table is busy with another snapshot/restore operation.
653   * @param snapshot the snapshot description
654   * @param handler  the snapshot handler
655   */
656  private synchronized void snapshotTable(SnapshotDescription snapshot,
657    final TakeSnapshotHandler handler) throws IOException {
658    try {
659      handler.prepare();
660      this.executorService.submit(handler);
661      this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
662    } catch (Exception e) {
663      // cleanup the working directory by trying to delete it from the fs.
664      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
665        master.getConfiguration());
666      FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration());
667      try {
668        if (!workingDirFs.delete(workingDir, true)) {
669          LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
670            + ClientSnapshotDescriptionUtils.toString(snapshot));
671        }
672      } catch (IOException e1) {
673        LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:"
674          + ClientSnapshotDescriptionUtils.toString(snapshot));
675      }
676      // fail the snapshot
677      throw new SnapshotCreationException("Could not build snapshot handler", e,
678        ProtobufUtil.createSnapshotDesc(snapshot));
679    }
680  }
681
682  public ReadWriteLock getTakingSnapshotLock() {
683    return this.takingSnapshotLock;
684  }
685
686  /**
687   * The snapshot operation processing as following: <br>
688   * 1. Create a Snapshot Handler, and do some initialization; <br>
689   * 2. Put the handler into snapshotHandlers <br>
690   * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
691   * and snapshotHandlers;
692   * @return true to indicate that there're some running snapshots.
693   */
694  public synchronized boolean isTakingAnySnapshot() {
695    return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0
696      || this.snapshotToProcIdMap.size() > 0;
697  }
698
699  /**
700   * Take a snapshot based on the enabled/disabled state of the table.
701   * @throws HBaseSnapshotException when a snapshot specific exception occurs.
702   * @throws IOException            when some sort of generic IO exception occurs.
703   */
704  public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
705    this.takingSnapshotLock.readLock().lock();
706    try {
707      takeSnapshotInternal(snapshot);
708    } finally {
709      this.takingSnapshotLock.readLock().unlock();
710    }
711  }
712
713  public long takeSnapshot(SnapshotDescription snapshot, long nonceGroup, long nonce)
714    throws IOException {
715    this.takingSnapshotLock.readLock().lock();
716    try {
717      return submitSnapshotProcedure(snapshot, nonceGroup, nonce);
718    } finally {
719      this.takingSnapshotLock.readLock().unlock();
720    }
721  }
722
723  private synchronized long submitSnapshotProcedure(SnapshotDescription snapshot, long nonceGroup,
724    long nonce) throws IOException {
725    return MasterProcedureUtil
726      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(master, nonceGroup, nonce) {
727        @Override
728        protected void run() throws IOException {
729          TableDescriptor tableDescriptor =
730            master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable()));
731          MasterCoprocessorHost cpHost = getMaster().getMasterCoprocessorHost();
732          User user = RpcServer.getRequestUser().orElse(null);
733          org.apache.hadoop.hbase.client.SnapshotDescription snapshotDesc =
734            ProtobufUtil.createSnapshotDesc(snapshot);
735
736          if (cpHost != null) {
737            cpHost.preSnapshot(snapshotDesc, tableDescriptor, user);
738          }
739
740          sanityCheckBeforeSnapshot(snapshot, false);
741
742          long procId = submitProcedure(new SnapshotProcedure(
743            getMaster().getMasterProcedureExecutor().getEnvironment(), snapshot));
744
745          getMaster().getSnapshotManager().registerSnapshotProcedure(snapshot, procId);
746
747          if (cpHost != null) {
748            cpHost.postSnapshot(snapshotDesc, tableDescriptor, user);
749          }
750        }
751
752        @Override
753        protected String getDescription() {
754          return "SnapshotProcedure";
755        }
756      });
757  }
758
759  private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
760    TableDescriptor desc = sanityCheckBeforeSnapshot(snapshot, true);
761
762    // call pre coproc hook
763    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
764    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
765    if (cpHost != null) {
766      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
767      cpHost.preSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
768    }
769
770    // if the table is enabled, then have the RS run actually the snapshot work
771    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
772    if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.ENABLED)) {
773      if (LOG.isDebugEnabled()) {
774        LOG.debug("Table enabled, starting distributed snapshots for {}",
775          ClientSnapshotDescriptionUtils.toString(snapshot));
776      }
777      snapshotEnabledTable(snapshot);
778      if (LOG.isDebugEnabled()) {
779        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
780      }
781    }
782    // For disabled table, snapshot is created by the master
783    else if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.DISABLED)) {
784      if (LOG.isDebugEnabled()) {
785        LOG.debug("Table is disabled, running snapshot entirely on master for {}",
786          ClientSnapshotDescriptionUtils.toString(snapshot));
787      }
788      snapshotDisabledTable(snapshot);
789      if (LOG.isDebugEnabled()) {
790        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
791      }
792    } else {
793      LOG.error("Can't snapshot table '" + snapshot.getTable()
794        + "', isn't open or closed, we don't know what to do!");
795      TablePartiallyOpenException tpoe =
796        new TablePartiallyOpenException(snapshot.getTable() + " isn't fully open.");
797      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
798        ProtobufUtil.createSnapshotDesc(snapshot));
799    }
800
801    // call post coproc hook
802    if (cpHost != null) {
803      cpHost.postSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
804    }
805  }
806
807  /**
808   * Check if the snapshot can be taken. Currently we have some limitations, for zk-coordinated
809   * snapshot, we don't allow snapshot with same name or taking multiple snapshots of a table at the
810   * same time, for procedure-coordinated snapshot, we don't allow snapshot with same name.
811   * @param snapshot   description of the snapshot being checked.
812   * @param checkTable check if the table is already taking a snapshot. For zk-coordinated snapshot,
813   *                   we need to check if another zk-coordinated snapshot is in progress, for the
814   *                   snapshot procedure, this is unnecessary.
815   * @return the table descriptor of the table
816   */
817  private synchronized TableDescriptor sanityCheckBeforeSnapshot(SnapshotDescription snapshot,
818    boolean checkTable) throws IOException {
819    // check to see if we already completed the snapshot
820    if (isSnapshotCompleted(snapshot)) {
821      throw new SnapshotExistsException(
822        "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
823        ProtobufUtil.createSnapshotDesc(snapshot));
824    }
825    LOG.debug("No existing snapshot, attempting snapshot...");
826
827    // stop tracking "abandoned" handlers
828    cleanupSentinels();
829
830    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
831    // make sure we aren't already running a snapshot
832    if (isTakingSnapshot(snapshot, checkTable)) {
833      throw new SnapshotCreationException(
834        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
835          + " because we are already running another snapshot"
836          + " on the same table or with the same name");
837    }
838
839    // make sure we aren't running a restore on the same table
840    if (isRestoringTable(snapshotTable)) {
841      throw new SnapshotCreationException(
842        "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot)
843          + " because we are already have a restore in progress on the same snapshot.");
844    }
845
846    // check to see if the table exists
847    TableDescriptor desc = null;
848    try {
849      desc = master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable()));
850    } catch (FileNotFoundException e) {
851      String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
852      LOG.error(msg);
853      throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
854    } catch (IOException e) {
855      throw new SnapshotCreationException(
856        "Error while geting table description for table " + snapshot.getTable(), e,
857        ProtobufUtil.createSnapshotDesc(snapshot));
858    }
859    if (desc == null) {
860      throw new SnapshotCreationException(
861        "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
862        ProtobufUtil.createSnapshotDesc(snapshot));
863    }
864    return desc;
865  }
866
867  /**
868   * Set the handler for the current snapshot
869   * <p>
870   * Exposed for TESTING
871   * @param handler handler the master should use TODO get rid of this if possible, repackaging,
872   *                modify tests.
873   */
874  public synchronized void setSnapshotHandlerForTesting(final TableName tableName,
875    final SnapshotSentinel handler) {
876    if (handler != null) {
877      this.snapshotHandlers.put(tableName, handler);
878    } else {
879      this.snapshotHandlers.remove(tableName);
880    }
881  }
882
883  /** Returns distributed commit coordinator for all running snapshots */
884  ProcedureCoordinator getCoordinator() {
885    return coordinator;
886  }
887
888  /**
889   * Check to see if the snapshot is one of the currently completed snapshots Returns true if the
890   * snapshot exists in the "completed snapshots folder".
891   * @param snapshot expected snapshot to check
892   * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
893   *         not stored
894   * @throws IOException              if the filesystem throws an unexpected exception,
895   * @throws IllegalArgumentException if snapshot name is invalid.
896   */
897  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
898    try {
899      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
900      FileSystem fs = master.getMasterFileSystem().getFileSystem();
901      // check to see if the snapshot already exists
902      return fs.exists(snapshotDir);
903    } catch (IllegalArgumentException iae) {
904      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
905    }
906  }
907
908  /**
909   * Clone the specified snapshot. The clone will fail if the destination table has a snapshot or
910   * restore in progress.
911   * @param reqSnapshot       Snapshot Descriptor from request
912   * @param tableName         table to clone
913   * @param snapshot          Snapshot Descriptor
914   * @param snapshotTableDesc Table Descriptor
915   * @param nonceKey          unique identifier to prevent duplicated RPC
916   * @return procId the ID of the clone snapshot procedure
917   */
918  private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
919    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
920    final NonceKey nonceKey, final boolean restoreAcl, final String customSFT) throws IOException {
921    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
922    TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc);
923    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
924    if (cpHost != null) {
925      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
926      cpHost.preCloneSnapshot(snapshotPOJO, htd);
927    }
928    long procId;
929    try {
930      procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl, customSFT);
931    } catch (IOException e) {
932      LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName() + " as table "
933        + tableName.getNameAsString(), e);
934      throw e;
935    }
936    LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
937
938    if (cpHost != null) {
939      cpHost.postCloneSnapshot(snapshotPOJO, htd);
940    }
941    return procId;
942  }
943
944  /**
945   * Clone the specified snapshot into a new table. The operation will fail if the destination table
946   * has a snapshot or restore in progress.
947   * @param snapshot        Snapshot Descriptor
948   * @param tableDescriptor Table Descriptor of the table to create
949   * @param nonceKey        unique identifier to prevent duplicated RPC
950   * @return procId the ID of the clone snapshot procedure
951   */
952  synchronized long cloneSnapshot(final SnapshotDescription snapshot,
953    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl,
954    final String customSFT) throws HBaseSnapshotException {
955    TableName tableName = tableDescriptor.getTableName();
956
957    // make sure we aren't running a snapshot on the same table
958    if (isTableTakingAnySnapshot(tableName)) {
959      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
960    }
961
962    // make sure we aren't running a restore on the same table
963    if (isRestoringTable(tableName)) {
964      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
965    }
966
967    try {
968      long procId = master.getMasterProcedureExecutor().submitProcedure(
969        new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
970          tableDescriptor, snapshot, restoreAcl, customSFT),
971        nonceKey);
972      this.restoreTableToProcIdMap.put(tableName, procId);
973      return procId;
974    } catch (Exception e) {
975      String msg = "Couldn't clone the snapshot="
976        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
977      LOG.error(msg, e);
978      throw new RestoreSnapshotException(msg, e);
979    }
980  }
981
982  /**
983   * Restore or Clone the specified snapshot
984   * @param nonceKey unique identifier to prevent duplicated RPC
985   */
986  public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey,
987    final boolean restoreAcl, String customSFT) throws IOException {
988    FileSystem fs = master.getMasterFileSystem().getFileSystem();
989    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
990
991    // check if the snapshot exists
992    if (!fs.exists(snapshotDir)) {
993      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
994      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(reqSnapshot));
995    }
996
997    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
998    // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
999    // information.
1000    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
1001    SnapshotManifest manifest =
1002      SnapshotManifest.open(master.getConfiguration(), fs, snapshotDir, snapshot);
1003    TableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
1004    TableName tableName = TableName.valueOf(reqSnapshot.getTable());
1005
1006    // sanity check the new table descriptor
1007    TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc);
1008
1009    // stop tracking "abandoned" handlers
1010    cleanupSentinels();
1011
1012    // Verify snapshot validity
1013    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
1014
1015    // Execute the restore/clone operation
1016    long procId;
1017    if (master.getTableDescriptors().exists(tableName)) {
1018      procId =
1019        restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1020    } else {
1021      procId = cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey,
1022        restoreAcl, customSFT);
1023    }
1024    return procId;
1025  }
1026
1027  /**
1028   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1029   * or restore in progress.
1030   * @param reqSnapshot       Snapshot Descriptor from request
1031   * @param tableName         table to restore
1032   * @param snapshot          Snapshot Descriptor
1033   * @param snapshotTableDesc Table Descriptor
1034   * @param nonceKey          unique identifier to prevent duplicated RPC
1035   * @param restoreAcl        true to restore acl of snapshot
1036   * @return procId the ID of the restore snapshot procedure
1037   */
1038  private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
1039    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
1040    final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
1041    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
1042
1043    // have to check first if restoring the snapshot would break current SFT setup
1044    StoreFileTrackerValidationUtils.validatePreRestoreSnapshot(
1045      master.getTableDescriptors().get(tableName), snapshotTableDesc, master.getConfiguration());
1046
1047    if (
1048      master.getTableStateManager().isTableState(TableName.valueOf(snapshot.getTable()),
1049        TableState.State.ENABLED)
1050    ) {
1051      throw new UnsupportedOperationException("Table '" + TableName.valueOf(snapshot.getTable())
1052        + "' must be disabled in order to " + "perform a restore operation.");
1053    }
1054
1055    // call Coprocessor pre hook
1056    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
1057    if (cpHost != null) {
1058      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
1059      cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1060    }
1061
1062    long procId;
1063    try {
1064      procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl);
1065    } catch (IOException e) {
1066      LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
1067        + " as table " + tableName.getNameAsString(), e);
1068      throw e;
1069    }
1070    LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
1071
1072    if (cpHost != null) {
1073      cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
1074    }
1075
1076    return procId;
1077  }
1078
1079  /**
1080   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
1081   * or restore in progress.
1082   * @param snapshot        Snapshot Descriptor
1083   * @param tableDescriptor Table Descriptor
1084   * @param nonceKey        unique identifier to prevent duplicated RPC
1085   * @param restoreAcl      true to restore acl of snapshot
1086   * @return procId the ID of the restore snapshot procedure
1087   */
1088  private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
1089    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
1090    throws HBaseSnapshotException {
1091    final TableName tableName = tableDescriptor.getTableName();
1092
1093    // make sure we aren't running a snapshot on the same table
1094    if (isTableTakingAnySnapshot(tableName)) {
1095      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
1096    }
1097
1098    // make sure we aren't running a restore on the same table
1099    if (isRestoringTable(tableName)) {
1100      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
1101    }
1102
1103    try {
1104      TableDescriptor oldDescriptor = master.getTableDescriptors().get(tableName);
1105      long procId = master.getMasterProcedureExecutor().submitProcedure(
1106        new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
1107          oldDescriptor, tableDescriptor, snapshot, restoreAcl),
1108        nonceKey);
1109      this.restoreTableToProcIdMap.put(tableName, procId);
1110      return procId;
1111    } catch (Exception e) {
1112      String msg = "Couldn't restore the snapshot="
1113        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
1114      LOG.error(msg, e);
1115      throw new RestoreSnapshotException(msg, e);
1116    }
1117  }
1118
1119  /**
1120   * Verify if the restore of the specified table is in progress.
1121   * @param tableName table under restore
1122   * @return <tt>true</tt> if there is a restore in progress of the specified table.
1123   */
1124  private synchronized boolean isRestoringTable(final TableName tableName) {
1125    Long procId = this.restoreTableToProcIdMap.get(tableName);
1126    if (procId == null) {
1127      return false;
1128    }
1129    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1130    if (procExec.isRunning() && !procExec.isFinished(procId)) {
1131      return true;
1132    } else {
1133      this.restoreTableToProcIdMap.remove(tableName);
1134      return false;
1135    }
1136  }
1137
1138  /**
1139   * Return the handler if it is currently live and has the same snapshot target name. The handler
1140   * is removed from the sentinels map if completed.
1141   * @param sentinels live handlers
1142   * @param snapshot  snapshot description
1143   * @return null if doesn't match, else a live handler.
1144   */
1145  private synchronized SnapshotSentinel removeSentinelIfFinished(
1146    final Map<TableName, SnapshotSentinel> sentinels, final SnapshotDescription snapshot) {
1147    if (!snapshot.hasTable()) {
1148      return null;
1149    }
1150
1151    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
1152    SnapshotSentinel h = sentinels.get(snapshotTable);
1153    if (h == null) {
1154      return null;
1155    }
1156
1157    if (!h.getSnapshot().getName().equals(snapshot.getName())) {
1158      // specified snapshot is to the one currently running
1159      return null;
1160    }
1161
1162    // Remove from the "in-progress" list once completed
1163    if (h.isFinished()) {
1164      sentinels.remove(snapshotTable);
1165    }
1166
1167    return h;
1168  }
1169
1170  /**
1171   * Removes "abandoned" snapshot/restore requests. As part of the HBaseAdmin snapshot/restore API
1172   * the operation status is checked until completed, and the in-progress maps are cleaned up when
1173   * the status of a completed task is requested. To avoid having sentinels staying around for long
1174   * time if something client side is failed, each operation tries to clean up the in-progress maps
1175   * sentinels finished from a long time.
1176   */
1177  private void cleanupSentinels() {
1178    cleanupSentinels(this.snapshotHandlers);
1179    cleanupCompletedRestoreInMap();
1180    cleanupCompletedSnapshotInMap();
1181  }
1182
1183  /**
1184   * Remove the sentinels that are marked as finished and the completion time has exceeded the
1185   * removal timeout.
1186   * @param sentinels map of sentinels to clean
1187   */
1188  private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
1189    long currentTime = EnvironmentEdgeManager.currentTime();
1190    long sentinelsCleanupTimeoutMillis =
1191      master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
1192        SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
1193    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
1194    while (it.hasNext()) {
1195      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
1196      SnapshotSentinel sentinel = entry.getValue();
1197      if (
1198        sentinel.isFinished()
1199          && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis
1200      ) {
1201        it.remove();
1202      }
1203    }
1204  }
1205
1206  /**
1207   * Remove the procedures that are marked as finished
1208   */
1209  private synchronized void cleanupCompletedRestoreInMap() {
1210    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1211    Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
1212    while (it.hasNext()) {
1213      Map.Entry<TableName, Long> entry = it.next();
1214      Long procId = entry.getValue();
1215      if (procExec.isRunning() && procExec.isFinished(procId)) {
1216        it.remove();
1217      }
1218    }
1219  }
1220
1221  /**
1222   * Remove the procedures that are marked as finished
1223   */
1224  private synchronized void cleanupCompletedSnapshotInMap() {
1225    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1226    Iterator<Map.Entry<SnapshotDescription, Long>> it = snapshotToProcIdMap.entrySet().iterator();
1227    while (it.hasNext()) {
1228      Map.Entry<SnapshotDescription, Long> entry = it.next();
1229      Long procId = entry.getValue();
1230      if (procExec.isRunning() && procExec.isFinished(procId)) {
1231        it.remove();
1232      }
1233    }
1234  }
1235
1236  //
1237  // Implementing Stoppable interface
1238  //
1239
1240  @Override
1241  public void stop(String why) {
1242    // short circuit
1243    if (this.stopped) return;
1244    // make sure we get stop
1245    this.stopped = true;
1246    // pass the stop onto take snapshot handlers
1247    for (SnapshotSentinel snapshotHandler : this.snapshotHandlers.values()) {
1248      snapshotHandler.cancel(why);
1249    }
1250    if (snapshotHandlerChoreCleanerTask != null) {
1251      snapshotHandlerChoreCleanerTask.cancel(true);
1252    }
1253    try {
1254      if (coordinator != null) {
1255        coordinator.close();
1256      }
1257    } catch (IOException e) {
1258      LOG.error("stop ProcedureCoordinator error", e);
1259    }
1260  }
1261
1262  @Override
1263  public boolean isStopped() {
1264    return this.stopped;
1265  }
1266
1267  /**
1268   * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1269   * Called at the beginning of snapshot() and restoreSnapshot() methods.
1270   * @throws UnsupportedOperationException if snapshot are not supported
1271   */
1272  public void checkSnapshotSupport() throws UnsupportedOperationException {
1273    if (!this.isSnapshotSupported) {
1274      throw new UnsupportedOperationException(
1275        "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '"
1276          + HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1277    }
1278  }
1279
1280  /**
1281   * Called at startup, to verify if snapshot operation is supported, and to avoid starting the
1282   * master if there're snapshots present but the cleaners needed are missing. Otherwise we can end
1283   * up with snapshot data loss.
1284   * @param conf The {@link Configuration} object to use
1285   * @param mfs  The MasterFileSystem to use
1286   * @throws IOException                   in case of file-system operation failure
1287   * @throws UnsupportedOperationException in case cleaners are missing and there're snapshot in the
1288   *                                       system
1289   */
1290  private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1291    throws IOException, UnsupportedOperationException {
1292    // Verify if snapshot is disabled by the user
1293    String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1294    boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1295    boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1296
1297    // Extract cleaners from conf
1298    Set<String> hfileCleaners = new HashSet<>();
1299    String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1300    if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1301
1302    Set<String> logCleaners = new HashSet<>();
1303    cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1304    if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1305
1306    // check if an older version of snapshot directory was present
1307    Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1308    FileSystem fs = mfs.getFileSystem();
1309    List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false);
1310    if (ss != null && !ss.isEmpty()) {
1311      LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1312      LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1313    }
1314
1315    // If the user has enabled the snapshot, we force the cleaners to be present
1316    // otherwise we still need to check if cleaners are enabled or not and verify
1317    // that there're no snapshot in the .snapshot folder.
1318    if (snapshotEnabled) {
1319      // Inject snapshot cleaners, if snapshot.enable is true
1320      hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1321      hfileCleaners.add(HFileLinkCleaner.class.getName());
1322      // If sync acl to HDFS feature is enabled, then inject the cleaner
1323      if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) {
1324        hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName());
1325      }
1326
1327      // Set cleaners conf
1328      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1329        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1330      conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1331        logCleaners.toArray(new String[logCleaners.size()]));
1332    } else {
1333      // There may be restore tables if snapshot is enabled and then disabled, so add
1334      // HFileLinkCleaner, see HBASE-26670 for more details.
1335      hfileCleaners.add(HFileLinkCleaner.class.getName());
1336      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1337        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1338      // Verify if SnapshotHFileCleaner are present
1339      snapshotEnabled = hfileCleaners.contains(SnapshotHFileCleaner.class.getName());
1340
1341      // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1342      if (snapshotEnabled) {
1343        LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " + "but the '"
1344          + HBASE_SNAPSHOT_ENABLED + "' property "
1345          + (userDisabled ? "is set to 'false'." : "is not set."));
1346      }
1347    }
1348
1349    // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1350    this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1351
1352    // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1353    // otherwise we end up with snapshot data loss.
1354    if (!snapshotEnabled) {
1355      LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1356      Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1357      if (fs.exists(snapshotDir)) {
1358        FileStatus[] snapshots = CommonFSUtils.listStatus(fs, snapshotDir,
1359          new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1360        if (snapshots != null) {
1361          LOG.error("Snapshots are present, but cleaners are not enabled.");
1362          checkSnapshotSupport();
1363        }
1364      }
1365    }
1366  }
1367
1368  @Override
1369  public void initialize(MasterServices master, MetricsMaster metricsMaster)
1370    throws KeeperException, IOException, UnsupportedOperationException {
1371    this.master = master;
1372
1373    this.rootDir = master.getMasterFileSystem().getRootDir();
1374    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1375
1376    // get the configuration for the coordinator
1377    Configuration conf = master.getConfiguration();
1378    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1379    long timeoutMillis = Math.max(
1380      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1381        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME),
1382      conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1383        SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1384    int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1385
1386    // setup the default procedure coordinator
1387    String name = master.getServerName().toString();
1388    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1389    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(master.getZooKeeper(),
1390      SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1391
1392    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1393    this.executorService = master.getExecutorService();
1394    this.verifyWorkerAssigner =
1395      new WorkerAssigner(master, conf.getInt("hbase.snapshot.verify.task.max", 3),
1396        new ProcedureEvent<>("snapshot-verify-worker-assigning"));
1397    restoreUnfinishedSnapshotProcedure();
1398    restoreWorkers();
1399    resetTempDir();
1400    snapshotHandlerChoreCleanerTask =
1401      scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
1402  }
1403
1404  private void restoreUnfinishedSnapshotProcedure() {
1405    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1406      .filter(p -> p instanceof SnapshotProcedure).filter(p -> !p.isFinished())
1407      .map(p -> (SnapshotProcedure) p).forEach(p -> {
1408        registerSnapshotProcedure(p.getSnapshot(), p.getProcId());
1409        LOG.info("restore unfinished snapshot procedure {}", p);
1410      });
1411  }
1412
1413  @Override
1414  public String getProcedureSignature() {
1415    return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1416  }
1417
1418  @Override
1419  public void execProcedure(ProcedureDescription desc) throws IOException {
1420    takeSnapshot(toSnapshotDescription(desc));
1421  }
1422
1423  @Override
1424  public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
1425    throws IOException {
1426    // Done by AccessController as part of preSnapshot coprocessor hook (legacy code path).
1427    // In future, when we AC is removed for good, that check should be moved here.
1428  }
1429
1430  @Override
1431  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1432    return isSnapshotDone(toSnapshotDescription(desc));
1433  }
1434
1435  private SnapshotDescription toSnapshotDescription(ProcedureDescription desc) throws IOException {
1436    SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1437    if (!desc.hasInstance()) {
1438      throw new IOException("Snapshot name is not defined: " + desc.toString());
1439    }
1440    String snapshotName = desc.getInstance();
1441    List<NameStringPair> props = desc.getConfigurationList();
1442    String table = null;
1443    for (NameStringPair prop : props) {
1444      if ("table".equalsIgnoreCase(prop.getName())) {
1445        table = prop.getValue();
1446      }
1447    }
1448    if (table == null) {
1449      throw new IOException("Snapshot table is not defined: " + desc.toString());
1450    }
1451    TableName tableName = TableName.valueOf(table);
1452    builder.setTable(tableName.getNameAsString());
1453    builder.setName(snapshotName);
1454    builder.setType(SnapshotDescription.Type.FLUSH);
1455    return builder.build();
1456  }
1457
1458  public void registerSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1459    snapshotToProcIdMap.put(snapshot, procId);
1460    LOG.debug("register snapshot={}, snapshot procedure id = {}",
1461      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1462  }
1463
1464  public void unregisterSnapshotProcedure(SnapshotDescription snapshot, long procId) {
1465    snapshotToProcIdMap.remove(snapshot, procId);
1466    LOG.debug("unregister snapshot={}, snapshot procedure id = {}",
1467      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
1468  }
1469
1470  public boolean snapshotProcedureEnabled() {
1471    return master.getConfiguration().getBoolean(SNAPSHOT_PROCEDURE_ENABLED,
1472      SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
1473  }
1474
1475  public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
1476    throws ProcedureSuspendedException {
1477    Optional<ServerName> worker = verifyWorkerAssigner.acquire();
1478    if (worker.isPresent()) {
1479      LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get());
1480      return worker.get();
1481    }
1482    verifyWorkerAssigner.suspend(procedure);
1483    throw new ProcedureSuspendedException();
1484  }
1485
1486  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker,
1487    MasterProcedureScheduler scheduler) {
1488    LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
1489    verifyWorkerAssigner.release(worker);
1490    verifyWorkerAssigner.wake(scheduler);
1491  }
1492
1493  private void restoreWorkers() {
1494    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
1495      .filter(p -> p instanceof SnapshotVerifyProcedure).map(p -> (SnapshotVerifyProcedure) p)
1496      .filter(p -> !p.isFinished()).filter(p -> p.getServerName() != null).forEach(p -> {
1497        verifyWorkerAssigner.addUsedWorker(p.getServerName());
1498        LOG.debug("{} restores used worker {}", p, p.getServerName());
1499      });
1500  }
1501
1502  public Integer getAvailableWorker(ServerName serverName) {
1503    return verifyWorkerAssigner.getAvailableWorker(serverName);
1504  }
1505}