View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.snapshot;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ThreadPoolExecutor;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.classification.InterfaceAudience;
35  import org.apache.hadoop.classification.InterfaceStability;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataInputStream;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.HTableDescriptor;
44  import org.apache.hadoop.hbase.Stoppable;
45  import org.apache.hadoop.hbase.catalog.MetaReader;
46  import org.apache.hadoop.hbase.errorhandling.ForeignException;
47  import org.apache.hadoop.hbase.executor.ExecutorService;
48  import org.apache.hadoop.hbase.master.AssignmentManager;
49  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
50  import org.apache.hadoop.hbase.master.MasterFileSystem;
51  import org.apache.hadoop.hbase.master.MasterServices;
52  import org.apache.hadoop.hbase.master.MetricsMaster;
53  import org.apache.hadoop.hbase.master.SnapshotSentinel;
54  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
55  import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
56  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
57  import org.apache.hadoop.hbase.procedure.Procedure;
58  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
59  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
60  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
61  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
62  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
63  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
64  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
65  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
66  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
67  import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
68  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
69  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
70  import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
71  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
72  import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
73  import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
74  import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
75  import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
76  import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
77  import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
78  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
79  import org.apache.hadoop.hbase.util.FSUtils;
80  import org.apache.zookeeper.KeeperException;
81  
82  /**
83   * This class manages the procedure of taking and restoring snapshots. There is only one
84   * SnapshotManager for the master.
85   * <p>
86   * The class provides methods for monitoring in-progress snapshot actions.
87   * <p>
88   * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
89   * simplification in the current implementation.
90   */
91  @InterfaceAudience.Private
92  @InterfaceStability.Unstable
93  public class SnapshotManager extends MasterProcedureManager implements Stoppable {
  private static final Log LOG = LogFactory.getLog(SnapshotManager.class);

  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;

  /**
   * Wait time (ms) before removing a finished sentinel from the in-progress map.
   *
   * NOTE: This is used as a safety auto cleanup.
   * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
   * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
   * In case something fails on the client side and the snapshot/restore state is not reclaimed
   * after a default timeout, the entry is removed from the in-progress map.
   * At this point, if the user asks for the snapshot/restore status, the result will be
   * "snapshot done" if the snapshot exists, or "failed" if it doesn't exist.
   */
  private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;

  /** Enable or disable snapshot support */
  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

  /**
   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
   * completion.
   */
  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

  /**
   * Default snapshot timeout (ms).
   * NOTE(review): presumably the fallback for {@link #SNAPSHOT_TIMEOUT_MILLIS_KEY}; the use site
   * is not in this part of the file - confirm.
   */
  private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 60000;

  /**
   * Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
   * completion.
   */
  private static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";

  /** Name of the operation to use in the controller */
  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

  /** Conf key for # of threads used by the SnapshotManager thread pool */
  private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

  /**
   * Default number of threads.
   * NOTE(review): presumably the fallback for {@link #SNAPSHOT_POOL_THREADS_KEY}; the use site
   * is not in this part of the file - confirm.
   */
  private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;

  // Stop flag. NOTE(review): presumably backs the Stoppable contract - confirm against stop().
  private boolean stopped;
  private MasterServices master;  // Needed by TableEventHandlers
  // Procedure coordinator; queried for procedure status when a snapshot reports an error.
  private ProcedureCoordinator coordinator;

  // Is snapshot feature enabled?
  private boolean isSnapshotSupported = false;

  // Snapshot handlers map, with table name as key.
  // The map is meant to be accessed and modified under the object lock using synchronized.
  // snapshotTable() will insert a handler in the map.
  // isSnapshotDone() will remove the handler requested if the operation is finished.
  private Map<TableName, SnapshotSentinel> snapshotHandlers =
      new HashMap<TableName, SnapshotSentinel>();

  // Restore sentinels map, with table name as key.
  // The map is meant to be accessed and modified under the object lock using synchronized.
  // restoreSnapshot()/cloneSnapshot() will insert a handler in the map.
  // isRestoreDone() will remove the handler requested if the operation is finished.
  private Map<TableName, SnapshotSentinel> restoreHandlers =
      new HashMap<TableName, SnapshotSentinel>();

  // HBase root directory, taken from the master file system in the constructor.
  private Path rootDir;
  // Executor that snapshot/clone handlers are submitted to.
  private ExecutorService executorService;
162 
163   public SnapshotManager() {}
164 
165   /**
166    * Fully specify all necessary components of a snapshot manager. Exposed for testing.
167    * @param master services for the master where the manager is running
168    * @param coordinator procedure coordinator instance.  exposed for testing.
169    * @param pool HBase ExecutorServcie instance, exposed for testing.
170    */
171   public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
172       ProcedureCoordinator coordinator, ExecutorService pool)
173       throws IOException, UnsupportedOperationException {
174     this.master = master;
175 
176     this.rootDir = master.getMasterFileSystem().getRootDir();
177     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
178 
179     this.coordinator = coordinator;
180     this.executorService = pool;
181     resetTempDir();
182   }
183 
184   /**
185    * Gets the list of all completed snapshots.
186    * @return list of SnapshotDescriptions
187    * @throws IOException File system exception
188    */
189   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
190     return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
191   }
192 
193   /**
194    * Gets the list of all completed snapshots.
195    * @param snapshotDir snapshot directory
196    * @return list of SnapshotDescriptions
197    * @throws IOException File system exception
198    */
199   private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
200     List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
201     // first create the snapshot root path and check to see if it exists
202     FileSystem fs = master.getMasterFileSystem().getFileSystem();
203     if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
204 
205     // if there are no snapshots, return an empty list
206     if (!fs.exists(snapshotDir)) {
207       return snapshotDescs;
208     }
209 
210     // ignore all the snapshots in progress
211     FileStatus[] snapshots = fs.listStatus(snapshotDir,
212       new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
213     // loop through all the completed snapshots
214     for (FileStatus snapshot : snapshots) {
215       Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
216       // if the snapshot is bad
217       if (!fs.exists(info)) {
218         LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
219         continue;
220       }
221       FSDataInputStream in = null;
222       try {
223         in = fs.open(info);
224         SnapshotDescription desc = SnapshotDescription.parseFrom(in);
225         snapshotDescs.add(desc);
226       } catch (IOException e) {
227         LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
228       } finally {
229         if (in != null) {
230           in.close();
231         }
232       }
233     }
234     return snapshotDescs;
235   }
236 
237   /**
238    * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
239    * snapshot attempts.
240    *
241    * @throws IOException if we can't reach the filesystem
242    */
243   void resetTempDir() throws IOException {
244     // cleanup any existing snapshots.
245     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
246     if (master.getMasterFileSystem().getFileSystem().exists(tmpdir)) {
247       if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
248         LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
249       }
250     }
251   }
252 
253   /**
254    * Delete the specified snapshot
255    * @param snapshot
256    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
257    * @throws IOException For filesystem IOExceptions
258    */
259   public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
260 
261     // call coproc pre hook
262     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
263     if (cpHost != null) {
264       cpHost.preDeleteSnapshot(snapshot);
265     }
266 
267     // check to see if it is completed
268     if (!isSnapshotCompleted(snapshot)) {
269       throw new SnapshotDoesNotExistException(snapshot);
270     }
271 
272     String snapshotName = snapshot.getName();
273     LOG.debug("Deleting snapshot: " + snapshotName);
274     // first create the snapshot description and check to see if it exists
275     MasterFileSystem fs = master.getMasterFileSystem();
276     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
277 
278     // delete the existing snapshot
279     if (!fs.getFileSystem().delete(snapshotDir, true)) {
280       throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
281     }
282 
283     // call coproc post hook
284     if (cpHost != null) {
285       cpHost.postDeleteSnapshot(snapshot);
286     }
287 
288   }
289 
290   /**
291    * Check if the specified snapshot is done
292    *
293    * @param expected
294    * @return true if snapshot is ready to be restored, false if it is still being taken.
295    * @throws IOException IOException if error from HDFS or RPC
296    * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
297    */
298   public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
299     // check the request to make sure it has a snapshot
300     if (expected == null) {
301       throw new UnknownSnapshotException(
302          "No snapshot name passed in request, can't figure out which snapshot you want to check.");
303     }
304 
305     String ssString = ClientSnapshotDescriptionUtils.toString(expected);
306 
307     // check to see if the sentinel exists,
308     // and if the task is complete removes it from the in-progress snapshots map.
309     SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
310 
311     // stop tracking "abandoned" handlers
312     cleanupSentinels();
313 
314     if (handler == null) {
315       // If there's no handler in the in-progress map, it means one of the following:
316       //   - someone has already requested the snapshot state
317       //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
318       //   - the snapshot was never requested
319       // In those cases returns to the user the "done state" if the snapshots exists on disk,
320       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
321       if (!isSnapshotCompleted(expected)) {
322         throw new UnknownSnapshotException("Snapshot " + ssString
323             + " is not currently running or one of the known completed snapshots.");
324       }
325       // was done, return true;
326       return true;
327     }
328 
329     // pass on any failure we find in the sentinel
330     try {
331       handler.rethrowExceptionIfFailed();
332     } catch (ForeignException e) {
333       // Give some procedure info on an exception.
334       String status;
335       Procedure p = coordinator.getProcedure(expected.getName());
336       if (p != null) {
337         status = p.getStatus();
338       } else {
339         status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
340       }
341       throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
342           expected);
343     }
344 
345     // check to see if we are done
346     if (handler.isFinished()) {
347       LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
348       return true;
349     } else if (LOG.isDebugEnabled()) {
350       LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
351     }
352     return false;
353   }
354 
355   /**
356    * Check to see if there is a snapshot in progress with the same name or on the same table.
357    * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
358    * don't allow snapshot with the same name.
359    * @param snapshot description of the snapshot being checked.
360    * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
361    *         table.
362    */
363   synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
364     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
365     if (isTakingSnapshot(snapshotTable)) {
366       return true;
367     }
368     Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
369     while (it.hasNext()) {
370       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
371       SnapshotSentinel sentinel = entry.getValue();
372       if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
373         return true;
374       }
375     }
376     return false;
377   }
378 
379   /**
380    * Check to see if the specified table has a snapshot in progress.  Currently we have a
381    * limitation only allowing a single snapshot per table at a time.
382    * @param tableName name of the table being snapshotted.
383    * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
384    */
385   synchronized boolean isTakingSnapshot(final TableName tableName) {
386     SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
387     return handler != null && !handler.isFinished();
388   }
389 
390   /**
391    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
392    * aren't already running a snapshot or restore on the requested table.
393    * @param snapshot description of the snapshot we want to start
394    * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
395    */
396   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
397       throws HBaseSnapshotException {
398     FileSystem fs = master.getMasterFileSystem().getFileSystem();
399     Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
400     TableName snapshotTable =
401         TableName.valueOf(snapshot.getTable());
402 
403     // make sure we aren't already running a snapshot
404     if (isTakingSnapshot(snapshot)) {
405       SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
406       throw new SnapshotCreationException("Rejected taking "
407           + ClientSnapshotDescriptionUtils.toString(snapshot)
408           + " because we are already running another snapshot "
409           + (handler != null ? ("on the same table " +
410               ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
411               : "with the same name"), snapshot);
412     }
413 
414     // make sure we aren't running a restore on the same table
415     if (isRestoringTable(snapshotTable)) {
416       SnapshotSentinel handler = restoreHandlers.get(snapshotTable);
417       throw new SnapshotCreationException("Rejected taking "
418           + ClientSnapshotDescriptionUtils.toString(snapshot)
419           + " because we are already have a restore in progress on the same snapshot "
420           + ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()), snapshot);
421     }
422 
423     try {
424       // delete the working directory, since we aren't running the snapshot. Likely leftovers
425       // from a failed attempt.
426       fs.delete(workingDir, true);
427 
428       // recreate the working directory for the snapshot
429       if (!fs.mkdirs(workingDir)) {
430         throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
431             + ") for snapshot" , snapshot);
432       }
433     } catch (HBaseSnapshotException e) {
434       throw e;
435     } catch (IOException e) {
436       throw new SnapshotCreationException(
437           "Exception while checking to see if snapshot could be started.", e, snapshot);
438     }
439   }
440 
441   /**
442    * Take a snapshot of a disabled table.
443    * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
444    * @throws HBaseSnapshotException if the snapshot could not be started
445    */
446   private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
447       throws HBaseSnapshotException {
448     // setup the snapshot
449     prepareToTakeSnapshot(snapshot);
450 
451     // set the snapshot to be a disabled snapshot, since the client doesn't know about that
452     snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
453 
454     // Take the snapshot of the disabled table
455     DisabledTableSnapshotHandler handler =
456         new DisabledTableSnapshotHandler(snapshot, master);
457     snapshotTable(snapshot, handler);
458   }
459 
460   /**
461    * Take a snapshot of an enabled table.
462    * @param snapshot description of the snapshot to take.
463    * @throws HBaseSnapshotException if the snapshot could not be started
464    */
465   private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
466       throws HBaseSnapshotException {
467     // setup the snapshot
468     prepareToTakeSnapshot(snapshot);
469 
470     // Take the snapshot of the enabled table
471     EnabledTableSnapshotHandler handler =
472         new EnabledTableSnapshotHandler(snapshot, master, this);
473     snapshotTable(snapshot, handler);
474   }
475 
476   /**
477    * Take a snapshot using the specified handler.
478    * On failure the snapshot temporary working directory is removed.
479    * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
480    *       snapshot request if the table is busy with another snapshot/restore operation.
481    * @param snapshot the snapshot description
482    * @param handler the snapshot handler
483    */
484   private synchronized void snapshotTable(SnapshotDescription snapshot,
485       final TakeSnapshotHandler handler) throws HBaseSnapshotException {
486     try {
487       handler.prepare();
488       this.executorService.submit(handler);
489       this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
490     } catch (Exception e) {
491       // cleanup the working directory by trying to delete it from the fs.
492       Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
493       try {
494         if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
495           LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
496               ClientSnapshotDescriptionUtils.toString(snapshot));
497         }
498       } catch (IOException e1) {
499         LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
500             ClientSnapshotDescriptionUtils.toString(snapshot));
501       }
502       // fail the snapshot
503       throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
504     }
505   }
506 
507   /**
508    * Take a snapshot based on the enabled/disabled state of the table.
509    *
510    * @param snapshot
511    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
512    * @throws IOException when some sort of generic IO exception occurs.
513    */
514   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
515     // check to see if we already completed the snapshot
516     if (isSnapshotCompleted(snapshot)) {
517       throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
518           + "' already stored on the filesystem.", snapshot);
519     }
520 
521     LOG.debug("No existing snapshot, attempting snapshot...");
522 
523     // stop tracking "abandoned" handlers
524     cleanupSentinels();
525 
526     // check to see if the table exists
527     HTableDescriptor desc = null;
528     try {
529       desc = master.getTableDescriptors().get(
530           TableName.valueOf(snapshot.getTable()));
531     } catch (FileNotFoundException e) {
532       String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
533       LOG.error(msg);
534       throw new SnapshotCreationException(msg, e, snapshot);
535     } catch (IOException e) {
536       throw new SnapshotCreationException("Error while geting table description for table "
537           + snapshot.getTable(), e, snapshot);
538     }
539     if (desc == null) {
540       throw new SnapshotCreationException("Table '" + snapshot.getTable()
541           + "' doesn't exist, can't take snapshot.", snapshot);
542     }
543 
544     // if not specified, set the snapshot format
545     if (!snapshot.hasVersion()) {
546       snapshot = snapshot.toBuilder()
547           .setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION)
548           .build();
549     }
550 
551     // call pre coproc hook
552     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
553     if (cpHost != null) {
554       cpHost.preSnapshot(snapshot, desc);
555     }
556 
557     // if the table is enabled, then have the RS run actually the snapshot work
558     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
559     AssignmentManager assignmentMgr = master.getAssignmentManager();
560     if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
561         ZooKeeperProtos.Table.State.ENABLED)) {
562       LOG.debug("Table enabled, starting distributed snapshot.");
563       snapshotEnabledTable(snapshot);
564       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
565     }
566     // For disabled table, snapshot is created by the master
567     else if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
568         ZooKeeperProtos.Table.State.DISABLED)) {
569       LOG.debug("Table is disabled, running snapshot entirely on master.");
570       snapshotDisabledTable(snapshot);
571       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
572     } else {
573       LOG.error("Can't snapshot table '" + snapshot.getTable()
574           + "', isn't open or closed, we don't know what to do!");
575       TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
576           + " isn't fully open.");
577       throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
578     }
579 
580     // call post coproc hook
581     if (cpHost != null) {
582       cpHost.postSnapshot(snapshot, desc);
583     }
584   }
585 
586   /**
587    * Set the handler for the current snapshot
588    * <p>
589    * Exposed for TESTING
590    * @param tableName
591    * @param handler handler the master should use
592    *
593    * TODO get rid of this if possible, repackaging, modify tests.
594    */
595   public synchronized void setSnapshotHandlerForTesting(
596       final TableName tableName,
597       final SnapshotSentinel handler) {
598     if (handler != null) {
599       this.snapshotHandlers.put(tableName, handler);
600     } else {
601       this.snapshotHandlers.remove(tableName);
602     }
603   }
604 
605   /**
606    * @return distributed commit coordinator for all running snapshots
607    */
608   ProcedureCoordinator getCoordinator() {
609     return coordinator;
610   }
611 
612   /**
613    * Check to see if the snapshot is one of the currently completed snapshots
614    * Returns true if the snapshot exists in the "completed snapshots folder".
615    *
616    * @param snapshot expected snapshot to check
617    * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
618    *         not stored
619    * @throws IOException if the filesystem throws an unexpected exception,
620    * @throws IllegalArgumentException if snapshot name is invalid.
621    */
622   private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
623     try {
624       final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
625       FileSystem fs = master.getMasterFileSystem().getFileSystem();
626       // check to see if the snapshot already exists
627       return fs.exists(snapshotDir);
628     } catch (IllegalArgumentException iae) {
629       throw new UnknownSnapshotException("Unexpected exception thrown", iae);
630     }
631   }
632 
633   /**
634    * Clone the specified snapshot into a new table.
635    * The operation will fail if the destination table has a snapshot or restore in progress.
636    *
637    * @param snapshot Snapshot Descriptor
638    * @param hTableDescriptor Table Descriptor of the table to create
639    */
640   synchronized void cloneSnapshot(final SnapshotDescription snapshot,
641       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
642     TableName tableName = hTableDescriptor.getTableName();
643 
644     // make sure we aren't running a snapshot on the same table
645     if (isTakingSnapshot(tableName)) {
646       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
647     }
648 
649     // make sure we aren't running a restore on the same table
650     if (isRestoringTable(tableName)) {
651       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
652     }
653 
654     try {
655       CloneSnapshotHandler handler =
656         new CloneSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
657       this.executorService.submit(handler);
658       this.restoreHandlers.put(tableName, handler);
659     } catch (Exception e) {
660       String msg = "Couldn't clone the snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
661         " on table=" + tableName;
662       LOG.error(msg, e);
663       throw new RestoreSnapshotException(msg, e);
664     }
665   }
666 
667   /**
668    * Restore the specified snapshot
669    * @param reqSnapshot
670    * @throws IOException
671    */
672   public void restoreSnapshot(SnapshotDescription reqSnapshot) throws IOException {
673     FileSystem fs = master.getMasterFileSystem().getFileSystem();
674     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
675     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
676 
677     // check if the snapshot exists
678     if (!fs.exists(snapshotDir)) {
679       LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
680       throw new SnapshotDoesNotExistException(reqSnapshot);
681     }
682 
683     // read snapshot information
684     SnapshotDescription fsSnapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
685     SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
686         snapshotDir, fsSnapshot);
687     HTableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
688     TableName tableName = TableName.valueOf(reqSnapshot.getTable());
689 
690     // stop tracking "abandoned" handlers
691     cleanupSentinels();
692 
693     // Verify snapshot validity
694     SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
695 
696     // Execute the restore/clone operation
697     if (MetaReader.tableExists(master.getCatalogTracker(), tableName)) {
698       if (master.getAssignmentManager().getTableStateManager().isTableState(
699           TableName.valueOf(fsSnapshot.getTable()), ZooKeeperProtos.Table.State.ENABLED)) {
700         throw new UnsupportedOperationException("Table '" +
701             TableName.valueOf(fsSnapshot.getTable()) + "' must be disabled in order to " +
702             "perform a restore operation" +
703             ".");
704       }
705 
706       // call coproc pre hook
707       if (cpHost != null) {
708         cpHost.preRestoreSnapshot(reqSnapshot, snapshotTableDesc);
709       }
710       restoreSnapshot(fsSnapshot, snapshotTableDesc);
711       LOG.info("Restore snapshot=" + fsSnapshot.getName() + " as table=" + tableName);
712 
713       if (cpHost != null) {
714         cpHost.postRestoreSnapshot(reqSnapshot, snapshotTableDesc);
715       }
716     } else {
717       HTableDescriptor htd = RestoreSnapshotHelper.cloneTableSchema(snapshotTableDesc, tableName);
718       if (cpHost != null) {
719         cpHost.preCloneSnapshot(reqSnapshot, htd);
720       }
721       cloneSnapshot(fsSnapshot, htd);
722       LOG.info("Clone snapshot=" + fsSnapshot.getName() + " as table=" + tableName);
723 
724       if (cpHost != null) {
725         cpHost.postCloneSnapshot(reqSnapshot, htd);
726       }
727     }
728   }
729 
730   /**
731    * Restore the specified snapshot.
732    * The restore will fail if the destination table has a snapshot or restore in progress.
733    *
734    * @param snapshot Snapshot Descriptor
735    * @param hTableDescriptor Table Descriptor
736    */
737   private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
738       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
739     TableName tableName = hTableDescriptor.getTableName();
740 
741     // make sure we aren't running a snapshot on the same table
742     if (isTakingSnapshot(tableName)) {
743       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
744     }
745 
746     // make sure we aren't running a restore on the same table
747     if (isRestoringTable(tableName)) {
748       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
749     }
750 
751     try {
752       RestoreSnapshotHandler handler =
753         new RestoreSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
754       this.executorService.submit(handler);
755       restoreHandlers.put(tableName, handler);
756     } catch (Exception e) {
757       String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
758           snapshot)  +
759           " on table=" + tableName;
760       LOG.error(msg, e);
761       throw new RestoreSnapshotException(msg, e);
762     }
763   }
764 
765   /**
766    * Verify if the restore of the specified table is in progress.
767    *
768    * @param tableName table under restore
769    * @return <tt>true</tt> if there is a restore in progress of the specified table.
770    */
771   private synchronized boolean isRestoringTable(final TableName tableName) {
772     SnapshotSentinel sentinel = this.restoreHandlers.get(tableName);
773     return(sentinel != null && !sentinel.isFinished());
774   }
775 
776   /**
777    * Returns the status of a restore operation.
778    * If the in-progress restore is failed throws the exception that caused the failure.
779    *
780    * @param snapshot
781    * @return false if in progress, true if restore is completed or not requested.
782    * @throws IOException if there was a failure during the restore
783    */
784   public boolean isRestoreDone(final SnapshotDescription snapshot) throws IOException {
785     // check to see if the sentinel exists,
786     // and if the task is complete removes it from the in-progress restore map.
787     SnapshotSentinel sentinel = removeSentinelIfFinished(this.restoreHandlers, snapshot);
788 
789     // stop tracking "abandoned" handlers
790     cleanupSentinels();
791 
792     if (sentinel == null) {
793       // there is no sentinel so restore is not in progress.
794       return true;
795     }
796 
797     LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
798         + sentinel.getSnapshot().getName() + " table=" +
799         TableName.valueOf(snapshot.getTable()));
800 
801     // If the restore is failed, rethrow the exception
802     sentinel.rethrowExceptionIfFailed();
803 
804     // check to see if we are done
805     if (sentinel.isFinished()) {
806       LOG.debug("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
807           " has completed. Notifying the client.");
808       return true;
809     }
810 
811     if (LOG.isDebugEnabled()) {
812       LOG.debug("Sentinel is not yet finished with restoring snapshot=" +
813           ClientSnapshotDescriptionUtils.toString(snapshot));
814     }
815     return false;
816   }
817 
818   /**
819    * Return the handler if it is currently live and has the same snapshot target name.
820    * The handler is removed from the sentinels map if completed.
821    * @param sentinels live handlers
822    * @param snapshot snapshot description
823    * @return null if doesn't match, else a live handler.
824    */
825   private synchronized SnapshotSentinel removeSentinelIfFinished(
826       final Map<TableName, SnapshotSentinel> sentinels,
827       final SnapshotDescription snapshot) {
828     if (!snapshot.hasTable()) {
829       return null;
830     }
831 
832     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
833     SnapshotSentinel h = sentinels.get(snapshotTable);
834     if (h == null) {
835       return null;
836     }
837 
838     if (!h.getSnapshot().getName().equals(snapshot.getName())) {
839       // specified snapshot is to the one currently running
840       return null;
841     }
842 
843     // Remove from the "in-progress" list once completed
844     if (h.isFinished()) {
845       sentinels.remove(snapshotTable);
846     }
847 
848     return h;
849   }
850 
851   /**
852    * Removes "abandoned" snapshot/restore requests.
853    * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
854    * and the in-progress maps are cleaned up when the status of a completed task is requested.
855    * To avoid having sentinels staying around for long time if something client side is failed,
856    * each operation tries to clean up the in-progress maps sentinels finished from a long time.
857    */
858   private void cleanupSentinels() {
859     cleanupSentinels(this.snapshotHandlers);
860     cleanupSentinels(this.restoreHandlers);
861   }
862 
863   /**
864    * Remove the sentinels that are marked as finished and the completion time
865    * has exceeded the removal timeout.
866    * @param sentinels map of sentinels to clean
867    */
868   private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
869     long currentTime = EnvironmentEdgeManager.currentTimeMillis();
870     Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
871         sentinels.entrySet().iterator();
872     while (it.hasNext()) {
873       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
874       SnapshotSentinel sentinel = entry.getValue();
875       if (sentinel.isFinished() &&
876           (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
877       {
878         it.remove();
879       }
880     }
881   }
882 
883   //
884   // Implementing Stoppable interface
885   //
886 
887   @Override
888   public void stop(String why) {
889     // short circuit
890     if (this.stopped) return;
891     // make sure we get stop
892     this.stopped = true;
893     // pass the stop onto take snapshot handlers
894     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
895       snapshotHandler.cancel(why);
896     }
897 
898     // pass the stop onto all the restore handlers
899     for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
900       restoreHandler.cancel(why);
901     }
902     try {
903       coordinator.close();
904     } catch (IOException e) {
905       LOG.error("stop ProcedureCoordinator error", e);
906     }
907   }
908 
909   @Override
910   public boolean isStopped() {
911     return this.stopped;
912   }
913 
914   /**
915    * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
916    * Called at the beginning of snapshot() and restoreSnapshot() methods.
917    * @throws UnsupportedOperationException if snapshot are not supported
918    */
919   public void checkSnapshotSupport() throws UnsupportedOperationException {
920     if (!this.isSnapshotSupported) {
921       throw new UnsupportedOperationException(
922         "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
923           HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
924     }
925   }
926 
927   /**
928    * Called at startup, to verify if snapshot operation is supported, and to avoid
929    * starting the master if there're snapshots present but the cleaners needed are missing.
930    * Otherwise we can end up with snapshot data loss.
931    * @param conf The {@link Configuration} object to use
932    * @param mfs The MasterFileSystem to use
933    * @throws IOException in case of file-system operation failure
934    * @throws UnsupportedOperationException in case cleaners are missing and
935    *         there're snapshot in the system
936    */
937   private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
938       throws IOException, UnsupportedOperationException {
939     // Verify if snapshot is disabled by the user
940     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
941     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
942     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
943 
944     // Extract cleaners from conf
945     Set<String> hfileCleaners = new HashSet<String>();
946     String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
947     if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
948 
949     Set<String> logCleaners = new HashSet<String>();
950     cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
951     if (cleaners != null) Collections.addAll(logCleaners, cleaners);
952 
953     // check if an older version of snapshot directory was present
954     Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
955     FileSystem fs = mfs.getFileSystem();
956     List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
957     if (ss != null && !ss.isEmpty()) {
958       LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
959       LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
960     }
961 
962     // If the user has enabled the snapshot, we force the cleaners to be present
963     // otherwise we still need to check if cleaners are enabled or not and verify
964     // that there're no snapshot in the .snapshot folder.
965     if (snapshotEnabled) {
966       // Inject snapshot cleaners, if snapshot.enable is true
967       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
968       hfileCleaners.add(HFileLinkCleaner.class.getName());
969       logCleaners.add(SnapshotLogCleaner.class.getName());
970 
971       // Set cleaners conf
972       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
973         hfileCleaners.toArray(new String[hfileCleaners.size()]));
974       conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
975         logCleaners.toArray(new String[logCleaners.size()]));
976     } else {
977       // Verify if cleaners are present
978       snapshotEnabled = logCleaners.contains(SnapshotLogCleaner.class.getName()) &&
979         hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
980         hfileCleaners.contains(HFileLinkCleaner.class.getName());
981 
982       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
983       if (snapshotEnabled) {
984         LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
985           "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
986           (userDisabled ? "is set to 'false'." : "is not set."));
987       }
988     }
989 
990     // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
991     this.isSnapshotSupported = snapshotEnabled && !userDisabled;
992 
993     // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
994     // otherwise we end up with snapshot data loss.
995     if (!snapshotEnabled) {
996       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
997       Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
998       if (fs.exists(snapshotDir)) {
999         FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
1000           new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1001         if (snapshots != null) {
1002           LOG.error("Snapshots are present, but cleaners are not enabled.");
1003           checkSnapshotSupport();
1004         }
1005       }
1006     }
1007   }
1008 
1009   @Override
1010   public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1011       IOException, UnsupportedOperationException {
1012     this.master = master;
1013 
1014     this.rootDir = master.getMasterFileSystem().getRootDir();
1015     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1016 
1017     // get the configuration for the coordinator
1018     Configuration conf = master.getConfiguration();
1019     long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1020     long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
1021     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1022 
1023     // setup the default procedure coordinator
1024     String name = master.getServerName().toString();
1025     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1026     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
1027         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1028 
1029     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1030     this.executorService = master.getExecutorService();
1031     resetTempDir();
1032   }
1033 
1034   @Override
1035   public String getProcedureSignature() {
1036     return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1037   }
1038 
1039   @Override
1040   public void execProcedure(ProcedureDescription desc) throws IOException {
1041     takeSnapshot(toSnapshotDescription(desc));
1042   }
1043 
1044   @Override
1045   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1046     return isSnapshotDone(toSnapshotDescription(desc));
1047   }
1048 
1049   private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1050       throws IOException {
1051     SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1052     if (!desc.hasInstance()) {
1053       throw new IOException("Snapshot name is not defined: " + desc.toString());
1054     }
1055     String snapshotName = desc.getInstance();
1056     List<NameStringPair> props = desc.getConfigurationList();
1057     String table = null;
1058     for (NameStringPair prop : props) {
1059       if ("table".equalsIgnoreCase(prop.getName())) {
1060         table = prop.getValue();
1061       }
1062     }
1063     if (table == null) {
1064       throw new IOException("Snapshot table is not defined: " + desc.toString());
1065     }
1066     TableName tableName = TableName.valueOf(table);
1067     builder.setTable(tableName.getNameAsString());
1068     builder.setName(snapshotName);
1069     builder.setType(SnapshotDescription.Type.FLUSH);
1070     return builder.build();
1071   }
1072 }