001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.snapshot;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.HashSet;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.ConcurrentHashMap;
031import java.util.concurrent.Executors;
032import java.util.concurrent.ScheduledExecutorService;
033import java.util.concurrent.ScheduledFuture;
034import java.util.concurrent.ThreadPoolExecutor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.locks.ReadWriteLock;
037import java.util.concurrent.locks.ReentrantReadWriteLock;
038
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FSDataInputStream;
041import org.apache.hadoop.fs.FileStatus;
042import org.apache.hadoop.fs.FileSystem;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.HBaseInterfaceAudience;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.MetaTableAccessor;
047import org.apache.hadoop.hbase.Stoppable;
048import org.apache.hadoop.hbase.TableName;
049import org.apache.hadoop.hbase.client.TableDescriptor;
050import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
051import org.apache.hadoop.hbase.client.TableState;
052import org.apache.hadoop.hbase.errorhandling.ForeignException;
053import org.apache.hadoop.hbase.executor.ExecutorService;
054import org.apache.hadoop.hbase.ipc.RpcServer;
055import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
056import org.apache.hadoop.hbase.master.MasterFileSystem;
057import org.apache.hadoop.hbase.master.MasterServices;
058import org.apache.hadoop.hbase.master.MetricsMaster;
059import org.apache.hadoop.hbase.master.SnapshotSentinel;
060import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
061import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
062import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
063import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
064import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
065import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
066import org.apache.hadoop.hbase.procedure.Procedure;
067import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
068import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
069import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
070import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
071import org.apache.hadoop.hbase.security.AccessDeniedException;
072import org.apache.hadoop.hbase.security.User;
073import org.apache.hadoop.hbase.security.access.AccessChecker;
074import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
075import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
076import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
077import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
078import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
079import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
080import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
081import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
082import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
083import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
084import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
085import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
086import org.apache.hadoop.hbase.util.FSUtils;
087import org.apache.hadoop.hbase.util.NonceKey;
088import org.apache.hadoop.hbase.util.TableDescriptorChecker;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.apache.yetus.audience.InterfaceStability;
091import org.apache.zookeeper.KeeperException;
092import org.slf4j.Logger;
093import org.slf4j.LoggerFactory;
094import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
096import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
097import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
098import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
099import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
100import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
101
102/**
103 * This class manages the procedure of taking and restoring snapshots. There is only one
104 * SnapshotManager for the master.
105 * <p>
106 * The class provides methods for monitoring in-progress snapshot actions.
107 * <p>
 * Note: Currently only one snapshot per table, and one snapshot per snapshot name, can be in
 * progress at a time (see {@code isTakingSnapshot(SnapshotDescription)}). This is a
 * simplification in the current implementation.
110 */
111@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
112@InterfaceStability.Unstable
113public class SnapshotManager extends MasterProcedureManager implements Stoppable {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);

  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms). */
  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;

  /**
   * Conf key for the wait time before removing a finished sentinel from the in-progress map.
   * <p>
   * NOTE: This is used as a safety auto cleanup.
   * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
   * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
   * In case something fails on the client side and the snapshot/restore state is not reclaimed
   * after a default timeout, the entry is removed from the in-progress map.
   * At this point, if the user asks for the snapshot/restore status, the result will be
   * "snapshot done" if the snapshot exists, or "failed" if it doesn't exist.
   */
  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
      "hbase.snapshot.sentinels.cleanup.timeoutMillis";
  /** Default sentinel cleanup timeout: 60 * 1000, i.e. one minute when read as milliseconds. */
  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;

  /** Conf key to enable or disable snapshot support. */
  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

  /**
   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
   * completion.
   */
  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

  /** Name of the operation to use in the controller. */
  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

  /** Conf key for # of threads used by the SnapshotManager thread pool. */
  public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

  /**
   * Default number of SnapshotManager pool threads.
   * NOTE(review): presumably the fallback for {@link #SNAPSHOT_POOL_THREADS_KEY}; the lookup is
   * not visible in this part of the file - confirm.
   */
  public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
151
  private boolean stopped;
  private MasterServices master;  // Needed by TableEventHandlers
  private ProcedureCoordinator coordinator;

  // Is snapshot feature enabled?
  private boolean isSnapshotSupported = false;

  // Snapshot handlers map, with table name as key.
  // The map is a ConcurrentHashMap. The writers visible in this file (snapshotTable() and
  // setSnapshotHandlerForTesting()) hold the object lock, but isTakingSnapshot(TableName) reads
  // the map without synchronizing on this object.
  // snapshotTable() will insert a handler in the map.
  // isSnapshotDone() will remove the requested handler if the operation is finished.
  private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
  // Single daemon thread that periodically runs cleanupSentinels() (scheduled by the
  // test-visible constructor).
  private final ScheduledExecutorService scheduleThreadPool =
      Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
          .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
  // Handle of the scheduled cleanup task.
  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;

  // Restore map, with table name as key, procedure ID as value.
  // The map is always accessed and modified under the object lock using synchronized.
  // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
  //
  // TODO: just as the Apache HBase 1.x implementation, this map would not survive master
  // restart/failover. This is just a stopgap implementation until implementation of taking
  // snapshot using Procedure-V2.
  private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>();

  private Path rootDir;
  private ExecutorService executorService;

  /**
   * Read/write lock between taking a snapshot and the snapshot HFile cleaner. The cleaner should
   * skip checking the HFiles if any snapshot is in progress, otherwise it may clean an HFile that
   * belongs to the snapshot being created. So the cleaner should grab the write lock first when
   * it starts to work, while takeSnapshot() holds the read lock. (See HBASE-21387)
   */
  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
188
  /**
   * No-argument constructor; performs no initialization, so every field keeps its declared
   * default.
   * NOTE(review): presumably the manager is configured afterwards through an initialization hook
   * that is outside this part of the file - confirm.
   */
  public SnapshotManager() {}
190
191  /**
192   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
193   * @param master services for the master where the manager is running
194   * @param coordinator procedure coordinator instance.  exposed for testing.
195   * @param pool HBase ExecutorServcie instance, exposed for testing.
196   */
197  @VisibleForTesting
198  SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
199      ExecutorService pool, int sentinelCleanInterval)
200      throws IOException, UnsupportedOperationException {
201    this.master = master;
202
203    this.rootDir = master.getMasterFileSystem().getRootDir();
204    Configuration conf = master.getConfiguration();
205    checkSnapshotSupport(conf, master.getMasterFileSystem());
206
207    this.coordinator = coordinator;
208    this.executorService = pool;
209    resetTempDir();
210    snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
211      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
212  }
213
214  /**
215   * Gets the list of all completed snapshots.
216   * @return list of SnapshotDescriptions
217   * @throws IOException File system exception
218   */
219  public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
220    return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true);
221  }
222
223  /**
224   * Gets the list of all completed snapshots.
225   * @param snapshotDir snapshot directory
226   * @param withCpCall Whether to call CP hooks
227   * @return list of SnapshotDescriptions
228   * @throws IOException File system exception
229   */
230  private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall)
231      throws IOException {
232    List<SnapshotDescription> snapshotDescs = new ArrayList<>();
233    // first create the snapshot root path and check to see if it exists
234    FileSystem fs = master.getMasterFileSystem().getFileSystem();
235    if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
236
237    // if there are no snapshots, return an empty list
238    if (!fs.exists(snapshotDir)) {
239      return snapshotDescs;
240    }
241
242    // ignore all the snapshots in progress
243    FileStatus[] snapshots = fs.listStatus(snapshotDir,
244      new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
245    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
246    withCpCall = withCpCall && cpHost != null;
247    // loop through all the completed snapshots
248    for (FileStatus snapshot : snapshots) {
249      Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
250      // if the snapshot is bad
251      if (!fs.exists(info)) {
252        LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
253        continue;
254      }
255      FSDataInputStream in = null;
256      try {
257        in = fs.open(info);
258        SnapshotDescription desc = SnapshotDescription.parseFrom(in);
259        org.apache.hadoop.hbase.client.SnapshotDescription descPOJO = (withCpCall)
260            ? ProtobufUtil.createSnapshotDesc(desc) : null;
261        if (withCpCall) {
262          try {
263            cpHost.preListSnapshot(descPOJO);
264          } catch (AccessDeniedException e) {
265            LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
266                + "Either you should be owner of this snapshot or admin user.");
267            // Skip this and try for next snapshot
268            continue;
269          }
270        }
271        snapshotDescs.add(desc);
272
273        // call coproc post hook
274        if (withCpCall) {
275          cpHost.postListSnapshot(descPOJO);
276        }
277      } catch (IOException e) {
278        LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
279      } finally {
280        if (in != null) {
281          in.close();
282        }
283      }
284    }
285    return snapshotDescs;
286  }
287
288  /**
289   * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
290   * snapshot attempts.
291   *
292   * @throws IOException if we can't reach the filesystem
293   */
294  private void resetTempDir() throws IOException {
295    // cleanup any existing snapshots.
296    Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
297        master.getConfiguration());
298    FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration());
299    if (!tmpFs.delete(tmpdir, true)) {
300      LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
301    }
302  }
303
304  /**
305   * Delete the specified snapshot
306   * @param snapshot
307   * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
308   * @throws IOException For filesystem IOExceptions
309   */
310  public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
311    // check to see if it is completed
312    if (!isSnapshotCompleted(snapshot)) {
313      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
314    }
315
316    String snapshotName = snapshot.getName();
317    // first create the snapshot description and check to see if it exists
318    FileSystem fs = master.getMasterFileSystem().getFileSystem();
319    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
320    // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
321    // just the "name" and it does not contains the "real" snapshot information
322    snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
323
324    // call coproc pre hook
325    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
326    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
327    if (cpHost != null) {
328      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
329      cpHost.preDeleteSnapshot(snapshotPOJO);
330    }
331
332    LOG.debug("Deleting snapshot: " + snapshotName);
333    // delete the existing snapshot
334    if (!fs.delete(snapshotDir, true)) {
335      throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
336    }
337
338    // call coproc post hook
339    if (cpHost != null) {
340      cpHost.postDeleteSnapshot(snapshotPOJO);
341    }
342
343  }
344
345  /**
346   * Check if the specified snapshot is done
347   *
348   * @param expected
349   * @return true if snapshot is ready to be restored, false if it is still being taken.
350   * @throws IOException IOException if error from HDFS or RPC
351   * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
352   */
353  public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
354    // check the request to make sure it has a snapshot
355    if (expected == null) {
356      throw new UnknownSnapshotException(
357         "No snapshot name passed in request, can't figure out which snapshot you want to check.");
358    }
359
360    String ssString = ClientSnapshotDescriptionUtils.toString(expected);
361
362    // check to see if the sentinel exists,
363    // and if the task is complete removes it from the in-progress snapshots map.
364    SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
365
366    // stop tracking "abandoned" handlers
367    cleanupSentinels();
368
369    if (handler == null) {
370      // If there's no handler in the in-progress map, it means one of the following:
371      //   - someone has already requested the snapshot state
372      //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
373      //   - the snapshot was never requested
374      // In those cases returns to the user the "done state" if the snapshots exists on disk,
375      // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
376      if (!isSnapshotCompleted(expected)) {
377        throw new UnknownSnapshotException("Snapshot " + ssString
378            + " is not currently running or one of the known completed snapshots.");
379      }
380      // was done, return true;
381      return true;
382    }
383
384    // pass on any failure we find in the sentinel
385    try {
386      handler.rethrowExceptionIfFailed();
387    } catch (ForeignException e) {
388      // Give some procedure info on an exception.
389      String status;
390      Procedure p = coordinator.getProcedure(expected.getName());
391      if (p != null) {
392        status = p.getStatus();
393      } else {
394        status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
395      }
396      throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
397        ProtobufUtil.createSnapshotDesc(expected));
398    }
399
400    // check to see if we are done
401    if (handler.isFinished()) {
402      LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
403      return true;
404    } else if (LOG.isDebugEnabled()) {
405      LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
406    }
407    return false;
408  }
409
410  /**
411   * Check to see if there is a snapshot in progress with the same name or on the same table.
412   * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
413   * don't allow snapshot with the same name.
414   * @param snapshot description of the snapshot being checked.
415   * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
416   *         table.
417   */
418  synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
419    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
420    if (isTakingSnapshot(snapshotTable)) {
421      return true;
422    }
423    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
424    while (it.hasNext()) {
425      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
426      SnapshotSentinel sentinel = entry.getValue();
427      if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
428        return true;
429      }
430    }
431    return false;
432  }
433
434  /**
435   * Check to see if the specified table has a snapshot in progress.  Currently we have a
436   * limitation only allowing a single snapshot per table at a time.
437   * @param tableName name of the table being snapshotted.
438   * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
439   */
440  public boolean isTakingSnapshot(final TableName tableName) {
441    SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
442    return handler != null && !handler.isFinished();
443  }
444
445  /**
446   * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
447   * aren't already running a snapshot or restore on the requested table.
448   * @param snapshot description of the snapshot we want to start
449   * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
450   */
451  private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
452      throws HBaseSnapshotException {
453    Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
454        master.getConfiguration());
455    TableName snapshotTable =
456        TableName.valueOf(snapshot.getTable());
457
458    // make sure we aren't already running a snapshot
459    if (isTakingSnapshot(snapshot)) {
460      SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
461      throw new SnapshotCreationException("Rejected taking "
462          + ClientSnapshotDescriptionUtils.toString(snapshot)
463          + " because we are already running another snapshot "
464          + (handler != null ? ("on the same table " +
465              ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
466              : "with the same name"), ProtobufUtil.createSnapshotDesc(snapshot));
467    }
468
469    // make sure we aren't running a restore on the same table
470    if (isRestoringTable(snapshotTable)) {
471      throw new SnapshotCreationException("Rejected taking "
472          + ClientSnapshotDescriptionUtils.toString(snapshot)
473          + " because we are already have a restore in progress on the same snapshot.");
474    }
475
476    try {
477      FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration());
478      // delete the working directory, since we aren't running the snapshot. Likely leftovers
479      // from a failed attempt.
480      workingDirFS.delete(workingDir, true);
481
482      // recreate the working directory for the snapshot
483      if (!workingDirFS.mkdirs(workingDir)) {
484        throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
485            + ") for snapshot" , ProtobufUtil.createSnapshotDesc(snapshot));
486      }
487    } catch (HBaseSnapshotException e) {
488      throw e;
489    } catch (IOException e) {
490      throw new SnapshotCreationException(
491          "Exception while checking to see if snapshot could be started.", e,
492          ProtobufUtil.createSnapshotDesc(snapshot));
493    }
494  }
495
496  /**
497   * Take a snapshot of a disabled table.
498   * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
499   * @throws IOException if the snapshot could not be started or filesystem for snapshot
500   *         temporary directory could not be determined
501   */
502  private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
503      throws IOException {
504    // setup the snapshot
505    prepareToTakeSnapshot(snapshot);
506
507    // set the snapshot to be a disabled snapshot, since the client doesn't know about that
508    snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
509
510    // Take the snapshot of the disabled table
511    DisabledTableSnapshotHandler handler =
512        new DisabledTableSnapshotHandler(snapshot, master, this);
513    snapshotTable(snapshot, handler);
514  }
515
516  /**
517   * Take a snapshot of an enabled table.
518   * @param snapshot description of the snapshot to take.
519   * @throws IOException if the snapshot could not be started or filesystem for snapshot
520   *         temporary directory could not be determined
521   */
522  private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
523          throws IOException {
524    // setup the snapshot
525    prepareToTakeSnapshot(snapshot);
526
527    // Take the snapshot of the enabled table
528    EnabledTableSnapshotHandler handler =
529        new EnabledTableSnapshotHandler(snapshot, master, this);
530    snapshotTable(snapshot, handler);
531  }
532
533  /**
534   * Take a snapshot using the specified handler.
535   * On failure the snapshot temporary working directory is removed.
536   * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
537   *       snapshot request if the table is busy with another snapshot/restore operation.
538   * @param snapshot the snapshot description
539   * @param handler the snapshot handler
540   */
541  private synchronized void snapshotTable(SnapshotDescription snapshot,
542      final TakeSnapshotHandler handler) throws IOException {
543    try {
544      handler.prepare();
545      this.executorService.submit(handler);
546      this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
547    } catch (Exception e) {
548      // cleanup the working directory by trying to delete it from the fs.
549      Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
550          master.getConfiguration());
551      FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration());
552      try {
553        if (!workingDirFs.delete(workingDir, true)) {
554          LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
555              ClientSnapshotDescriptionUtils.toString(snapshot));
556        }
557      } catch (IOException e1) {
558        LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
559            ClientSnapshotDescriptionUtils.toString(snapshot));
560      }
561      // fail the snapshot
562      throw new SnapshotCreationException("Could not build snapshot handler", e,
563        ProtobufUtil.createSnapshotDesc(snapshot));
564    }
565  }
566
567  public ReadWriteLock getTakingSnapshotLock() {
568    return this.takingSnapshotLock;
569  }
570
571  /**
572   * The snapshot operation processing as following: <br>
573   * 1. Create a Snapshot Handler, and do some initialization; <br>
574   * 2. Put the handler into snapshotHandlers <br>
575   * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
576   * and snapshotHandlers;
577   * @return true to indicate that there're some running snapshots.
578   */
579  public synchronized boolean isTakingAnySnapshot() {
580    return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0;
581  }
582
583  /**
584   * Take a snapshot based on the enabled/disabled state of the table.
585   * @param snapshot
586   * @throws HBaseSnapshotException when a snapshot specific exception occurs.
587   * @throws IOException when some sort of generic IO exception occurs.
588   */
589  public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
590    this.takingSnapshotLock.readLock().lock();
591    try {
592      takeSnapshotInternal(snapshot);
593    } finally {
594      this.takingSnapshotLock.readLock().unlock();
595    }
596  }
597
598  private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
599    // check to see if we already completed the snapshot
600    if (isSnapshotCompleted(snapshot)) {
601      throw new SnapshotExistsException(
602          "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
603          ProtobufUtil.createSnapshotDesc(snapshot));
604    }
605
606    LOG.debug("No existing snapshot, attempting snapshot...");
607
608    // stop tracking "abandoned" handlers
609    cleanupSentinels();
610
611    // check to see if the table exists
612    TableDescriptor desc = null;
613    try {
614      desc = master.getTableDescriptors().get(
615          TableName.valueOf(snapshot.getTable()));
616    } catch (FileNotFoundException e) {
617      String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
618      LOG.error(msg);
619      throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
620    } catch (IOException e) {
621      throw new SnapshotCreationException(
622          "Error while geting table description for table " + snapshot.getTable(), e,
623          ProtobufUtil.createSnapshotDesc(snapshot));
624    }
625    if (desc == null) {
626      throw new SnapshotCreationException(
627          "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
628          ProtobufUtil.createSnapshotDesc(snapshot));
629    }
630    SnapshotDescription.Builder builder = snapshot.toBuilder();
631    // if not specified, set the snapshot format
632    if (!snapshot.hasVersion()) {
633      builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
634    }
635    RpcServer.getRequestUser().ifPresent(user -> {
636      if (User.isHBaseSecurityEnabled(master.getConfiguration())) {
637        builder.setOwner(user.getShortName());
638      }
639    });
640    snapshot = builder.build();
641
642    // call pre coproc hook
643    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
644    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
645    if (cpHost != null) {
646      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
647      cpHost.preSnapshot(snapshotPOJO, desc);
648    }
649
650    // if the table is enabled, then have the RS run actually the snapshot work
651    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
652    if (master.getTableStateManager().isTableState(snapshotTable,
653        TableState.State.ENABLED)) {
654      LOG.debug("Table enabled, starting distributed snapshot.");
655      snapshotEnabledTable(snapshot);
656      LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
657    }
658    // For disabled table, snapshot is created by the master
659    else if (master.getTableStateManager().isTableState(snapshotTable,
660        TableState.State.DISABLED)) {
661      LOG.debug("Table is disabled, running snapshot entirely on master.");
662      snapshotDisabledTable(snapshot);
663      LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
664    } else {
665      LOG.error("Can't snapshot table '" + snapshot.getTable()
666          + "', isn't open or closed, we don't know what to do!");
667      TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
668          + " isn't fully open.");
669      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
670        ProtobufUtil.createSnapshotDesc(snapshot));
671    }
672
673    // call post coproc hook
674    if (cpHost != null) {
675      cpHost.postSnapshot(snapshotPOJO, desc);
676    }
677  }
678
679  /**
680   * Set the handler for the current snapshot
681   * <p>
682   * Exposed for TESTING
683   * @param tableName
684   * @param handler handler the master should use
685   *
686   * TODO get rid of this if possible, repackaging, modify tests.
687   */
688  public synchronized void setSnapshotHandlerForTesting(
689      final TableName tableName,
690      final SnapshotSentinel handler) {
691    if (handler != null) {
692      this.snapshotHandlers.put(tableName, handler);
693    } else {
694      this.snapshotHandlers.remove(tableName);
695    }
696  }
697
698  /**
699   * @return distributed commit coordinator for all running snapshots
700   */
701  ProcedureCoordinator getCoordinator() {
702    return coordinator;
703  }
704
705  /**
706   * Check to see if the snapshot is one of the currently completed snapshots
707   * Returns true if the snapshot exists in the "completed snapshots folder".
708   *
709   * @param snapshot expected snapshot to check
710   * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
711   *         not stored
712   * @throws IOException if the filesystem throws an unexpected exception,
713   * @throws IllegalArgumentException if snapshot name is invalid.
714   */
715  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
716    try {
717      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
718      FileSystem fs = master.getMasterFileSystem().getFileSystem();
719      // check to see if the snapshot already exists
720      return fs.exists(snapshotDir);
721    } catch (IllegalArgumentException iae) {
722      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
723    }
724  }
725
726  /**
727   * Clone the specified snapshot.
728   * The clone will fail if the destination table has a snapshot or restore in progress.
729   *
730   * @param reqSnapshot Snapshot Descriptor from request
731   * @param tableName table to clone
732   * @param snapshot Snapshot Descriptor
733   * @param snapshotTableDesc Table Descriptor
734   * @param nonceKey unique identifier to prevent duplicated RPC
735   * @return procId the ID of the clone snapshot procedure
736   * @throws IOException
737   */
738  private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
739      final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
740      final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
741    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
742    TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc);
743    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
744    if (cpHost != null) {
745      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
746      cpHost.preCloneSnapshot(snapshotPOJO, htd);
747    }
748    long procId;
749    try {
750      procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl);
751    } catch (IOException e) {
752      LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
753        + " as table " + tableName.getNameAsString(), e);
754      throw e;
755    }
756    LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
757
758    if (cpHost != null) {
759      cpHost.postCloneSnapshot(snapshotPOJO, htd);
760    }
761    return procId;
762  }
763
764  /**
765   * Clone the specified snapshot into a new table.
766   * The operation will fail if the destination table has a snapshot or restore in progress.
767   *
768   * @param snapshot Snapshot Descriptor
769   * @param tableDescriptor Table Descriptor of the table to create
770   * @param nonceKey unique identifier to prevent duplicated RPC
771   * @return procId the ID of the clone snapshot procedure
772   */
773  synchronized long cloneSnapshot(final SnapshotDescription snapshot,
774      final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
775      throws HBaseSnapshotException {
776    TableName tableName = tableDescriptor.getTableName();
777
778    // make sure we aren't running a snapshot on the same table
779    if (isTakingSnapshot(tableName)) {
780      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
781    }
782
783    // make sure we aren't running a restore on the same table
784    if (isRestoringTable(tableName)) {
785      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
786    }
787
788    try {
789      long procId = master.getMasterProcedureExecutor().submitProcedure(
790        new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
791                tableDescriptor, snapshot, restoreAcl),
792        nonceKey);
793      this.restoreTableToProcIdMap.put(tableName, procId);
794      return procId;
795    } catch (Exception e) {
796      String msg = "Couldn't clone the snapshot="
797        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
798      LOG.error(msg, e);
799      throw new RestoreSnapshotException(msg, e);
800    }
801  }
802
803  /**
804   * Restore or Clone the specified snapshot
805   * @param reqSnapshot
806   * @param nonceKey unique identifier to prevent duplicated RPC
807   * @throws IOException
808   */
809  public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey,
810      final boolean restoreAcl) throws IOException {
811    FileSystem fs = master.getMasterFileSystem().getFileSystem();
812    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
813
814    // check if the snapshot exists
815    if (!fs.exists(snapshotDir)) {
816      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
817      throw new SnapshotDoesNotExistException(
818        ProtobufUtil.createSnapshotDesc(reqSnapshot));
819    }
820
821    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
822    // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
823    // information.
824    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
825    SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
826        snapshotDir, snapshot);
827    TableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
828    TableName tableName = TableName.valueOf(reqSnapshot.getTable());
829
830    // sanity check the new table descriptor
831    TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc);
832
833    // stop tracking "abandoned" handlers
834    cleanupSentinels();
835
836    // Verify snapshot validity
837    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
838
839    // Execute the restore/clone operation
840    long procId;
841    if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) {
842      procId = restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey,
843        restoreAcl);
844    } else {
845      procId =
846          cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl);
847    }
848    return procId;
849  }
850
851  /**
852   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
853   * or restore in progress.
854   * @param reqSnapshot Snapshot Descriptor from request
855   * @param tableName table to restore
856   * @param snapshot Snapshot Descriptor
857   * @param snapshotTableDesc Table Descriptor
858   * @param nonceKey unique identifier to prevent duplicated RPC
859   * @param restoreAcl true to restore acl of snapshot
860   * @return procId the ID of the restore snapshot procedure
861   * @throws IOException
862   */
863  private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
864      final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
865      final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
866    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
867
868    if (master.getTableStateManager().isTableState(
869      TableName.valueOf(snapshot.getTable()), TableState.State.ENABLED)) {
870      throw new UnsupportedOperationException("Table '" +
871        TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
872        "perform a restore operation.");
873    }
874
875    // call Coprocessor pre hook
876    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
877    if (cpHost != null) {
878      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
879      cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
880    }
881
882    long procId;
883    try {
884      procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl);
885    } catch (IOException e) {
886      LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
887        + " as table " + tableName.getNameAsString(), e);
888      throw e;
889    }
890    LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
891
892    if (cpHost != null) {
893      cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
894    }
895
896    return procId;
897  }
898
899  /**
900   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
901   * or restore in progress.
902   * @param snapshot Snapshot Descriptor
903   * @param tableDescriptor Table Descriptor
904   * @param nonceKey unique identifier to prevent duplicated RPC
905   * @param restoreAcl true to restore acl of snapshot
906   * @return procId the ID of the restore snapshot procedure
907   */
908  private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
909      final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
910      throws HBaseSnapshotException {
911    final TableName tableName = tableDescriptor.getTableName();
912
913    // make sure we aren't running a snapshot on the same table
914    if (isTakingSnapshot(tableName)) {
915      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
916    }
917
918    // make sure we aren't running a restore on the same table
919    if (isRestoringTable(tableName)) {
920      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
921    }
922
923    try {
924      long procId = master.getMasterProcedureExecutor().submitProcedure(
925        new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
926                tableDescriptor, snapshot, restoreAcl),
927        nonceKey);
928      this.restoreTableToProcIdMap.put(tableName, procId);
929      return procId;
930    } catch (Exception e) {
931      String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
932          snapshot)  +
933          " on table=" + tableName;
934      LOG.error(msg, e);
935      throw new RestoreSnapshotException(msg, e);
936    }
937  }
938
939  /**
940   * Verify if the restore of the specified table is in progress.
941   *
942   * @param tableName table under restore
943   * @return <tt>true</tt> if there is a restore in progress of the specified table.
944   */
945  private synchronized boolean isRestoringTable(final TableName tableName) {
946    Long procId = this.restoreTableToProcIdMap.get(tableName);
947    if (procId == null) {
948      return false;
949    }
950    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
951    if (procExec.isRunning() && !procExec.isFinished(procId)) {
952      return true;
953    } else {
954      this.restoreTableToProcIdMap.remove(tableName);
955      return false;
956    }
957  }
958
959  /**
960   * Return the handler if it is currently live and has the same snapshot target name.
961   * The handler is removed from the sentinels map if completed.
962   * @param sentinels live handlers
963   * @param snapshot snapshot description
964   * @return null if doesn't match, else a live handler.
965   */
966  private synchronized SnapshotSentinel removeSentinelIfFinished(
967      final Map<TableName, SnapshotSentinel> sentinels,
968      final SnapshotDescription snapshot) {
969    if (!snapshot.hasTable()) {
970      return null;
971    }
972
973    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
974    SnapshotSentinel h = sentinels.get(snapshotTable);
975    if (h == null) {
976      return null;
977    }
978
979    if (!h.getSnapshot().getName().equals(snapshot.getName())) {
980      // specified snapshot is to the one currently running
981      return null;
982    }
983
984    // Remove from the "in-progress" list once completed
985    if (h.isFinished()) {
986      sentinels.remove(snapshotTable);
987    }
988
989    return h;
990  }
991
992  /**
993   * Removes "abandoned" snapshot/restore requests.
994   * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
995   * and the in-progress maps are cleaned up when the status of a completed task is requested.
996   * To avoid having sentinels staying around for long time if something client side is failed,
997   * each operation tries to clean up the in-progress maps sentinels finished from a long time.
998   */
999  private void cleanupSentinels() {
1000    cleanupSentinels(this.snapshotHandlers);
1001    cleanupCompletedRestoreInMap();
1002  }
1003
1004  /**
1005   * Remove the sentinels that are marked as finished and the completion time
1006   * has exceeded the removal timeout.
1007   * @param sentinels map of sentinels to clean
1008   */
1009  private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
1010    long currentTime = EnvironmentEdgeManager.currentTime();
1011    long sentinelsCleanupTimeoutMillis =
1012        master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
1013          SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
1014    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
1015    while (it.hasNext()) {
1016      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
1017      SnapshotSentinel sentinel = entry.getValue();
1018      if (sentinel.isFinished()
1019          && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) {
1020        it.remove();
1021      }
1022    }
1023  }
1024
1025  /**
1026   * Remove the procedures that are marked as finished
1027   */
1028  private synchronized void cleanupCompletedRestoreInMap() {
1029    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
1030    Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
1031    while (it.hasNext()) {
1032      Map.Entry<TableName, Long> entry = it.next();
1033      Long procId = entry.getValue();
1034      if (procExec.isRunning() && procExec.isFinished(procId)) {
1035        it.remove();
1036      }
1037    }
1038  }
1039
1040  //
1041  // Implementing Stoppable interface
1042  //
1043
1044  @Override
1045  public void stop(String why) {
1046    // short circuit
1047    if (this.stopped) return;
1048    // make sure we get stop
1049    this.stopped = true;
1050    // pass the stop onto take snapshot handlers
1051    for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
1052      snapshotHandler.cancel(why);
1053    }
1054    if (snapshotHandlerChoreCleanerTask != null) {
1055      snapshotHandlerChoreCleanerTask.cancel(true);
1056    }
1057    try {
1058      if (coordinator != null) {
1059        coordinator.close();
1060      }
1061    } catch (IOException e) {
1062      LOG.error("stop ProcedureCoordinator error", e);
1063    }
1064  }
1065
1066  @Override
1067  public boolean isStopped() {
1068    return this.stopped;
1069  }
1070
1071  /**
1072   * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1073   * Called at the beginning of snapshot() and restoreSnapshot() methods.
1074   * @throws UnsupportedOperationException if snapshot are not supported
1075   */
1076  public void checkSnapshotSupport() throws UnsupportedOperationException {
1077    if (!this.isSnapshotSupported) {
1078      throw new UnsupportedOperationException(
1079        "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
1080          HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1081    }
1082  }
1083
1084  /**
1085   * Called at startup, to verify if snapshot operation is supported, and to avoid
1086   * starting the master if there're snapshots present but the cleaners needed are missing.
1087   * Otherwise we can end up with snapshot data loss.
1088   * @param conf The {@link Configuration} object to use
1089   * @param mfs The MasterFileSystem to use
1090   * @throws IOException in case of file-system operation failure
1091   * @throws UnsupportedOperationException in case cleaners are missing and
1092   *         there're snapshot in the system
1093   */
1094  private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1095      throws IOException, UnsupportedOperationException {
1096    // Verify if snapshot is disabled by the user
1097    String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1098    boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1099    boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1100
1101    // Extract cleaners from conf
1102    Set<String> hfileCleaners = new HashSet<>();
1103    String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1104    if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1105
1106    Set<String> logCleaners = new HashSet<>();
1107    cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1108    if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1109
1110    // check if an older version of snapshot directory was present
1111    Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1112    FileSystem fs = mfs.getFileSystem();
1113    List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false);
1114    if (ss != null && !ss.isEmpty()) {
1115      LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1116      LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1117    }
1118
1119    // If the user has enabled the snapshot, we force the cleaners to be present
1120    // otherwise we still need to check if cleaners are enabled or not and verify
1121    // that there're no snapshot in the .snapshot folder.
1122    if (snapshotEnabled) {
1123      // Inject snapshot cleaners, if snapshot.enable is true
1124      hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1125      hfileCleaners.add(HFileLinkCleaner.class.getName());
1126
1127      // Set cleaners conf
1128      conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1129        hfileCleaners.toArray(new String[hfileCleaners.size()]));
1130      conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1131        logCleaners.toArray(new String[logCleaners.size()]));
1132    } else {
1133      // Verify if cleaners are present
1134      snapshotEnabled =
1135        hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
1136        hfileCleaners.contains(HFileLinkCleaner.class.getName());
1137
1138      // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1139      if (snapshotEnabled) {
1140        LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
1141          "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
1142          (userDisabled ? "is set to 'false'." : "is not set."));
1143      }
1144    }
1145
1146    // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1147    this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1148
1149    // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1150    // otherwise we end up with snapshot data loss.
1151    if (!snapshotEnabled) {
1152      LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1153      Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1154      if (fs.exists(snapshotDir)) {
1155        FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
1156          new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1157        if (snapshots != null) {
1158          LOG.error("Snapshots are present, but cleaners are not enabled.");
1159          checkSnapshotSupport();
1160        }
1161      }
1162    }
1163  }
1164
1165  @Override
1166  public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1167      IOException, UnsupportedOperationException {
1168    this.master = master;
1169
1170    this.rootDir = master.getMasterFileSystem().getRootDir();
1171    checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1172
1173    // get the configuration for the coordinator
1174    Configuration conf = master.getConfiguration();
1175    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1176    long timeoutMillis = Math.max(conf.getLong(SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_KEY,
1177                    SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT),
1178            conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1179                    SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1180    int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1181
1182    // setup the default procedure coordinator
1183    String name = master.getServerName().toString();
1184    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1185    ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(
1186        master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1187
1188    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1189    this.executorService = master.getExecutorService();
1190    resetTempDir();
1191    snapshotHandlerChoreCleanerTask =
1192        scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
1193  }
1194
1195  @Override
1196  public String getProcedureSignature() {
1197    return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1198  }
1199
1200  @Override
1201  public void execProcedure(ProcedureDescription desc) throws IOException {
1202    takeSnapshot(toSnapshotDescription(desc));
1203  }
1204
  /**
   * Intentionally performs no check; see the comment in the body for where the check happens.
   */
  @Override
  public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
      throws IOException {
    // Done by AccessController as part of preSnapshot coprocessor hook (legacy code path).
    // In the future, when the AC is removed for good, that check should be moved here.
  }
1211
1212  @Override
1213  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1214    return isSnapshotDone(toSnapshotDescription(desc));
1215  }
1216
1217  private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1218      throws IOException {
1219    SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1220    if (!desc.hasInstance()) {
1221      throw new IOException("Snapshot name is not defined: " + desc.toString());
1222    }
1223    String snapshotName = desc.getInstance();
1224    List<NameStringPair> props = desc.getConfigurationList();
1225    String table = null;
1226    for (NameStringPair prop : props) {
1227      if ("table".equalsIgnoreCase(prop.getName())) {
1228        table = prop.getValue();
1229      }
1230    }
1231    if (table == null) {
1232      throw new IOException("Snapshot table is not defined: " + desc.toString());
1233    }
1234    TableName tableName = TableName.valueOf(table);
1235    builder.setTable(tableName.getNameAsString());
1236    builder.setName(snapshotName);
1237    builder.setType(SnapshotDescription.Type.FLUSH);
1238    return builder.build();
1239  }
1240}