001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.ScheduledFuture;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.locks.ReadWriteLock;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FSDataInputStream;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.HBaseInterfaceAudience;
044import org.apache.hadoop.hbase.HConstants;
045import org.apache.hadoop.hbase.Stoppable;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.client.TableDescriptor;
048import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
049import org.apache.hadoop.hbase.client.TableState;
050import org.apache.hadoop.hbase.errorhandling.ForeignException;
051import org.apache.hadoop.hbase.executor.ExecutorService;
052import org.apache.hadoop.hbase.ipc.RpcServer;
053import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
054import org.apache.hadoop.hbase.master.MasterFileSystem;
055import org.apache.hadoop.hbase.master.MasterServices;
056import org.apache.hadoop.hbase.master.MetricsMaster;
057import org.apache.hadoop.hbase.master.SnapshotSentinel;
058import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
059import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
060import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
061import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
062import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
063import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
064import org.apache.hadoop.hbase.procedure.Procedure;
065import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
066import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
067import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
068import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
069import org.apache.hadoop.hbase.security.AccessDeniedException;
070import org.apache.hadoop.hbase.security.User;
071import org.apache.hadoop.hbase.security.access.AccessChecker;
072import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner;
073import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper;
074import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
075import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
076import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
077import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
078import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
079import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
080import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
081import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
082import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
083import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
084import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
085import org.apache.hadoop.hbase.util.CommonFSUtils;
086import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
087import org.apache.hadoop.hbase.util.NonceKey;
088import org.apache.hadoop.hbase.util.TableDescriptorChecker;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.apache.yetus.audience.InterfaceStability;
091import org.apache.zookeeper.KeeperException;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094
095import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
096import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
097
098import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
099import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
100import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
101import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
102import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
103
104/**
105 * This class manages the procedure of taking and restoring snapshots. There is only one
106 * SnapshotManager for the master.
107 * <p>
108 * The class provides methods for monitoring in-progress snapshot actions.
109 * <p>
110 * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
111 * simplification in the current implementation.
112 */
113@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
114@InterfaceStability.Unstable
115public class SnapshotManager extends MasterProcedureManager implements Stoppable {
116  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
117
118  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
119  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
120
121  /**
122   * Wait time before removing a finished sentinel from the in-progress map
123   *
124   * NOTE: This is used as a safety auto cleanup.
125   * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
126   * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
127   * In case something fails on the client side and the snapshot/restore state is not reclaimed
128   * after a default timeout, the entry is removed from the in-progress map.
   * At this point, if the user asks for the snapshot/restore status, the result will be
   * "done" if the snapshot exists on disk, or "failed" if it doesn't exist.
131   */
132  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
133      "hbase.snapshot.sentinels.cleanup.timeoutMillis";
134  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;
135
136  /** Enable or disable snapshot support */
137  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
138
139  /**
140   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
141   * completion.
142   */
143  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
144
145  /** Name of the operation to use in the controller */
146  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
147
148  /** Conf key for # of threads used by the SnapshotManager thread pool */
149  public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
150
151  /** number of current operations running on the master */
152  public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
153
154  private boolean stopped;
155  private MasterServices master;  // Needed by TableEventHandlers
156  private ProcedureCoordinator coordinator;
157
158  // Is snapshot feature enabled?
159  private boolean isSnapshotSupported = false;
160
  // Snapshot handlers map, with table name as key.
  // The map is a ConcurrentHashMap; most accessors also hold the object lock (synchronized),
  // but isTakingSnapshot(TableName) reads it without taking that lock.
  // snapshotTable() inserts a handler into the map.
  // isSnapshotDone() removes the requested handler once its operation has finished.
165  private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
166  private final ScheduledExecutorService scheduleThreadPool =
167      Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
168          .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
169  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;
170
171  // Restore map, with table name as key, procedure ID as value.
172  // The map is always accessed and modified under the object lock using synchronized.
173  // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
174  //
175  // TODO: just as the Apache HBase 1.x implementation, this map would not survive master
176  // restart/failover. This is just a stopgap implementation until implementation of taking
177  // snapshot using Procedure-V2.
178  private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>();
179
180  private Path rootDir;
181  private ExecutorService executorService;
182
183  /**
184   * Read write lock between taking snapshot and snapshot HFile cleaner. The cleaner should skip to
185   * check the HFiles if any snapshot is in progress, otherwise it may clean a HFile which would
186   * belongs to the newly creating snapshot. So we should grab the write lock first when cleaner
187   * start to work. (See HBASE-21387)
188   */
189  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
190
191  public SnapshotManager() {}
192
193  /**
194   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
195   * @param master services for the master where the manager is running
196   * @param coordinator procedure coordinator instance.  exposed for testing.
197   * @param pool HBase ExecutorServcie instance, exposed for testing.
198   */
199  @VisibleForTesting
200  SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
201      ExecutorService pool, int sentinelCleanInterval)
202      throws IOException, UnsupportedOperationException {
203    this.master = master;
204
205    this.rootDir = master.getMasterFileSystem().getRootDir();
206    Configuration conf = master.getConfiguration();
207    checkSnapshotSupport(conf, master.getMasterFileSystem());
208
209    this.coordinator = coordinator;
210    this.executorService = pool;
211    resetTempDir();
212    snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
213      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
214  }
215
216  /**
217   * Gets the list of all completed snapshots.
218   * @return list of SnapshotDescriptions
219   * @throws IOException File system exception
220   */
221  public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
222    return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true);
223  }
224
225  /**
226   * Gets the list of all completed snapshots.
227   * @param snapshotDir snapshot directory
228   * @param withCpCall Whether to call CP hooks
229   * @return list of SnapshotDescriptions
230   * @throws IOException File system exception
231   */
232  private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall)
233      throws IOException {
234    List<SnapshotDescription> snapshotDescs = new ArrayList<>();
235    // first create the snapshot root path and check to see if it exists
236    FileSystem fs = master.getMasterFileSystem().getFileSystem();
237    if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
238
239    // if there are no snapshots, return an empty list
240    if (!fs.exists(snapshotDir)) {
241      return snapshotDescs;
242    }
243
244    // ignore all the snapshots in progress
245    FileStatus[] snapshots = fs.listStatus(snapshotDir,
246      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
247    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
248    withCpCall = withCpCall && cpHost != null;
249    // loop through all the completed snapshots
250    for (FileStatus snapshot : snapshots) {
251      Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
252      // if the snapshot is bad
253      if (!fs.exists(info)) {
254        LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
255        continue;
256      }
257      FSDataInputStream in = null;
258      try {
259        in = fs.open(info);
260        SnapshotDescription desc = SnapshotDescription.parseFrom(in);
261        org.apache.hadoop.hbase.client.SnapshotDescription descPOJO = (withCpCall)
262            ? ProtobufUtil.createSnapshotDesc(desc) : null;
263        if (withCpCall) {
264          try {
265            cpHost.preListSnapshot(descPOJO);
266          } catch (AccessDeniedException e) {
267            LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
268                + "Either you should be owner of this snapshot or admin user.");
269            // Skip this and try for next snapshot
270            continue;
271          }
272        }
273        snapshotDescs.add(desc);
274
275        // call coproc post hook
276        if (withCpCall) {
277          cpHost.postListSnapshot(descPOJO);
278        }
279      } catch (IOException e) {
280        LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
281      } finally {
282        if (in != null) {
283          in.close();
284        }
285      }
286    }
287    return snapshotDescs;
288  }
289
290  /**
291   * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
292   * snapshot attempts.
293   *
294   * @throws IOException if we can't reach the filesystem
295   */
296  private void resetTempDir() throws IOException {
297    // cleanup any existing snapshots.
298    Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
299        master.getConfiguration());
300    FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration());
301    if (!tmpFs.delete(tmpdir, true)) {
302      LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
303    }
304  }
305
306  /**
307   * Delete the specified snapshot
308   * @param snapshot
309   * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
310   * @throws IOException For filesystem IOExceptions
311   */
312  public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
313    // check to see if it is completed
314    if (!isSnapshotCompleted(snapshot)) {
315      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
316    }
317
318    String snapshotName = snapshot.getName();
319    // first create the snapshot description and check to see if it exists
320    FileSystem fs = master.getMasterFileSystem().getFileSystem();
321    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
322    // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
323    // just the "name" and it does not contains the "real" snapshot information
324    snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
325
326    // call coproc pre hook
327    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
328    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
329    if (cpHost != null) {
330      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
331      cpHost.preDeleteSnapshot(snapshotPOJO);
332    }
333
334    LOG.debug("Deleting snapshot: " + snapshotName);
335    // delete the existing snapshot
336    if (!fs.delete(snapshotDir, true)) {
337      throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
338    }
339
340    // call coproc post hook
341    if (cpHost != null) {
342      cpHost.postDeleteSnapshot(snapshotPOJO);
343    }
344
345  }
346
347  /**
348   * Check if the specified snapshot is done
349   *
350   * @param expected
351   * @return true if snapshot is ready to be restored, false if it is still being taken.
352   * @throws IOException IOException if error from HDFS or RPC
353   * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
354   */
355  public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
356    // check the request to make sure it has a snapshot
357    if (expected == null) {
358      throw new UnknownSnapshotException(
359         "No snapshot name passed in request, can't figure out which snapshot you want to check.");
360    }
361
362    String ssString = ClientSnapshotDescriptionUtils.toString(expected);
363
364    // check to see if the sentinel exists,
365    // and if the task is complete removes it from the in-progress snapshots map.
366    SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
367
368    // stop tracking "abandoned" handlers
369    cleanupSentinels();
370
371    if (handler == null) {
372      // If there's no handler in the in-progress map, it means one of the following:
373      //   - someone has already requested the snapshot state
374      //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
375      //   - the snapshot was never requested
376      // In those cases returns to the user the "done state" if the snapshots exists on disk,
377      // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
378      if (!isSnapshotCompleted(expected)) {
379        throw new UnknownSnapshotException("Snapshot " + ssString
380            + " is not currently running or one of the known completed snapshots.");
381      }
382      // was done, return true;
383      return true;
384    }
385
386    // pass on any failure we find in the sentinel
387    try {
388      handler.rethrowExceptionIfFailed();
389    } catch (ForeignException e) {
390      // Give some procedure info on an exception.
391      String status;
392      Procedure p = coordinator.getProcedure(expected.getName());
393      if (p != null) {
394        status = p.getStatus();
395      } else {
396        status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
397      }
398      throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
399        ProtobufUtil.createSnapshotDesc(expected));
400    }
401
402    // check to see if we are done
403    if (handler.isFinished()) {
404      LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
405      return true;
406    } else if (LOG.isDebugEnabled()) {
407      LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
408    }
409    return false;
410  }
411
412  /**
413   * Check to see if there is a snapshot in progress with the same name or on the same table.
414   * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
415   * don't allow snapshot with the same name.
416   * @param snapshot description of the snapshot being checked.
417   * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
418   *         table.
419   */
420  synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
421    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
422    if (isTakingSnapshot(snapshotTable)) {
423      return true;
424    }
425    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
426    while (it.hasNext()) {
427      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
428      SnapshotSentinel sentinel = entry.getValue();
429      if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
430        return true;
431      }
432    }
433    return false;
434  }
435
436  /**
437   * Check to see if the specified table has a snapshot in progress.  Currently we have a
438   * limitation only allowing a single snapshot per table at a time.
439   * @param tableName name of the table being snapshotted.
440   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
441   */
442  public boolean isTakingSnapshot(final TableName tableName) {
443    SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
444    return handler != null && !handler.isFinished();
445  }
446
447  /**
448   * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
449   * aren't already running a snapshot or restore on the requested table.
450   * @param snapshot description of the snapshot we want to start
451   * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
452   */
453  private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
454      throws HBaseSnapshotException {
455    Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
456        master.getConfiguration());
457    TableName snapshotTable =
458        TableName.valueOf(snapshot.getTable());
459
460    // make sure we aren't already running a snapshot
461    if (isTakingSnapshot(snapshot)) {
462      SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
463      throw new SnapshotCreationException("Rejected taking "
464          + ClientSnapshotDescriptionUtils.toString(snapshot)
465          + " because we are already running another snapshot "
466          + (handler != null ? ("on the same table " +
467              ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
468              : "with the same name"), ProtobufUtil.createSnapshotDesc(snapshot));
469    }
470
471    // make sure we aren't running a restore on the same table
472    if (isRestoringTable(snapshotTable)) {
473      throw new SnapshotCreationException("Rejected taking "
474          + ClientSnapshotDescriptionUtils.toString(snapshot)
475          + " because we are already have a restore in progress on the same snapshot.");
476    }
477
478    try {
479      FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration());
480      // delete the working directory, since we aren't running the snapshot. Likely leftovers
481      // from a failed attempt.
482      workingDirFS.delete(workingDir, true);
483
484      // recreate the working directory for the snapshot
485      if (!workingDirFS.mkdirs(workingDir)) {
486        throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
487            + ") for snapshot" , ProtobufUtil.createSnapshotDesc(snapshot));
488      }
489    } catch (HBaseSnapshotException e) {
490      throw e;
491    } catch (IOException e) {
492      throw new SnapshotCreationException(
493          "Exception while checking to see if snapshot could be started.", e,
494          ProtobufUtil.createSnapshotDesc(snapshot));
495    }
496  }
497
498  /**
499   * Take a snapshot of a disabled table.
500   * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
501   * @throws IOException if the snapshot could not be started or filesystem for snapshot
502   *         temporary directory could not be determined
503   */
504  private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
505      throws IOException {
506    // setup the snapshot
507    prepareToTakeSnapshot(snapshot);
508
509    // set the snapshot to be a disabled snapshot, since the client doesn't know about that
510    snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
511
512    // Take the snapshot of the disabled table
513    DisabledTableSnapshotHandler handler =
514        new DisabledTableSnapshotHandler(snapshot, master, this);
515    snapshotTable(snapshot, handler);
516  }
517
518  /**
519   * Take a snapshot of an enabled table.
520   * @param snapshot description of the snapshot to take.
521   * @throws IOException if the snapshot could not be started or filesystem for snapshot
522   *         temporary directory could not be determined
523   */
524  private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
525          throws IOException {
526    // setup the snapshot
527    prepareToTakeSnapshot(snapshot);
528
529    // Take the snapshot of the enabled table
530    EnabledTableSnapshotHandler handler =
531        new EnabledTableSnapshotHandler(snapshot, master, this);
532    snapshotTable(snapshot, handler);
533  }
534
535  /**
536   * Take a snapshot using the specified handler.
537   * On failure the snapshot temporary working directory is removed.
538   * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
539   *       snapshot request if the table is busy with another snapshot/restore operation.
540   * @param snapshot the snapshot description
541   * @param handler the snapshot handler
542   */
543  private synchronized void snapshotTable(SnapshotDescription snapshot,
544      final TakeSnapshotHandler handler) throws IOException {
545    try {
546      handler.prepare();
547      this.executorService.submit(handler);
548      this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
549    } catch (Exception e) {
550      // cleanup the working directory by trying to delete it from the fs.
551      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
552          master.getConfiguration());
553      FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration());
554      try {
555        if (!workingDirFs.delete(workingDir, true)) {
556          LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
557              ClientSnapshotDescriptionUtils.toString(snapshot));
558        }
559      } catch (IOException e1) {
560        LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
561            ClientSnapshotDescriptionUtils.toString(snapshot));
562      }
563      // fail the snapshot
564      throw new SnapshotCreationException("Could not build snapshot handler", e,
565        ProtobufUtil.createSnapshotDesc(snapshot));
566    }
567  }
568
569  public ReadWriteLock getTakingSnapshotLock() {
570    return this.takingSnapshotLock;
571  }
572
573  /**
574   * The snapshot operation processing as following: <br>
575   * 1. Create a Snapshot Handler, and do some initialization; <br>
576   * 2. Put the handler into snapshotHandlers <br>
577   * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
578   * and snapshotHandlers;
579   * @return true to indicate that there're some running snapshots.
580   */
581  public synchronized boolean isTakingAnySnapshot() {
582    return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0;
583  }
584
585  /**
586   * Take a snapshot based on the enabled/disabled state of the table.
587   * @param snapshot
588   * @throws HBaseSnapshotException when a snapshot specific exception occurs.
589   * @throws IOException when some sort of generic IO exception occurs.
590   */
591  public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
592    this.takingSnapshotLock.readLock().lock();
593    try {
594      takeSnapshotInternal(snapshot);
595    } finally {
596      this.takingSnapshotLock.readLock().unlock();
597    }
598  }
599
600  private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
601    // check to see if we already completed the snapshot
602    if (isSnapshotCompleted(snapshot)) {
603      throw new SnapshotExistsException(
604          "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
605          ProtobufUtil.createSnapshotDesc(snapshot));
606    }
607
608    LOG.debug("No existing snapshot, attempting snapshot...");
609
610    // stop tracking "abandoned" handlers
611    cleanupSentinels();
612
613    // check to see if the table exists
614    TableDescriptor desc = null;
615    try {
616      desc = master.getTableDescriptors().get(
617          TableName.valueOf(snapshot.getTable()));
618    } catch (FileNotFoundException e) {
619      String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
620      LOG.error(msg);
621      throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
622    } catch (IOException e) {
623      throw new SnapshotCreationException(
624          "Error while geting table description for table " + snapshot.getTable(), e,
625          ProtobufUtil.createSnapshotDesc(snapshot));
626    }
627    if (desc == null) {
628      throw new SnapshotCreationException(
629          "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
630          ProtobufUtil.createSnapshotDesc(snapshot));
631    }
632    SnapshotDescription.Builder builder = snapshot.toBuilder();
633    // if not specified, set the snapshot format
634    if (!snapshot.hasVersion()) {
635      builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
636    }
637    RpcServer.getRequestUser().ifPresent(user -> {
638      if (AccessChecker.isAuthorizationSupported(master.getConfiguration())) {
639        builder.setOwner(user.getShortName());
640      }
641    });
642    snapshot = builder.build();
643
644    // call pre coproc hook
645    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
646    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
647    if (cpHost != null) {
648      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
649      cpHost.preSnapshot(snapshotPOJO, desc);
650    }
651
652    // if the table is enabled, then have the RS run actually the snapshot work
653    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
654    if (master.getTableStateManager().isTableState(snapshotTable,
655        TableState.State.ENABLED)) {
656      if (LOG.isDebugEnabled()) {
657        LOG.debug("Table enabled, starting distributed snapshots for {}",
658          ClientSnapshotDescriptionUtils.toString(snapshot));
659      }
660      snapshotEnabledTable(snapshot);
661      if (LOG.isDebugEnabled()) {
662        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
663      }
664    }
665    // For disabled table, snapshot is created by the master
666    else if (master.getTableStateManager().isTableState(snapshotTable,
667        TableState.State.DISABLED)) {
668      if (LOG.isDebugEnabled()) {
669        LOG.debug("Table is disabled, running snapshot entirely on master for {}",
670          ClientSnapshotDescriptionUtils.toString(snapshot));
671      }
672      snapshotDisabledTable(snapshot);
673      if (LOG.isDebugEnabled()) {
674        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
675      }
676    } else {
677      LOG.error("Can't snapshot table '" + snapshot.getTable()
678          + "', isn't open or closed, we don't know what to do!");
679      TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
680          + " isn't fully open.");
681      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
682        ProtobufUtil.createSnapshotDesc(snapshot));
683    }
684
685    // call post coproc hook
686    if (cpHost != null) {
687      cpHost.postSnapshot(snapshotPOJO, desc);
688    }
689  }
690
691  /**
692   * Set the handler for the current snapshot
693   * <p>
694   * Exposed for TESTING
695   * @param tableName
696   * @param handler handler the master should use
697   *
698   * TODO get rid of this if possible, repackaging, modify tests.
699   */
700  public synchronized void setSnapshotHandlerForTesting(
701      final TableName tableName,
702      final SnapshotSentinel handler) {
703    if (handler != null) {
704      this.snapshotHandlers.put(tableName, handler);
705    } else {
706      this.snapshotHandlers.remove(tableName);
707    }
708  }
709
710  /**
711   * @return distributed commit coordinator for all running snapshots
712   */
713  ProcedureCoordinator getCoordinator() {
714    return coordinator;
715  }
716
717  /**
718   * Check to see if the snapshot is one of the currently completed snapshots
719   * Returns true if the snapshot exists in the "completed snapshots folder".
720   *
721   * @param snapshot expected snapshot to check
722   * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
723   *         not stored
724   * @throws IOException if the filesystem throws an unexpected exception,
725   * @throws IllegalArgumentException if snapshot name is invalid.
726   */
727  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
728    try {
729      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
730      FileSystem fs = master.getMasterFileSystem().getFileSystem();
731      // check to see if the snapshot already exists
732      return fs.exists(snapshotDir);
733    } catch (IllegalArgumentException iae) {
734      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
735    }
736  }
737
738  /**
739   * Clone the specified snapshot.
740   * The clone will fail if the destination table has a snapshot or restore in progress.
741   *
742   * @param reqSnapshot Snapshot Descriptor from request
743   * @param tableName table to clone
744   * @param snapshot Snapshot Descriptor
745   * @param snapshotTableDesc Table Descriptor
746   * @param nonceKey unique identifier to prevent duplicated RPC
747   * @return procId the ID of the clone snapshot procedure
748   * @throws IOException
749   */
750  private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
751      final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
752      final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
753    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
754    TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc);
755    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
756    if (cpHost != null) {
757      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
758      cpHost.preCloneSnapshot(snapshotPOJO, htd);
759    }
760    long procId;
761    try {
762      procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl);
763    } catch (IOException e) {
764      LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
765        + " as table " + tableName.getNameAsString(), e);
766      throw e;
767    }
768    LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
769
770    if (cpHost != null) {
771      cpHost.postCloneSnapshot(snapshotPOJO, htd);
772    }
773    return procId;
774  }
775
776  /**
777   * Clone the specified snapshot into a new table.
778   * The operation will fail if the destination table has a snapshot or restore in progress.
779   *
780   * @param snapshot Snapshot Descriptor
781   * @param tableDescriptor Table Descriptor of the table to create
782   * @param nonceKey unique identifier to prevent duplicated RPC
783   * @return procId the ID of the clone snapshot procedure
784   */
785  synchronized long cloneSnapshot(final SnapshotDescription snapshot,
786      final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
787      throws HBaseSnapshotException {
788    TableName tableName = tableDescriptor.getTableName();
789
790    // make sure we aren't running a snapshot on the same table
791    if (isTakingSnapshot(tableName)) {
792      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
793    }
794
795    // make sure we aren't running a restore on the same table
796    if (isRestoringTable(tableName)) {
797      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
798    }
799
800    try {
801      long procId = master.getMasterProcedureExecutor().submitProcedure(
802        new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
803                tableDescriptor, snapshot, restoreAcl),
804        nonceKey);
805      this.restoreTableToProcIdMap.put(tableName, procId);
806      return procId;
807    } catch (Exception e) {
808      String msg = "Couldn't clone the snapshot="
809        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
810      LOG.error(msg, e);
811      throw new RestoreSnapshotException(msg, e);
812    }
813  }
814
815  /**
816   * Restore or Clone the specified snapshot
817   * @param reqSnapshot
818   * @param nonceKey unique identifier to prevent duplicated RPC
819   * @throws IOException
820   */
821  public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey,
822      final boolean restoreAcl) throws IOException {
823    FileSystem fs = master.getMasterFileSystem().getFileSystem();
824    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
825
826    // check if the snapshot exists
827    if (!fs.exists(snapshotDir)) {
828      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
829      throw new SnapshotDoesNotExistException(
830        ProtobufUtil.createSnapshotDesc(reqSnapshot));
831    }
832
833    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
834    // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
835    // information.
836    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
837    SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
838        snapshotDir, snapshot);
839    TableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
840    TableName tableName = TableName.valueOf(reqSnapshot.getTable());
841
842    // sanity check the new table descriptor
843    TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc);
844
845    // stop tracking "abandoned" handlers
846    cleanupSentinels();
847
848    // Verify snapshot validity
849    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
850
851    // Execute the restore/clone operation
852    long procId;
853    if (master.getTableDescriptors().exists(tableName)) {
854      procId = restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey,
855        restoreAcl);
856    } else {
857      procId =
858          cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl);
859    }
860    return procId;
861  }
862
863  /**
864   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
865   * or restore in progress.
866   * @param reqSnapshot Snapshot Descriptor from request
867   * @param tableName table to restore
868   * @param snapshot Snapshot Descriptor
869   * @param snapshotTableDesc Table Descriptor
870   * @param nonceKey unique identifier to prevent duplicated RPC
871   * @param restoreAcl true to restore acl of snapshot
872   * @return procId the ID of the restore snapshot procedure
873   * @throws IOException
874   */
875  private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
876      final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
877      final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
878    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
879
880    if (master.getTableStateManager().isTableState(
881      TableName.valueOf(snapshot.getTable()), TableState.State.ENABLED)) {
882      throw new UnsupportedOperationException("Table '" +
883        TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
884        "perform a restore operation.");
885    }
886
887    // call Coprocessor pre hook
888    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
889    if (cpHost != null) {
890      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
891      cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
892    }
893
894    long procId;
895    try {
896      procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl);
897    } catch (IOException e) {
898      LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
899        + " as table " + tableName.getNameAsString(), e);
900      throw e;
901    }
902    LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
903
904    if (cpHost != null) {
905      cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
906    }
907
908    return procId;
909  }
910
911  /**
912   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
913   * or restore in progress.
914   * @param snapshot Snapshot Descriptor
915   * @param tableDescriptor Table Descriptor
916   * @param nonceKey unique identifier to prevent duplicated RPC
917   * @param restoreAcl true to restore acl of snapshot
918   * @return procId the ID of the restore snapshot procedure
919   */
920  private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
921      final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
922      throws HBaseSnapshotException {
923    final TableName tableName = tableDescriptor.getTableName();
924
925    // make sure we aren't running a snapshot on the same table
926    if (isTakingSnapshot(tableName)) {
927      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
928    }
929
930    // make sure we aren't running a restore on the same table
931    if (isRestoringTable(tableName)) {
932      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
933    }
934
935    try {
936      long procId = master.getMasterProcedureExecutor().submitProcedure(
937        new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
938                tableDescriptor, snapshot, restoreAcl),
939        nonceKey);
940      this.restoreTableToProcIdMap.put(tableName, procId);
941      return procId;
942    } catch (Exception e) {
943      String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
944          snapshot)  +
945          " on table=" + tableName;
946      LOG.error(msg, e);
947      throw new RestoreSnapshotException(msg, e);
948    }
949  }
950
951  /**
952   * Verify if the restore of the specified table is in progress.
953   *
954   * @param tableName table under restore
955   * @return <tt>true</tt> if there is a restore in progress of the specified table.
956   */
957  private synchronized boolean isRestoringTable(final TableName tableName) {
958    Long procId = this.restoreTableToProcIdMap.get(tableName);
959    if (procId == null) {
960      return false;
961    }
962    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
963    if (procExec.isRunning() && !procExec.isFinished(procId)) {
964      return true;
965    } else {
966      this.restoreTableToProcIdMap.remove(tableName);
967      return false;
968    }
969  }
970
971  /**
972   * Return the handler if it is currently live and has the same snapshot target name.
973   * The handler is removed from the sentinels map if completed.
974   * @param sentinels live handlers
975   * @param snapshot snapshot description
976   * @return null if doesn't match, else a live handler.
977   */
978  private synchronized SnapshotSentinel removeSentinelIfFinished(
979      final Map<TableName, SnapshotSentinel> sentinels,
980      final SnapshotDescription snapshot) {
981    if (!snapshot.hasTable()) {
982      return null;
983    }
984
985    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
986    SnapshotSentinel h = sentinels.get(snapshotTable);
987    if (h == null) {
988      return null;
989    }
990
991    if (!h.getSnapshot().getName().equals(snapshot.getName())) {
992      // specified snapshot is to the one currently running
993      return null;
994    }
995
996    // Remove from the "in-progress" list once completed
997    if (h.isFinished()) {
998      sentinels.remove(snapshotTable);
999    }
1000
1001    return h;
1002  }
1003
1004  /**
1005   * Removes "abandoned" snapshot/restore requests.
1006   * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
1007   * and the in-progress maps are cleaned up when the status of a completed task is requested.
1008   * To avoid having sentinels staying around for long time if something client side is failed,
1009   * each operation tries to clean up the in-progress maps sentinels finished from a long time.
1010   */
1011  private void cleanupSentinels() {
1012    cleanupSentinels(this.snapshotHandlers);
1013    cleanupCompletedRestoreInMap();
1014  }
1015
1016  /**
1017   * Remove the sentinels that are marked as finished and the completion time
1018   * has exceeded the removal timeout.
1019   * @param sentinels map of sentinels to clean
1020   */
1021  private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
1022    long currentTime = EnvironmentEdgeManager.currentTime();
1023    long sentinelsCleanupTimeoutMillis =
1024        master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
1025          SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
1026    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
1027    while (it.hasNext()) {
1028      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
1029      SnapshotSentinel sentinel = entry.getValue();
1030      if (sentinel.isFinished()
1031          && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) {
1032        it.remove();
1033      }
1034    }
1035  }
1036
1037  /**
1038   * Remove the procedures that are marked as finished
1039   */
1040  private synchronized void cleanupCompletedRestoreInMap() {
1041    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1042    Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
1043    while (it.hasNext()) {
1044      Map.Entry<TableName, Long> entry = it.next();
1045      Long procId = entry.getValue();
1046      if (procExec.isRunning() && procExec.isFinished(procId)) {
1047        it.remove();
1048      }
1049    }
1050  }
1051
1052  //
1053  // Implementing Stoppable interface
1054  //
1055
1056  @Override
1057  public void stop(String why) {
1058    // short circuit
1059    if (this.stopped) return;
1060    // make sure we get stop
1061    this.stopped = true;
1062    // pass the stop onto take snapshot handlers
1063    for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
1064      snapshotHandler.cancel(why);
1065    }
1066    if (snapshotHandlerChoreCleanerTask != null) {
1067      snapshotHandlerChoreCleanerTask.cancel(true);
1068    }
1069    try {
1070      if (coordinator != null) {
1071        coordinator.close();
1072      }
1073    } catch (IOException e) {
1074      LOG.error("stop ProcedureCoordinator error", e);
1075    }
1076  }
1077
1078  @Override
1079  public boolean isStopped() {
1080    return this.stopped;
1081  }
1082
1083  /**
1084   * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1085   * Called at the beginning of snapshot() and restoreSnapshot() methods.
1086   * @throws UnsupportedOperationException if snapshot are not supported
1087   */
1088  public void checkSnapshotSupport() throws UnsupportedOperationException {
1089    if (!this.isSnapshotSupported) {
1090      throw new UnsupportedOperationException(
1091        "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
1092          HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1093    }
1094  }
1095
1096  /**
1097   * Called at startup, to verify if snapshot operation is supported, and to avoid
1098   * starting the master if there're snapshots present but the cleaners needed are missing.
1099   * Otherwise we can end up with snapshot data loss.
1100   * @param conf The {@link Configuration} object to use
1101   * @param mfs The MasterFileSystem to use
1102   * @throws IOException in case of file-system operation failure
1103   * @throws UnsupportedOperationException in case cleaners are missing and
1104   *         there're snapshot in the system
1105   */
1106  private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1107      throws IOException, UnsupportedOperationException {
1108    // Verify if snapshot is disabled by the user
1109    String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1110    boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1111    boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1112
1113    // Extract cleaners from conf
1114    Set<String> hfileCleaners = new HashSet<>();
1115    String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1116    if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1117
1118    Set<String> logCleaners = new HashSet<>();
1119    cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1120    if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1121
1122    // check if an older version of snapshot directory was present
1123    Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1124    FileSystem fs = mfs.getFileSystem();
1125    List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false);
1126    if (ss != null && !ss.isEmpty()) {
1127      LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1128      LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1129    }
1130
1131    // If the user has enabled the snapshot, we force the cleaners to be present
1132    // otherwise we still need to check if cleaners are enabled or not and verify
1133    // that there're no snapshot in the .snapshot folder.
1134    if (snapshotEnabled) {
1135      // Inject snapshot cleaners, if snapshot.enable is true
1136      hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1137      hfileCleaners.add(HFileLinkCleaner.class.getName());
1138      // If sync acl to HDFS feature is enabled, then inject the cleaner
1139      if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) {
1140        hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName());
1141      }
1142
1143      // Set cleaners conf
1144      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1145        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1146      conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1147        logCleaners.toArray(new String[logCleaners.size()]));
1148    } else {
1149      // Verify if cleaners are present
1150      snapshotEnabled =
1151        hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
1152        hfileCleaners.contains(HFileLinkCleaner.class.getName());
1153
1154      // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1155      if (snapshotEnabled) {
1156        LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
1157          "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
1158          (userDisabled ? "is set to 'false'." : "is not set."));
1159      }
1160    }
1161
1162    // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1163    this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1164
1165    // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1166    // otherwise we end up with snapshot data loss.
1167    if (!snapshotEnabled) {
1168      LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1169      Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1170      if (fs.exists(snapshotDir)) {
1171        FileStatus[] snapshots = CommonFSUtils.listStatus(fs, snapshotDir,
1172          new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1173        if (snapshots != null) {
1174          LOG.error("Snapshots are present, but cleaners are not enabled.");
1175          checkSnapshotSupport();
1176        }
1177      }
1178    }
1179  }
1180
1181  @Override
1182  public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1183      IOException, UnsupportedOperationException {
1184    this.master = master;
1185
1186    this.rootDir = master.getMasterFileSystem().getRootDir();
1187    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1188
1189    // get the configuration for the coordinator
1190    Configuration conf = master.getConfiguration();
1191    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1192    long timeoutMillis = Math.max(
1193            conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1194                    SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME),
1195            conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1196                    SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1197    int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1198
1199    // setup the default procedure coordinator
1200    String name = master.getServerName().toString();
1201    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1202    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(
1203        master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1204
1205    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1206    this.executorService = master.getExecutorService();
1207    resetTempDir();
1208    snapshotHandlerChoreCleanerTask =
1209        scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
1210  }
1211
1212  @Override
1213  public String getProcedureSignature() {
1214    return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1215  }
1216
1217  @Override
1218  public void execProcedure(ProcedureDescription desc) throws IOException {
1219    takeSnapshot(toSnapshotDescription(desc));
1220  }
1221
1222  @Override
1223  public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
1224      throws IOException {
1225    // Done by AccessController as part of preSnapshot coprocessor hook (legacy code path).
1226    // In future, when we AC is removed for good, that check should be moved here.
1227  }
1228
1229  @Override
1230  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1231    return isSnapshotDone(toSnapshotDescription(desc));
1232  }
1233
1234  private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1235      throws IOException {
1236    SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1237    if (!desc.hasInstance()) {
1238      throw new IOException("Snapshot name is not defined: " + desc.toString());
1239    }
1240    String snapshotName = desc.getInstance();
1241    List<NameStringPair> props = desc.getConfigurationList();
1242    String table = null;
1243    for (NameStringPair prop : props) {
1244      if ("table".equalsIgnoreCase(prop.getName())) {
1245        table = prop.getValue();
1246      }
1247    }
1248    if (table == null) {
1249      throw new IOException("Snapshot table is not defined: " + desc.toString());
1250    }
1251    TableName tableName = TableName.valueOf(table);
1252    builder.setTable(tableName.getNameAsString());
1253    builder.setName(snapshotName);
1254    builder.setType(SnapshotDescription.Type.FLUSH);
1255    return builder.build();
1256  }
1257}