1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.snapshot;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ThreadPoolExecutor;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.classification.InterfaceAudience;
35  import org.apache.hadoop.classification.InterfaceStability;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataInputStream;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.HTableDescriptor;
43  import org.apache.hadoop.hbase.Stoppable;
44  import org.apache.hadoop.hbase.catalog.MetaReader;
45  import org.apache.hadoop.hbase.errorhandling.ForeignException;
46  import org.apache.hadoop.hbase.exceptions.HBaseSnapshotException;
47  import org.apache.hadoop.hbase.exceptions.RestoreSnapshotException;
48  import org.apache.hadoop.hbase.exceptions.SnapshotCreationException;
49  import org.apache.hadoop.hbase.exceptions.SnapshotDoesNotExistException;
50  import org.apache.hadoop.hbase.exceptions.SnapshotExistsException;
51  import org.apache.hadoop.hbase.exceptions.TablePartiallyOpenException;
52  import org.apache.hadoop.hbase.exceptions.UnknownSnapshotException;
53  import org.apache.hadoop.hbase.executor.ExecutorService;
54  import org.apache.hadoop.hbase.master.AssignmentManager;
55  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
56  import org.apache.hadoop.hbase.master.MasterFileSystem;
57  import org.apache.hadoop.hbase.master.MasterServices;
58  import org.apache.hadoop.hbase.master.MetricsMaster;
59  import org.apache.hadoop.hbase.master.SnapshotSentinel;
60  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
61  import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
62  import org.apache.hadoop.hbase.procedure.Procedure;
63  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
64  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
65  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
66  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
67  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
68  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
69  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
70  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
71  import org.apache.hadoop.hbase.util.Bytes;
72  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
73  import org.apache.hadoop.hbase.util.FSTableDescriptors;
74  import org.apache.hadoop.hbase.util.FSUtils;
75  import org.apache.zookeeper.KeeperException;
76  
77  /**
78   * This class manages the procedure of taking and restoring snapshots. There is only one
79   * SnapshotManager for the master.
80   * <p>
81   * The class provides methods for monitoring in-progress snapshot actions.
82   * <p>
83   * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
84   * simplification in the current implementation.
85   */
86  @InterfaceAudience.Private
87  @InterfaceStability.Unstable
88  public class SnapshotManager implements Stoppable {
  private static final Log LOG = LogFactory.getLog(SnapshotManager.class);

  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;

  /**
   * Wait time before removing a finished sentinel from the in-progress map.
   * <p>
   * NOTE: This is used as a safety auto cleanup.
   * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
   * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
   * In case something fails on the client side and the snapshot/restore state is not reclaimed
   * after a default timeout, the entry is removed from the in-progress map.
   * At this point, if the user asks for the snapshot/restore status, the result will be
   * "snapshot done" if the snapshot exists or "failed" if it doesn't exist.
   */
  private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;

  /** Enable or disable snapshot support */
  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

  /**
   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
   * completion.
   */
  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

  /** Default value (ms) used when {@link #SNAPSHOT_TIMEOUT_MILLIS_KEY} is not configured */
  private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5000;

  /**
   * Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
   * completion.
   */
  private static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";

  /** Name of the operation to use in the controller */
  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

  /** Conf key for # of threads used by the SnapshotManager thread pool */
  private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

  /** Default # of pool threads used when {@link #SNAPSHOT_POOL_THREADS_KEY} is not configured */
  private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
133 
  // Stop flag for the Stoppable contract (presumably toggled by stop(), which is not shown
  // in this part of the file -- confirm).
  private boolean stopped;
  // Value of SNAPSHOT_WAKE_MILLIS_KEY; passed to the default coordinator thread pool.
  private final long wakeFrequency;
  private final MasterServices master;  // Needed by TableEventHandlers
  // Handed to every snapshot/clone/restore handler created by this manager.
  private final MetricsMaster metricsMaster;
  // Procedure coordinator; queried for procedure status when a snapshot handler reports an error.
  private final ProcedureCoordinator coordinator;

  // Is snapshot feature enabled?
  private boolean isSnapshotSupported = false;

  // Snapshot handlers map, with table name as key.
  // The map is always accessed and modified under the object lock using synchronized.
  // snapshotTable() will insert a handler in the map.
  // isSnapshotDone() will remove the handler requested if the operation is finished.
  private Map<String, SnapshotSentinel> snapshotHandlers = new HashMap<String, SnapshotSentinel>();

  // Restore sentinels map, with table name as key.
  // The map is always accessed and modified under the object lock using synchronized.
  // restoreSnapshot()/cloneSnapshot() will insert a handler in the map.
  // isRestoreDone() will remove the handler requested if the operation is finished.
  private Map<String, SnapshotSentinel> restoreHandlers = new HashMap<String, SnapshotSentinel>();

  // Root directory obtained from the master filesystem; base for the snapshot directories.
  private final Path rootDir;
  // Executor that the snapshot/clone/restore handlers are submitted to.
  private final ExecutorService executorService;
157 
158   /**
159    * Construct a snapshot manager.
160    * @param master
161    */
162   public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster)
163       throws KeeperException, IOException, UnsupportedOperationException {
164     this.master = master;
165     this.metricsMaster = metricsMaster;
166 
167     this.rootDir = master.getMasterFileSystem().getRootDir();
168     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
169 
170     // get the configuration for the coordinator
171     Configuration conf = master.getConfiguration();
172     this.wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
173     long keepAliveTime = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
174     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
175 
176     // setup the default procedure coordinator
177     String name = master.getServerName().toString();
178     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, keepAliveTime, opThreads, wakeFrequency);
179     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
180         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
181     this.coordinator = new ProcedureCoordinator(comms, tpool);
182     this.executorService = master.getExecutorService();
183     resetTempDir();
184   }
185 
186   /**
187    * Fully specify all necessary components of a snapshot manager. Exposed for testing.
188    * @param master services for the master where the manager is running
189    * @param coordinator procedure coordinator instance.  exposed for testing.
190    * @param pool HBase ExecutorServcie instance, exposed for testing.
191    */
192   public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
193       ProcedureCoordinator coordinator, ExecutorService pool)
194       throws IOException, UnsupportedOperationException {
195     this.master = master;
196     this.metricsMaster = metricsMaster;
197 
198     this.rootDir = master.getMasterFileSystem().getRootDir();
199     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
200 
201     this.wakeFrequency = master.getConfiguration().getInt(SNAPSHOT_WAKE_MILLIS_KEY,
202       SNAPSHOT_WAKE_MILLIS_DEFAULT);
203     this.coordinator = coordinator;
204     this.executorService = pool;
205     resetTempDir();
206   }
207 
208   /**
209    * Gets the list of all completed snapshots.
210    * @return list of SnapshotDescriptions
211    * @throws IOException File system exception
212    */
213   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
214     return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
215   }
216 
217   /**
218    * Gets the list of all completed snapshots.
219    * @param snapshotDir snapshot directory
220    * @return list of SnapshotDescriptions
221    * @throws IOException File system exception
222    */
223   private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
224     List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
225     // first create the snapshot root path and check to see if it exists
226     FileSystem fs = master.getMasterFileSystem().getFileSystem();
227     if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
228 
229     // if there are no snapshots, return an empty list
230     if (!fs.exists(snapshotDir)) {
231       return snapshotDescs;
232     }
233 
234     // ignore all the snapshots in progress
235     FileStatus[] snapshots = fs.listStatus(snapshotDir,
236       new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
237     // loop through all the completed snapshots
238     for (FileStatus snapshot : snapshots) {
239       Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
240       // if the snapshot is bad
241       if (!fs.exists(info)) {
242         LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
243         continue;
244       }
245       FSDataInputStream in = null;
246       try {
247         in = fs.open(info);
248         SnapshotDescription desc = SnapshotDescription.parseFrom(in);
249         snapshotDescs.add(desc);
250       } catch (IOException e) {
251         LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
252       } finally {
253         if (in != null) {
254           in.close();
255         }
256       }
257     }
258     return snapshotDescs;
259   }
260 
261   /**
262    * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
263    * snapshot attempts.
264    *
265    * @throws IOException if we can't reach the filesystem
266    */
267   void resetTempDir() throws IOException {
268     // cleanup any existing snapshots.
269     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
270     if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
271       LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
272     }
273   }
274 
275   /**
276    * Delete the specified snapshot
277    * @param snapshot
278    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
279    * @throws IOException For filesystem IOExceptions
280    */
281   public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
282 
283     // call coproc pre hook
284     MasterCoprocessorHost cpHost = master.getCoprocessorHost();
285     if (cpHost != null) {
286       cpHost.preDeleteSnapshot(snapshot);
287     }
288 
289     // check to see if it is completed
290     if (!isSnapshotCompleted(snapshot)) {
291       throw new SnapshotDoesNotExistException(snapshot);
292     }
293 
294     String snapshotName = snapshot.getName();
295     LOG.debug("Deleting snapshot: " + snapshotName);
296     // first create the snapshot description and check to see if it exists
297     MasterFileSystem fs = master.getMasterFileSystem();
298     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
299 
300     // delete the existing snapshot
301     if (!fs.getFileSystem().delete(snapshotDir, true)) {
302       throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
303     }
304 
305     // call coproc post hook
306     if (cpHost != null) {
307       cpHost.postDeleteSnapshot(snapshot);
308     }
309 
310   }
311 
312   /**
313    * Check if the specified snapshot is done
314    *
315    * @param expected
316    * @return true if snapshot is ready to be restored, false if it is still being taken.
317    * @throws IOException IOException if error from HDFS or RPC
318    * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
319    */
320   public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
321     // check the request to make sure it has a snapshot
322     if (expected == null) {
323       throw new UnknownSnapshotException(
324          "No snapshot name passed in request, can't figure out which snapshot you want to check.");
325     }
326 
327     String ssString = ClientSnapshotDescriptionUtils.toString(expected);
328 
329     // check to see if the sentinel exists,
330     // and if the task is complete removes it from the in-progress snapshots map.
331     SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
332 
333     // stop tracking "abandoned" handlers
334     cleanupSentinels();
335 
336     if (handler == null) {
337       // If there's no handler in the in-progress map, it means one of the following:
338       //   - someone has already requested the snapshot state
339       //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
340       //   - the snapshot was never requested
341       // In those cases returns to the user the "done state" if the snapshots exists on disk,
342       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
343       if (!isSnapshotCompleted(expected)) {
344         throw new UnknownSnapshotException("Snapshot " + ssString
345             + " is not currently running or one of the known completed snapshots.");
346       }
347       // was done, return true;
348       return true;
349     }
350 
351     // pass on any failure we find in the sentinel
352     try {
353       handler.rethrowExceptionIfFailed();
354     } catch (ForeignException e) {
355       // Give some procedure info on an exception.
356       String status;
357       Procedure p = coordinator.getProcedure(expected.getName());
358       if (p != null) {
359         status = p.getStatus();
360       } else {
361         status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
362       }
363       throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
364           expected);
365     }
366 
367     // check to see if we are done
368     if (handler.isFinished()) {
369       LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
370       return true;
371     } else if (LOG.isDebugEnabled()) {
372       LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
373     }
374     return false;
375   }
376 
377   /**
378    * Check to see if the specified table has a snapshot in progress.  Currently we have a
379    * limitation only allowing a single snapshot per table at a time.
380    * @param tableName name of the table being snapshotted.
381    * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
382    */
383   synchronized boolean isTakingSnapshot(final String tableName) {
384     SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
385     return handler != null && !handler.isFinished();
386   }
387 
388   /**
389    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
390    * aren't already running a snapshot or restore on the requested table.
391    * @param snapshot description of the snapshot we want to start
392    * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
393    */
394   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
395       throws HBaseSnapshotException {
396     FileSystem fs = master.getMasterFileSystem().getFileSystem();
397     Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
398 
399     // make sure we aren't already running a snapshot
400     if (isTakingSnapshot(snapshot.getTable())) {
401       SnapshotSentinel handler = this.snapshotHandlers.get(snapshot.getTable());
402       throw new SnapshotCreationException("Rejected taking "
403           + ClientSnapshotDescriptionUtils.toString(snapshot)
404           + " because we are already running another snapshot "
405           + ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()), snapshot);
406     }
407 
408     // make sure we aren't running a restore on the same table
409     if (isRestoringTable(snapshot.getTable())) {
410       SnapshotSentinel handler = restoreHandlers.get(snapshot.getTable());
411       throw new SnapshotCreationException("Rejected taking "
412           + ClientSnapshotDescriptionUtils.toString(snapshot)
413           + " because we are already have a restore in progress on the same snapshot "
414           + ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()), snapshot);
415     }
416 
417     try {
418       // delete the working directory, since we aren't running the snapshot. Likely leftovers
419       // from a failed attempt.
420       fs.delete(workingDir, true);
421 
422       // recreate the working directory for the snapshot
423       if (!fs.mkdirs(workingDir)) {
424         throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
425             + ") for snapshot" , snapshot);
426       }
427     } catch (HBaseSnapshotException e) {
428       throw e;
429     } catch (IOException e) {
430       throw new SnapshotCreationException(
431           "Exception while checking to see if snapshot could be started.", e, snapshot);
432     }
433   }
434 
435   /**
436    * Take a snapshot of a disabled table.
437    * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
438    * @throws HBaseSnapshotException if the snapshot could not be started
439    */
440   private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
441       throws HBaseSnapshotException {
442     // setup the snapshot
443     prepareToTakeSnapshot(snapshot);
444 
445     // set the snapshot to be a disabled snapshot, since the client doesn't know about that
446     snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
447 
448     // Take the snapshot of the disabled table
449     DisabledTableSnapshotHandler handler =
450         new DisabledTableSnapshotHandler(snapshot, master, metricsMaster);
451     snapshotTable(snapshot, handler);
452   }
453 
454   /**
455    * Take a snapshot of an enabled table.
456    * @param snapshot description of the snapshot to take.
457    * @throws HBaseSnapshotException if the snapshot could not be started
458    */
459   private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
460       throws HBaseSnapshotException {
461     // setup the snapshot
462     prepareToTakeSnapshot(snapshot);
463 
464     // Take the snapshot of the enabled table
465     EnabledTableSnapshotHandler handler =
466         new EnabledTableSnapshotHandler(snapshot, master, this, metricsMaster);
467     snapshotTable(snapshot, handler);
468   }
469 
470   /**
471    * Take a snapshot using the specified handler.
472    * On failure the snapshot temporary working directory is removed.
473    * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
474    *       snapshot request if the table is busy with another snapshot/restore operation.
475    * @param snapshot the snapshot description
476    * @param handler the snapshot handler
477    */
478   private synchronized void snapshotTable(SnapshotDescription snapshot,
479       final TakeSnapshotHandler handler) throws HBaseSnapshotException {
480     try {
481       handler.prepare();
482       this.executorService.submit(handler);
483       this.snapshotHandlers.put(snapshot.getTable(), handler);
484     } catch (Exception e) {
485       // cleanup the working directory by trying to delete it from the fs.
486       Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
487       try {
488         if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
489           LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
490               ClientSnapshotDescriptionUtils.toString(snapshot));
491         }
492       } catch (IOException e1) {
493         LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
494             ClientSnapshotDescriptionUtils.toString(snapshot));
495       }
496       // fail the snapshot
497       throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
498     }
499   }
500 
501   /**
502    * Take a snapshot based on the enabled/disabled state of the table.
503    *
504    * @param snapshot
505    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
506    * @throws IOException when some sort of generic IO exception occurs.
507    */
508   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
509     // check to see if we already completed the snapshot
510     if (isSnapshotCompleted(snapshot)) {
511       throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
512           + "' already stored on the filesystem.", snapshot);
513     }
514 
515     LOG.debug("No existing snapshot, attempting snapshot...");
516 
517     // stop tracking "abandoned" handlers
518     cleanupSentinels();
519 
520     // check to see if the table exists
521     HTableDescriptor desc = null;
522     try {
523       desc = master.getTableDescriptors().get(snapshot.getTable());
524     } catch (FileNotFoundException e) {
525       String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
526       LOG.error(msg);
527       throw new SnapshotCreationException(msg, e, snapshot);
528     } catch (IOException e) {
529       throw new SnapshotCreationException("Error while geting table description for table "
530           + snapshot.getTable(), e, snapshot);
531     }
532     if (desc == null) {
533       throw new SnapshotCreationException("Table '" + snapshot.getTable()
534           + "' doesn't exist, can't take snapshot.", snapshot);
535     }
536 
537     // set the snapshot version, now that we are ready to take it
538     snapshot = snapshot.toBuilder().setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION)
539         .build();
540 
541     // call pre coproc hook
542     MasterCoprocessorHost cpHost = master.getCoprocessorHost();
543     if (cpHost != null) {
544       cpHost.preSnapshot(snapshot, desc);
545     }
546 
547     // if the table is enabled, then have the RS run actually the snapshot work
548     AssignmentManager assignmentMgr = master.getAssignmentManager();
549     if (assignmentMgr.getZKTable().isEnabledTable(snapshot.getTable())) {
550       LOG.debug("Table enabled, starting distributed snapshot.");
551       snapshotEnabledTable(snapshot);
552       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
553     }
554     // For disabled table, snapshot is created by the master
555     else if (assignmentMgr.getZKTable().isDisabledTable(snapshot.getTable())) {
556       LOG.debug("Table is disabled, running snapshot entirely on master.");
557       snapshotDisabledTable(snapshot);
558       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
559     } else {
560       LOG.error("Can't snapshot table '" + snapshot.getTable()
561           + "', isn't open or closed, we don't know what to do!");
562       TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
563           + " isn't fully open.");
564       throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
565     }
566 
567     // call post coproc hook
568     if (cpHost != null) {
569       cpHost.postSnapshot(snapshot, desc);
570     }
571   }
572 
573   /**
574    * Set the handler for the current snapshot
575    * <p>
576    * Exposed for TESTING
577    * @param tableName
578    * @param handler handler the master should use
579    *
580    * TODO get rid of this if possible, repackaging, modify tests.
581    */
582   public synchronized void setSnapshotHandlerForTesting(final String tableName,
583       final SnapshotSentinel handler) {
584     if (handler != null) {
585       this.snapshotHandlers.put(tableName, handler);
586     } else {
587       this.snapshotHandlers.remove(tableName);
588     }
589   }
590 
591   /**
592    * @return distributed commit coordinator for all running snapshots
593    */
594   ProcedureCoordinator getCoordinator() {
595     return coordinator;
596   }
597 
598   /**
599    * Check to see if the snapshot is one of the currently completed snapshots
600    * Returns true if the snapshot exists in the "completed snapshots folder".
601    *
602    * @param snapshot expected snapshot to check
603    * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
604    *         not stored
605    * @throws IOException if the filesystem throws an unexpected exception,
606    * @throws IllegalArgumentException if snapshot name is invalid.
607    */
608   private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
609     try {
610       final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
611       FileSystem fs = master.getMasterFileSystem().getFileSystem();
612 
613       // check to see if the snapshot already exists
614       return fs.exists(snapshotDir);
615     } catch (IllegalArgumentException iae) {
616       throw new UnknownSnapshotException("Unexpected exception thrown", iae);
617     }
618   }
619 
620   /**
621    * Clone the specified snapshot into a new table.
622    * The operation will fail if the destination table has a snapshot or restore in progress.
623    *
624    * @param snapshot Snapshot Descriptor
625    * @param hTableDescriptor Table Descriptor of the table to create
626    */
627   synchronized void cloneSnapshot(final SnapshotDescription snapshot,
628       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
629     String tableName = hTableDescriptor.getNameAsString();
630 
631     // make sure we aren't running a snapshot on the same table
632     if (isTakingSnapshot(tableName)) {
633       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
634     }
635 
636     // make sure we aren't running a restore on the same table
637     if (isRestoringTable(tableName)) {
638       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
639     }
640 
641     try {
642       CloneSnapshotHandler handler =
643         new CloneSnapshotHandler(master, snapshot, hTableDescriptor, metricsMaster).prepare();
644       this.executorService.submit(handler);
645       this.restoreHandlers.put(tableName, handler);
646     } catch (Exception e) {
647       String msg = "Couldn't clone the snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
648         " on table=" + tableName;
649       LOG.error(msg, e);
650       throw new RestoreSnapshotException(msg, e);
651     }
652   }
653 
654   /**
655    * Restore the specified snapshot
656    * @param reqSnapshot
657    * @throws IOException
658    */
659   public void restoreSnapshot(SnapshotDescription reqSnapshot) throws IOException {
660     FileSystem fs = master.getMasterFileSystem().getFileSystem();
661     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
662     MasterCoprocessorHost cpHost = master.getCoprocessorHost();
663 
664     // check if the snapshot exists
665     if (!fs.exists(snapshotDir)) {
666       LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
667       throw new SnapshotDoesNotExistException(reqSnapshot);
668     }
669 
670     // read snapshot information
671     SnapshotDescription fsSnapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
672     HTableDescriptor snapshotTableDesc = FSTableDescriptors.getTableDescriptor(fs, snapshotDir);
673     String tableName = reqSnapshot.getTable();
674 
675     // stop tracking "abandoned" handlers
676     cleanupSentinels();
677 
678     // Execute the restore/clone operation
679     if (MetaReader.tableExists(master.getCatalogTracker(), tableName)) {
680       if (master.getAssignmentManager().getZKTable().isEnabledTable(fsSnapshot.getTable())) {
681         throw new UnsupportedOperationException("Table '" +
682           fsSnapshot.getTable() + "' must be disabled in order to perform a restore operation.");
683       }
684 
685       // call coproc pre hook
686       if (cpHost != null) {
687         cpHost.preRestoreSnapshot(reqSnapshot, snapshotTableDesc);
688       }
689       restoreSnapshot(fsSnapshot, snapshotTableDesc);
690       LOG.info("Restore snapshot=" + fsSnapshot.getName() + " as table=" + tableName);
691 
692       if (cpHost != null) {
693         cpHost.postRestoreSnapshot(reqSnapshot, snapshotTableDesc);
694       }
695     } else {
696       HTableDescriptor htd = RestoreSnapshotHelper.cloneTableSchema(snapshotTableDesc,
697                                                          Bytes.toBytes(tableName));
698       if (cpHost != null) {
699         cpHost.preCloneSnapshot(reqSnapshot, htd);
700       }
701       cloneSnapshot(fsSnapshot, htd);
702       LOG.info("Clone snapshot=" + fsSnapshot.getName() + " as table=" + tableName);
703 
704       if (cpHost != null) {
705         cpHost.postCloneSnapshot(reqSnapshot, htd);
706       }
707     }
708   }
709 
710   /**
711    * Restore the specified snapshot.
712    * The restore will fail if the destination table has a snapshot or restore in progress.
713    *
714    * @param snapshot Snapshot Descriptor
715    * @param hTableDescriptor Table Descriptor
716    */
717   private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
718       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
719     String tableName = hTableDescriptor.getNameAsString();
720 
721     // make sure we aren't running a snapshot on the same table
722     if (isTakingSnapshot(tableName)) {
723       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
724     }
725 
726     // make sure we aren't running a restore on the same table
727     if (isRestoringTable(tableName)) {
728       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
729     }
730 
731     try {
732       RestoreSnapshotHandler handler =
733         new RestoreSnapshotHandler(master, snapshot, hTableDescriptor, metricsMaster).prepare();
734       this.executorService.submit(handler);
735       restoreHandlers.put(hTableDescriptor.getNameAsString(), handler);
736     } catch (Exception e) {
737       String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
738           snapshot)  +
739           " on table=" + tableName;
740       LOG.error(msg, e);
741       throw new RestoreSnapshotException(msg, e);
742     }
743   }
744 
745   /**
746    * Verify if the restore of the specified table is in progress.
747    *
748    * @param tableName table under restore
749    * @return <tt>true</tt> if there is a restore in progress of the specified table.
750    */
751   private synchronized boolean isRestoringTable(final String tableName) {
752     SnapshotSentinel sentinel = this.restoreHandlers.get(tableName);
753     return(sentinel != null && !sentinel.isFinished());
754   }
755 
756   /**
757    * Returns the status of a restore operation.
758    * If the in-progress restore is failed throws the exception that caused the failure.
759    *
760    * @param snapshot
761    * @return false if in progress, true if restore is completed or not requested.
762    * @throws IOException if there was a failure during the restore
763    */
764   public boolean isRestoreDone(final SnapshotDescription snapshot) throws IOException {
765     // check to see if the sentinel exists,
766     // and if the task is complete removes it from the in-progress restore map.
767     SnapshotSentinel sentinel = removeSentinelIfFinished(this.restoreHandlers, snapshot);
768 
769     // stop tracking "abandoned" handlers
770     cleanupSentinels();
771 
772     if (sentinel == null) {
773       // there is no sentinel so restore is not in progress.
774       return true;
775     }
776 
777     LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
778         + sentinel.getSnapshot().getName() + " table=" + snapshot.getTable());
779 
780     // If the restore is failed, rethrow the exception
781     sentinel.rethrowExceptionIfFailed();
782 
783     // check to see if we are done
784     if (sentinel.isFinished()) {
785       LOG.debug("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
786           " has completed. Notifying the client.");
787       return true;
788     }
789 
790     if (LOG.isDebugEnabled()) {
791       LOG.debug("Sentinel is not yet finished with restoring snapshot=" +
792           ClientSnapshotDescriptionUtils.toString(snapshot));
793     }
794     return false;
795   }
796 
797   /**
798    * Return the handler if it is currently live and has the same snapshot target name.
799    * The handler is removed from the sentinels map if completed.
800    * @param sentinels live handlers
801    * @param snapshot snapshot description
802    * @return null if doesn't match, else a live handler.
803    */
804   private synchronized SnapshotSentinel removeSentinelIfFinished(
805       final Map<String, SnapshotSentinel> sentinels, final SnapshotDescription snapshot) {
806     SnapshotSentinel h = sentinels.get(snapshot.getTable());
807     if (h == null) {
808       return null;
809     }
810 
811     if (!h.getSnapshot().getName().equals(snapshot.getName())) {
812       // specified snapshot is to the one currently running
813       return null;
814     }
815 
816     // Remove from the "in-progress" list once completed
817     if (h.isFinished()) {
818       sentinels.remove(snapshot.getTable());
819     }
820 
821     return h;
822   }
823 
824   /**
825    * Removes "abandoned" snapshot/restore requests.
826    * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
827    * and the in-progress maps are cleaned up when the status of a completed task is requested.
828    * To avoid having sentinels staying around for long time if something client side is failed,
829    * each operation tries to clean up the in-progress maps sentinels finished from a long time.
830    */
831   private void cleanupSentinels() {
832     cleanupSentinels(this.snapshotHandlers);
833     cleanupSentinels(this.restoreHandlers);
834   }
835 
836   /**
837    * Remove the sentinels that are marked as finished and the completion time
838    * has exceeded the removal timeout.
839    * @param sentinels map of sentinels to clean
840    */
841   private synchronized void cleanupSentinels(final Map<String, SnapshotSentinel> sentinels) {
842     long currentTime = EnvironmentEdgeManager.currentTimeMillis();
843     Iterator<Map.Entry<String, SnapshotSentinel>> it = sentinels.entrySet().iterator();
844     while (it.hasNext()) {
845       Map.Entry<String, SnapshotSentinel> entry = it.next();
846       SnapshotSentinel sentinel = entry.getValue();
847       if (sentinel.isFinished() &&
848           (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
849       {
850         it.remove();
851       }
852     }
853   }
854 
855   //
856   // Implementing Stoppable interface
857   //
858 
859   @Override
860   public void stop(String why) {
861     // short circuit
862     if (this.stopped) return;
863     // make sure we get stop
864     this.stopped = true;
865     // pass the stop onto take snapshot handlers
866     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
867       snapshotHandler.cancel(why);
868     }
869 
870     // pass the stop onto all the restore handlers
871     for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
872       restoreHandler.cancel(why);
873     }
874   }
875 
876   @Override
877   public boolean isStopped() {
878     return this.stopped;
879   }
880 
881   /**
882    * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
883    * Called at the beginning of snapshot() and restoreSnapshot() methods.
884    * @throws UnsupportedOperationException if snapshot are not supported
885    */
886   public void checkSnapshotSupport() throws UnsupportedOperationException {
887     if (!this.isSnapshotSupported) {
888       throw new UnsupportedOperationException(
889         "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
890           HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
891     }
892   }
893 
894   /**
895    * Called at startup, to verify if snapshot operation is supported, and to avoid
896    * starting the master if there're snapshots present but the cleaners needed are missing.
897    * Otherwise we can end up with snapshot data loss.
898    * @param conf The {@link Configuration} object to use
899    * @param mfs The MasterFileSystem to use
900    * @throws IOException in case of file-system operation failure
901    * @throws UnsupportedOperationException in case cleaners are missing and
902    *         there're snapshot in the system
903    */
904   private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
905       throws IOException, UnsupportedOperationException {
906     // Verify if snapshot is disabled by the user
907     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
908     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
909     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
910 
911     // Extract cleaners from conf
912     Set<String> hfileCleaners = new HashSet<String>();
913     String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
914     if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
915 
916     Set<String> logCleaners = new HashSet<String>();
917     cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
918     if (cleaners != null) Collections.addAll(logCleaners, cleaners);
919 
920     // check if an older version of snapshot directory was present
921     Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
922     FileSystem fs = mfs.getFileSystem();
923     List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
924     if (ss != null && !ss.isEmpty()) {
925       LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
926       LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
927     }
928 
929     // If the user has enabled the snapshot, we force the cleaners to be present
930     // otherwise we still need to check if cleaners are enabled or not and verify
931     // that there're no snapshot in the .snapshot folder.
932     if (snapshotEnabled) {
933       // Inject snapshot cleaners, if snapshot.enable is true
934       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
935       hfileCleaners.add(HFileLinkCleaner.class.getName());
936       logCleaners.add(SnapshotLogCleaner.class.getName());
937 
938       // Set cleaners conf
939       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
940         hfileCleaners.toArray(new String[hfileCleaners.size()]));
941       conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
942         logCleaners.toArray(new String[logCleaners.size()]));
943     } else {
944       // Verify if cleaners are present
945       snapshotEnabled = logCleaners.contains(SnapshotLogCleaner.class.getName()) &&
946         hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
947         hfileCleaners.contains(HFileLinkCleaner.class.getName());
948 
949       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
950       if (snapshotEnabled) {
951         LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
952           "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
953           (userDisabled ? "is set to 'false'." : "is not set."));
954       }
955     }
956 
957     // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
958     this.isSnapshotSupported = snapshotEnabled && !userDisabled;
959 
960     // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
961     // otherwise we end up with snapshot data loss.
962     if (!snapshotEnabled) {
963       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
964       Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
965       if (fs.exists(snapshotDir)) {
966         FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
967           new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
968         if (snapshots != null) {
969           LOG.error("Snapshots are present, but cleaners are not enabled.");
970           checkSnapshotSupport();
971         }
972       }
973     }
974   }
975 }