View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.snapshot;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ThreadPoolExecutor;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.classification.InterfaceStability;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataInputStream;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.Stoppable;
46  import org.apache.hadoop.hbase.MetaTableAccessor;
47  import org.apache.hadoop.hbase.errorhandling.ForeignException;
48  import org.apache.hadoop.hbase.executor.ExecutorService;
49  import org.apache.hadoop.hbase.ipc.RpcServer;
50  import org.apache.hadoop.hbase.master.AssignmentManager;
51  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
52  import org.apache.hadoop.hbase.master.MasterFileSystem;
53  import org.apache.hadoop.hbase.master.MasterServices;
54  import org.apache.hadoop.hbase.master.MetricsMaster;
55  import org.apache.hadoop.hbase.master.SnapshotSentinel;
56  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
57  import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
58  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
59  import org.apache.hadoop.hbase.procedure.Procedure;
60  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
61  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
62  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
63  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
64  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
65  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
66  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
67  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
68  import org.apache.hadoop.hbase.quotas.QuotaExceededException;
69  import org.apache.hadoop.hbase.security.AccessDeniedException;
70  import org.apache.hadoop.hbase.security.User;
71  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
72  import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
73  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
74  import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
75  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
76  import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
77  import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
78  import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
79  import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
80  import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
81  import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
82  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
83  import org.apache.hadoop.hbase.util.FSUtils;
84  import org.apache.zookeeper.KeeperException;
85  
86  /**
87   * This class manages the procedure of taking and restoring snapshots. There is only one
88   * SnapshotManager for the master.
89   * <p>
90   * The class provides methods for monitoring in-progress snapshot actions.
91   * <p>
92   * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
93   * simplification in the current implementation.
94   */
95  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
96  @InterfaceStability.Unstable
97  public class SnapshotManager extends MasterProcedureManager implements Stoppable {
  private static final Log LOG = LogFactory.getLog(SnapshotManager.class);

  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;

  /**
   * Wait time before removing a finished sentinel from the in-progress map.
   * (The value reads as 60 seconds expressed in milliseconds; the consuming code is outside
   * this view - TODO confirm the unit.)
   *
   * NOTE: This is used as a safety auto cleanup.
   * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
   * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
   * In case something fails on the client side and the snapshot/restore state is not reclaimed
   * after a default timeout, the entry is removed from the in-progress map.
   * At this point, if the user asks for the snapshot/restore status, the result will be
   * "snapshot done" if it exists or "failed" if it doesn't exist.
   */
  private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;

  /** Conf key used to enable or disable snapshot support */
  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

  /**
   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
   * completion.
   */
  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

  /** Name of the operation to use in the controller */
  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

  /** Conf key for # of threads used by the SnapshotManager thread pool */
  private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

  /**
   * Number of concurrent operations running on the master.
   * NOTE(review): presumably the fallback for {@link #SNAPSHOT_POOL_THREADS_KEY}; the place
   * where it is read is outside this view.
   */
  private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;

  // NOTE(review): presumably backs the Stoppable contract; it is not read or written in the
  // part of the class visible here.
  private boolean stopped;
  private MasterServices master;  // Needed by TableEventHandlers
  private ProcedureCoordinator coordinator;

  // Is snapshot feature enabled?
  private boolean isSnapshotSupported = false;

  // Snapshot handlers map, with table name as key.
  // The map is always accessed and modified under the object lock using synchronized.
  // snapshotTable() will insert a handler in the map.
  // isSnapshotDone() will remove the requested handler if the operation is finished.
  private Map<TableName, SnapshotSentinel> snapshotHandlers =
      new HashMap<TableName, SnapshotSentinel>();

  // Restore sentinels map, with table name as key.
  // The map is always accessed and modified under the object lock using synchronized.
  // restoreSnapshot()/cloneSnapshot() will insert a handler in the map.
  // isRestoreDone() will remove the requested handler if the operation is finished.
  private Map<TableName, SnapshotSentinel> restoreHandlers =
      new HashMap<TableName, SnapshotSentinel>();

  // Root directory of the master file system, used to resolve the snapshot directories.
  private Path rootDir;
  // Executor the snapshot and restore handlers are submitted to.
  private ExecutorService executorService;

  /**
   * Creates a manager with every field left at its default value.
   * NOTE(review): presumably the instance is wired up afterwards by an initialization method
   * that is outside this view.
   */
  public SnapshotManager() {}
159 
160   /**
161    * Fully specify all necessary components of a snapshot manager. Exposed for testing.
162    * @param master services for the master where the manager is running
163    * @param coordinator procedure coordinator instance.  exposed for testing.
164    * @param pool HBase ExecutorServcie instance, exposed for testing.
165    */
166   public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
167       ProcedureCoordinator coordinator, ExecutorService pool)
168       throws IOException, UnsupportedOperationException {
169     this.master = master;
170 
171     this.rootDir = master.getMasterFileSystem().getRootDir();
172     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
173 
174     this.coordinator = coordinator;
175     this.executorService = pool;
176     resetTempDir();
177   }
178 
179   /**
180    * Gets the list of all completed snapshots.
181    * @return list of SnapshotDescriptions
182    * @throws IOException File system exception
183    */
184   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
185     return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
186   }
187 
188   /**
189    * Gets the list of all completed snapshots.
190    * @param snapshotDir snapshot directory
191    * @return list of SnapshotDescriptions
192    * @throws IOException File system exception
193    */
194   private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
195     List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
196     // first create the snapshot root path and check to see if it exists
197     FileSystem fs = master.getMasterFileSystem().getFileSystem();
198     if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
199 
200     // if there are no snapshots, return an empty list
201     if (!fs.exists(snapshotDir)) {
202       return snapshotDescs;
203     }
204 
205     // ignore all the snapshots in progress
206     FileStatus[] snapshots = fs.listStatus(snapshotDir,
207       new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
208     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
209     // loop through all the completed snapshots
210     for (FileStatus snapshot : snapshots) {
211       Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
212       // if the snapshot is bad
213       if (!fs.exists(info)) {
214         LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
215         continue;
216       }
217       FSDataInputStream in = null;
218       try {
219         in = fs.open(info);
220         SnapshotDescription desc = SnapshotDescription.parseFrom(in);
221         if (cpHost != null) {
222           try {
223             cpHost.preListSnapshot(desc);
224           } catch (AccessDeniedException e) {
225             LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
226                 + "Either you should be owner of this snapshot or admin user.");
227             // Skip this and try for next snapshot
228             continue;
229           }
230         }
231         snapshotDescs.add(desc);
232 
233         // call coproc post hook
234         if (cpHost != null) {
235           cpHost.postListSnapshot(desc);
236         }
237       } catch (IOException e) {
238         LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
239       } finally {
240         if (in != null) {
241           in.close();
242         }
243       }
244     }
245     return snapshotDescs;
246   }
247 
248   /**
249    * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
250    * snapshot attempts.
251    *
252    * @throws IOException if we can't reach the filesystem
253    */
254   void resetTempDir() throws IOException {
255     // cleanup any existing snapshots.
256     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
257     if (master.getMasterFileSystem().getFileSystem().exists(tmpdir)) {
258       if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
259         LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
260       }
261     }
262   }
263 
264   /**
265    * Delete the specified snapshot
266    * @param snapshot
267    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
268    * @throws IOException For filesystem IOExceptions
269    */
270   public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
271     // check to see if it is completed
272     if (!isSnapshotCompleted(snapshot)) {
273       throw new SnapshotDoesNotExistException(snapshot);
274     }
275 
276     String snapshotName = snapshot.getName();
277     // first create the snapshot description and check to see if it exists
278     FileSystem fs = master.getMasterFileSystem().getFileSystem();
279     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
280     // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
281     // just the "name" and it does not contains the "real" snapshot information
282     snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
283 
284     // call coproc pre hook
285     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
286     if (cpHost != null) {
287       cpHost.preDeleteSnapshot(snapshot);
288     }
289 
290     LOG.debug("Deleting snapshot: " + snapshotName);
291     // delete the existing snapshot
292     if (!fs.delete(snapshotDir, true)) {
293       throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
294     }
295 
296     // call coproc post hook
297     if (cpHost != null) {
298       cpHost.postDeleteSnapshot(snapshot);
299     }
300 
301   }
302 
303   /**
304    * Check if the specified snapshot is done
305    *
306    * @param expected
307    * @return true if snapshot is ready to be restored, false if it is still being taken.
308    * @throws IOException IOException if error from HDFS or RPC
309    * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
310    */
311   public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
312     // check the request to make sure it has a snapshot
313     if (expected == null) {
314       throw new UnknownSnapshotException(
315          "No snapshot name passed in request, can't figure out which snapshot you want to check.");
316     }
317 
318     String ssString = ClientSnapshotDescriptionUtils.toString(expected);
319 
320     // check to see if the sentinel exists,
321     // and if the task is complete removes it from the in-progress snapshots map.
322     SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
323 
324     // stop tracking "abandoned" handlers
325     cleanupSentinels();
326 
327     if (handler == null) {
328       // If there's no handler in the in-progress map, it means one of the following:
329       //   - someone has already requested the snapshot state
330       //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
331       //   - the snapshot was never requested
332       // In those cases returns to the user the "done state" if the snapshots exists on disk,
333       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
334       if (!isSnapshotCompleted(expected)) {
335         throw new UnknownSnapshotException("Snapshot " + ssString
336             + " is not currently running or one of the known completed snapshots.");
337       }
338       // was done, return true;
339       return true;
340     }
341 
342     // pass on any failure we find in the sentinel
343     try {
344       handler.rethrowExceptionIfFailed();
345     } catch (ForeignException e) {
346       // Give some procedure info on an exception.
347       String status;
348       Procedure p = coordinator.getProcedure(expected.getName());
349       if (p != null) {
350         status = p.getStatus();
351       } else {
352         status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
353       }
354       throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
355           expected);
356     }
357 
358     // check to see if we are done
359     if (handler.isFinished()) {
360       LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
361       return true;
362     } else if (LOG.isDebugEnabled()) {
363       LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
364     }
365     return false;
366   }
367 
368   /**
369    * Check to see if there is a snapshot in progress with the same name or on the same table.
370    * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
371    * don't allow snapshot with the same name.
372    * @param snapshot description of the snapshot being checked.
373    * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
374    *         table.
375    */
376   synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
377     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
378     if (isTakingSnapshot(snapshotTable)) {
379       return true;
380     }
381     Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
382     while (it.hasNext()) {
383       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
384       SnapshotSentinel sentinel = entry.getValue();
385       if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
386         return true;
387       }
388     }
389     return false;
390   }
391 
392   /**
393    * Check to see if the specified table has a snapshot in progress.  Currently we have a
394    * limitation only allowing a single snapshot per table at a time.
395    * @param tableName name of the table being snapshotted.
396    * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
397    */
398   synchronized boolean isTakingSnapshot(final TableName tableName) {
399     SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
400     return handler != null && !handler.isFinished();
401   }
402 
403   /**
404    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
405    * aren't already running a snapshot or restore on the requested table.
406    * @param snapshot description of the snapshot we want to start
407    * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
408    */
409   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
410       throws HBaseSnapshotException {
411     FileSystem fs = master.getMasterFileSystem().getFileSystem();
412     Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
413     TableName snapshotTable =
414         TableName.valueOf(snapshot.getTable());
415 
416     // make sure we aren't already running a snapshot
417     if (isTakingSnapshot(snapshot)) {
418       SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
419       throw new SnapshotCreationException("Rejected taking "
420           + ClientSnapshotDescriptionUtils.toString(snapshot)
421           + " because we are already running another snapshot "
422           + (handler != null ? ("on the same table " +
423               ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
424               : "with the same name"), snapshot);
425     }
426 
427     // make sure we aren't running a restore on the same table
428     if (isRestoringTable(snapshotTable)) {
429       SnapshotSentinel handler = restoreHandlers.get(snapshotTable);
430       throw new SnapshotCreationException("Rejected taking "
431           + ClientSnapshotDescriptionUtils.toString(snapshot)
432           + " because we are already have a restore in progress on the same snapshot "
433           + ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()), snapshot);
434     }
435 
436     try {
437       // delete the working directory, since we aren't running the snapshot. Likely leftovers
438       // from a failed attempt.
439       fs.delete(workingDir, true);
440 
441       // recreate the working directory for the snapshot
442       if (!fs.mkdirs(workingDir)) {
443         throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
444             + ") for snapshot" , snapshot);
445       }
446     } catch (HBaseSnapshotException e) {
447       throw e;
448     } catch (IOException e) {
449       throw new SnapshotCreationException(
450           "Exception while checking to see if snapshot could be started.", e, snapshot);
451     }
452   }
453 
454   /**
455    * Take a snapshot of a disabled table.
456    * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
457    * @throws HBaseSnapshotException if the snapshot could not be started
458    */
459   private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
460       throws HBaseSnapshotException {
461     // setup the snapshot
462     prepareToTakeSnapshot(snapshot);
463 
464     // set the snapshot to be a disabled snapshot, since the client doesn't know about that
465     snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
466 
467     // Take the snapshot of the disabled table
468     DisabledTableSnapshotHandler handler =
469         new DisabledTableSnapshotHandler(snapshot, master);
470     snapshotTable(snapshot, handler);
471   }
472 
473   /**
474    * Take a snapshot of an enabled table.
475    * @param snapshot description of the snapshot to take.
476    * @throws HBaseSnapshotException if the snapshot could not be started
477    */
478   private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
479       throws HBaseSnapshotException {
480     // setup the snapshot
481     prepareToTakeSnapshot(snapshot);
482 
483     // Take the snapshot of the enabled table
484     EnabledTableSnapshotHandler handler =
485         new EnabledTableSnapshotHandler(snapshot, master, this);
486     snapshotTable(snapshot, handler);
487   }
488 
489   /**
490    * Take a snapshot using the specified handler.
491    * On failure the snapshot temporary working directory is removed.
492    * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
493    *       snapshot request if the table is busy with another snapshot/restore operation.
494    * @param snapshot the snapshot description
495    * @param handler the snapshot handler
496    */
497   private synchronized void snapshotTable(SnapshotDescription snapshot,
498       final TakeSnapshotHandler handler) throws HBaseSnapshotException {
499     try {
500       handler.prepare();
501       this.executorService.submit(handler);
502       this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
503     } catch (Exception e) {
504       // cleanup the working directory by trying to delete it from the fs.
505       Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
506       try {
507         if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
508           LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
509               ClientSnapshotDescriptionUtils.toString(snapshot));
510         }
511       } catch (IOException e1) {
512         LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
513             ClientSnapshotDescriptionUtils.toString(snapshot));
514       }
515       // fail the snapshot
516       throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
517     }
518   }
519 
520   /**
521    * Take a snapshot based on the enabled/disabled state of the table.
522    *
523    * @param snapshot
524    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
525    * @throws IOException when some sort of generic IO exception occurs.
526    */
527   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
528     // check to see if we already completed the snapshot
529     if (isSnapshotCompleted(snapshot)) {
530       throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
531           + "' already stored on the filesystem.", snapshot);
532     }
533 
534     LOG.debug("No existing snapshot, attempting snapshot...");
535 
536     // stop tracking "abandoned" handlers
537     cleanupSentinels();
538 
539     // check to see if the table exists
540     HTableDescriptor desc = null;
541     try {
542       desc = master.getTableDescriptors().get(
543           TableName.valueOf(snapshot.getTable()));
544     } catch (FileNotFoundException e) {
545       String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
546       LOG.error(msg);
547       throw new SnapshotCreationException(msg, e, snapshot);
548     } catch (IOException e) {
549       throw new SnapshotCreationException("Error while geting table description for table "
550           + snapshot.getTable(), e, snapshot);
551     }
552     if (desc == null) {
553       throw new SnapshotCreationException("Table '" + snapshot.getTable()
554           + "' doesn't exist, can't take snapshot.", snapshot);
555     }
556     SnapshotDescription.Builder builder = snapshot.toBuilder();
557     // if not specified, set the snapshot format
558     if (!snapshot.hasVersion()) {
559       builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
560     }
561     User user = RpcServer.getRequestUser();
562     if (User.isHBaseSecurityEnabled(master.getConfiguration()) && user != null) {
563       builder.setOwner(user.getShortName());
564     }
565     snapshot = builder.build();
566 
567     // call pre coproc hook
568     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
569     if (cpHost != null) {
570       cpHost.preSnapshot(snapshot, desc);
571     }
572 
573     // if the table is enabled, then have the RS run actually the snapshot work
574     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
575     AssignmentManager assignmentMgr = master.getAssignmentManager();
576     if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
577         ZooKeeperProtos.Table.State.ENABLED)) {
578       LOG.debug("Table enabled, starting distributed snapshot.");
579       snapshotEnabledTable(snapshot);
580       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
581     }
582     // For disabled table, snapshot is created by the master
583     else if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
584         ZooKeeperProtos.Table.State.DISABLED)) {
585       LOG.debug("Table is disabled, running snapshot entirely on master.");
586       snapshotDisabledTable(snapshot);
587       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
588     } else {
589       LOG.error("Can't snapshot table '" + snapshot.getTable()
590           + "', isn't open or closed, we don't know what to do!");
591       TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
592           + " isn't fully open.");
593       throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
594     }
595 
596     // call post coproc hook
597     if (cpHost != null) {
598       cpHost.postSnapshot(snapshot, desc);
599     }
600   }
601 
602   /**
603    * Set the handler for the current snapshot
604    * <p>
605    * Exposed for TESTING
606    * @param tableName
607    * @param handler handler the master should use
608    *
609    * TODO get rid of this if possible, repackaging, modify tests.
610    */
611   public synchronized void setSnapshotHandlerForTesting(
612       final TableName tableName,
613       final SnapshotSentinel handler) {
614     if (handler != null) {
615       this.snapshotHandlers.put(tableName, handler);
616     } else {
617       this.snapshotHandlers.remove(tableName);
618     }
619   }
620 
621   /**
622    * @return distributed commit coordinator for all running snapshots
623    */
624   ProcedureCoordinator getCoordinator() {
625     return coordinator;
626   }
627 
628   /**
629    * Check to see if the snapshot is one of the currently completed snapshots
630    * Returns true if the snapshot exists in the "completed snapshots folder".
631    *
632    * @param snapshot expected snapshot to check
633    * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
634    *         not stored
635    * @throws IOException if the filesystem throws an unexpected exception,
636    * @throws IllegalArgumentException if snapshot name is invalid.
637    */
638   private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
639     try {
640       final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
641       FileSystem fs = master.getMasterFileSystem().getFileSystem();
642       // check to see if the snapshot already exists
643       return fs.exists(snapshotDir);
644     } catch (IllegalArgumentException iae) {
645       throw new UnknownSnapshotException("Unexpected exception thrown", iae);
646     }
647   }
648 
649   /**
650    * Clone the specified snapshot into a new table.
651    * The operation will fail if the destination table has a snapshot or restore in progress.
652    *
653    * @param snapshot Snapshot Descriptor
654    * @param hTableDescriptor Table Descriptor of the table to create
655    */
656   synchronized void cloneSnapshot(final SnapshotDescription snapshot,
657       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
658     TableName tableName = hTableDescriptor.getTableName();
659 
660     // make sure we aren't running a snapshot on the same table
661     if (isTakingSnapshot(tableName)) {
662       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
663     }
664 
665     // make sure we aren't running a restore on the same table
666     if (isRestoringTable(tableName)) {
667       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
668     }
669 
670     try {
671       CloneSnapshotHandler handler =
672         new CloneSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
673       this.executorService.submit(handler);
674       this.restoreHandlers.put(tableName, handler);
675     } catch (Exception e) {
676       String msg = "Couldn't clone the snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
677         " on table=" + tableName;
678       LOG.error(msg, e);
679       throw new RestoreSnapshotException(msg, e);
680     }
681   }
682 
683   /**
684    * Restore the specified snapshot
685    * @param reqSnapshot
686    * @throws IOException
687    */
688   public void restoreSnapshot(SnapshotDescription reqSnapshot) throws IOException {
689     FileSystem fs = master.getMasterFileSystem().getFileSystem();
690     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
691     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
692 
693     // check if the snapshot exists
694     if (!fs.exists(snapshotDir)) {
695       LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
696       throw new SnapshotDoesNotExistException(reqSnapshot);
697     }
698 
699     // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
700     // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
701     // information.
702     SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
703     SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
704         snapshotDir, snapshot);
705     HTableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
706     TableName tableName = TableName.valueOf(reqSnapshot.getTable());
707 
708     // stop tracking "abandoned" handlers
709     cleanupSentinels();
710 
711     // Verify snapshot validity
712     SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
713 
714     // Execute the restore/clone operation
715     if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) {
716       if (master.getAssignmentManager().getTableStateManager().isTableState(
717           TableName.valueOf(snapshot.getTable()), ZooKeeperProtos.Table.State.ENABLED)) {
718         throw new UnsupportedOperationException("Table '" +
719             TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
720             "perform a restore operation" +
721             ".");
722       }
723 
724       // call coproc pre hook
725       if (cpHost != null) {
726         cpHost.preRestoreSnapshot(snapshot, snapshotTableDesc);
727       }
728 
729       int tableRegionCount = -1;
730       try {
731         // Table already exist. Check and update the region quota for this table namespace.
732         // The region quota may not be updated correctly if there are concurrent restore snapshot
733         // requests for the same table
734 
735         tableRegionCount = getRegionCountOfTable(tableName);
736         int snapshotRegionCount = manifest.getRegionManifestsMap().size();
737 
738         // Update region quota when snapshotRegionCount is larger. If we updated the region count
739         // to a smaller value before retoreSnapshot and the retoreSnapshot fails, we may fail to
740         // reset the region count to its original value if the region quota is consumed by other
741         // tables in the namespace
742         if (tableRegionCount > 0 && tableRegionCount < snapshotRegionCount) {
743           checkAndUpdateNamespaceRegionQuota(snapshotRegionCount, tableName);
744         }
745         restoreSnapshot(snapshot, snapshotTableDesc);
746         // Update the region quota if snapshotRegionCount is smaller. This step should not fail
747         // because we have reserved enough region quota before hand
748         if (tableRegionCount > 0 && tableRegionCount > snapshotRegionCount) {
749           checkAndUpdateNamespaceRegionQuota(snapshotRegionCount, tableName);
750         }
751       } catch (QuotaExceededException e) {
752         LOG.error("Region quota exceeded while restoring the snapshot " + snapshot.getName()
753           + " as table " + tableName.getNameAsString(), e);
754         // If QEE is thrown before restoreSnapshot, quota information is not updated, so we
755         // should throw the exception directly. If QEE is thrown after restoreSnapshot, there
756         // must be unexpected reasons, we also throw the exception directly
757         throw e;
758       } catch (IOException e) {
759         if (tableRegionCount > 0) {
760           // reset the region count for table
761           checkAndUpdateNamespaceRegionQuota(tableRegionCount, tableName);
762         }
763         LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
764             + " as table " + tableName.getNameAsString(), e);
765         throw e;
766       }
767       LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
768 
769       if (cpHost != null) {
770         cpHost.postRestoreSnapshot(snapshot, snapshotTableDesc);
771       }
772     } else {
773       HTableDescriptor htd = new HTableDescriptor(tableName, snapshotTableDesc);
774       if (cpHost != null) {
775         cpHost.preCloneSnapshot(snapshot, htd);
776       }
777       try {
778         checkAndUpdateNamespaceQuota(manifest, tableName);
779         cloneSnapshot(snapshot, htd);
780       } catch (IOException e) {
781         this.master.getMasterQuotaManager().removeTableFromNamespaceQuota(tableName);
782         LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
783             + " as table " + tableName.getNameAsString(), e);
784         throw e;
785       }
786       LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
787 
788       if (cpHost != null) {
789         cpHost.postCloneSnapshot(snapshot, htd);
790       }
791     }
792   }
793 
794   private void checkAndUpdateNamespaceQuota(SnapshotManifest manifest, TableName tableName)
795       throws IOException {
796     if (this.master.getMasterQuotaManager().isQuotaInitialized()) {
797       this.master.getMasterQuotaManager().checkNamespaceTableAndRegionQuota(tableName,
798         manifest.getRegionManifestsMap().size());
799     }
800   }
801 
802   private void checkAndUpdateNamespaceRegionQuota(int updatedRegionCount, TableName tableName)
803       throws IOException {
804     if (this.master.getMasterQuotaManager().isQuotaInitialized()) {
805       this.master.getMasterQuotaManager().checkAndUpdateNamespaceRegionQuota(tableName,
806         updatedRegionCount);
807     }
808   }
809 
810   /**
811    * @return cached region count, or -1 if quota manager is disabled or table status not found
812   */
813   private int getRegionCountOfTable(TableName tableName) throws IOException {
814     if (this.master.getMasterQuotaManager().isQuotaInitialized()) {
815       return this.master.getMasterQuotaManager().getRegionCountOfTable(tableName);
816     }
817     return -1;
818   }
819 
820   /**
821    * Restore the specified snapshot.
822    * The restore will fail if the destination table has a snapshot or restore in progress.
823    *
824    * @param snapshot Snapshot Descriptor
825    * @param hTableDescriptor Table Descriptor
826    */
827   private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
828       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
829     TableName tableName = hTableDescriptor.getTableName();
830 
831     // make sure we aren't running a snapshot on the same table
832     if (isTakingSnapshot(tableName)) {
833       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
834     }
835 
836     // make sure we aren't running a restore on the same table
837     if (isRestoringTable(tableName)) {
838       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
839     }
840 
841     try {
842       RestoreSnapshotHandler handler =
843         new RestoreSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
844       this.executorService.submit(handler);
845       restoreHandlers.put(tableName, handler);
846     } catch (Exception e) {
847       String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
848           snapshot)  +
849           " on table=" + tableName;
850       LOG.error(msg, e);
851       throw new RestoreSnapshotException(msg, e);
852     }
853   }
854 
855   /**
856    * Verify if the restore of the specified table is in progress.
857    *
858    * @param tableName table under restore
859    * @return <tt>true</tt> if there is a restore in progress of the specified table.
860    */
861   private synchronized boolean isRestoringTable(final TableName tableName) {
862     SnapshotSentinel sentinel = this.restoreHandlers.get(tableName);
863     return(sentinel != null && !sentinel.isFinished());
864   }
865 
866   /**
867    * Returns the status of a restore operation.
868    * If the in-progress restore is failed throws the exception that caused the failure.
869    *
870    * @param snapshot
871    * @return false if in progress, true if restore is completed or not requested.
872    * @throws IOException if there was a failure during the restore
873    */
874   public boolean isRestoreDone(final SnapshotDescription snapshot) throws IOException {
875     // check to see if the sentinel exists,
876     // and if the task is complete removes it from the in-progress restore map.
877     SnapshotSentinel sentinel = removeSentinelIfFinished(this.restoreHandlers, snapshot);
878 
879     // stop tracking "abandoned" handlers
880     cleanupSentinels();
881 
882     if (sentinel == null) {
883       // there is no sentinel so restore is not in progress.
884       return true;
885     }
886 
887     LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
888         + sentinel.getSnapshot().getName() + " table=" +
889         TableName.valueOf(snapshot.getTable()));
890 
891     // If the restore is failed, rethrow the exception
892     sentinel.rethrowExceptionIfFailed();
893 
894     // check to see if we are done
895     if (sentinel.isFinished()) {
896       LOG.debug("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
897           " has completed. Notifying the client.");
898       return true;
899     }
900 
901     if (LOG.isDebugEnabled()) {
902       LOG.debug("Sentinel is not yet finished with restoring snapshot=" +
903           ClientSnapshotDescriptionUtils.toString(snapshot));
904     }
905     return false;
906   }
907 
908   /**
909    * Return the handler if it is currently live and has the same snapshot target name.
910    * The handler is removed from the sentinels map if completed.
911    * @param sentinels live handlers
912    * @param snapshot snapshot description
913    * @return null if doesn't match, else a live handler.
914    */
915   private synchronized SnapshotSentinel removeSentinelIfFinished(
916       final Map<TableName, SnapshotSentinel> sentinels,
917       final SnapshotDescription snapshot) {
918     if (!snapshot.hasTable()) {
919       return null;
920     }
921 
922     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
923     SnapshotSentinel h = sentinels.get(snapshotTable);
924     if (h == null) {
925       return null;
926     }
927 
928     if (!h.getSnapshot().getName().equals(snapshot.getName())) {
929       // specified snapshot is to the one currently running
930       return null;
931     }
932 
933     // Remove from the "in-progress" list once completed
934     if (h.isFinished()) {
935       sentinels.remove(snapshotTable);
936     }
937 
938     return h;
939   }
940 
941   /**
942    * Removes "abandoned" snapshot/restore requests.
943    * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
944    * and the in-progress maps are cleaned up when the status of a completed task is requested.
945    * To avoid having sentinels staying around for long time if something client side is failed,
946    * each operation tries to clean up the in-progress maps sentinels finished from a long time.
947    */
948   private void cleanupSentinels() {
949     cleanupSentinels(this.snapshotHandlers);
950     cleanupSentinels(this.restoreHandlers);
951   }
952 
953   /**
954    * Remove the sentinels that are marked as finished and the completion time
955    * has exceeded the removal timeout.
956    * @param sentinels map of sentinels to clean
957    */
958   private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
959     long currentTime = EnvironmentEdgeManager.currentTime();
960     Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
961         sentinels.entrySet().iterator();
962     while (it.hasNext()) {
963       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
964       SnapshotSentinel sentinel = entry.getValue();
965       if (sentinel.isFinished() &&
966           (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
967       {
968         it.remove();
969       }
970     }
971   }
972 
973   //
974   // Implementing Stoppable interface
975   //
976 
977   @Override
978   public void stop(String why) {
979     // short circuit
980     if (this.stopped) return;
981     // make sure we get stop
982     this.stopped = true;
983     // pass the stop onto take snapshot handlers
984     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
985       snapshotHandler.cancel(why);
986     }
987 
988     // pass the stop onto all the restore handlers
989     for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
990       restoreHandler.cancel(why);
991     }
992     try {
993       if (coordinator != null) {
994         coordinator.close();
995       }
996     } catch (IOException e) {
997       LOG.error("stop ProcedureCoordinator error", e);
998     }
999   }
1000 
1001   @Override
1002   public boolean isStopped() {
1003     return this.stopped;
1004   }
1005 
1006   /**
1007    * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1008    * Called at the beginning of snapshot() and restoreSnapshot() methods.
1009    * @throws UnsupportedOperationException if snapshot are not supported
1010    */
1011   public void checkSnapshotSupport() throws UnsupportedOperationException {
1012     if (!this.isSnapshotSupported) {
1013       throw new UnsupportedOperationException(
1014         "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
1015           HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1016     }
1017   }
1018 
1019   /**
1020    * Called at startup, to verify if snapshot operation is supported, and to avoid
1021    * starting the master if there're snapshots present but the cleaners needed are missing.
1022    * Otherwise we can end up with snapshot data loss.
1023    * @param conf The {@link Configuration} object to use
1024    * @param mfs The MasterFileSystem to use
1025    * @throws IOException in case of file-system operation failure
1026    * @throws UnsupportedOperationException in case cleaners are missing and
1027    *         there're snapshot in the system
1028    */
1029   private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1030       throws IOException, UnsupportedOperationException {
1031     // Verify if snapshot is disabled by the user
1032     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1033     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1034     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1035 
1036     // Extract cleaners from conf
1037     Set<String> hfileCleaners = new HashSet<String>();
1038     String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1039     if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1040 
1041     Set<String> logCleaners = new HashSet<String>();
1042     cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1043     if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1044 
1045     // check if an older version of snapshot directory was present
1046     Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1047     FileSystem fs = mfs.getFileSystem();
1048     List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
1049     if (ss != null && !ss.isEmpty()) {
1050       LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1051       LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1052     }
1053 
1054     // If the user has enabled the snapshot, we force the cleaners to be present
1055     // otherwise we still need to check if cleaners are enabled or not and verify
1056     // that there're no snapshot in the .snapshot folder.
1057     if (snapshotEnabled) {
1058       // Inject snapshot cleaners, if snapshot.enable is true
1059       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1060       hfileCleaners.add(HFileLinkCleaner.class.getName());
1061       logCleaners.add(SnapshotLogCleaner.class.getName());
1062 
1063       // Set cleaners conf
1064       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1065         hfileCleaners.toArray(new String[hfileCleaners.size()]));
1066       conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1067         logCleaners.toArray(new String[logCleaners.size()]));
1068     } else {
1069       // Verify if cleaners are present
1070       snapshotEnabled = logCleaners.contains(SnapshotLogCleaner.class.getName()) &&
1071         hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
1072         hfileCleaners.contains(HFileLinkCleaner.class.getName());
1073 
1074       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1075       if (snapshotEnabled) {
1076         LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
1077           "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
1078           (userDisabled ? "is set to 'false'." : "is not set."));
1079       }
1080     }
1081 
1082     // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1083     this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1084 
1085     // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1086     // otherwise we end up with snapshot data loss.
1087     if (!snapshotEnabled) {
1088       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1089       Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1090       if (fs.exists(snapshotDir)) {
1091         FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
1092           new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1093         if (snapshots != null) {
1094           LOG.error("Snapshots are present, but cleaners are not enabled.");
1095           checkSnapshotSupport();
1096         }
1097       }
1098     }
1099   }
1100 
1101   @Override
1102   public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1103       IOException, UnsupportedOperationException {
1104     this.master = master;
1105 
1106     this.rootDir = master.getMasterFileSystem().getRootDir();
1107     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1108 
1109     // get the configuration for the coordinator
1110     Configuration conf = master.getConfiguration();
1111     long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1112     long timeoutMillis = Math.max(conf.getLong(SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_KEY,
1113                     SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT),
1114             conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1115                     SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1116     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1117 
1118     // setup the default procedure coordinator
1119     String name = master.getServerName().toString();
1120     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1121     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
1122         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1123 
1124     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1125     this.executorService = master.getExecutorService();
1126     resetTempDir();
1127   }
1128 
1129   @Override
1130   public String getProcedureSignature() {
1131     return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1132   }
1133 
1134   @Override
1135   public void execProcedure(ProcedureDescription desc) throws IOException {
1136     takeSnapshot(toSnapshotDescription(desc));
1137   }
1138 
1139   @Override
1140   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1141     return isSnapshotDone(toSnapshotDescription(desc));
1142   }
1143 
1144   private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1145       throws IOException {
1146     SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1147     if (!desc.hasInstance()) {
1148       throw new IOException("Snapshot name is not defined: " + desc.toString());
1149     }
1150     String snapshotName = desc.getInstance();
1151     List<NameStringPair> props = desc.getConfigurationList();
1152     String table = null;
1153     for (NameStringPair prop : props) {
1154       if ("table".equalsIgnoreCase(prop.getName())) {
1155         table = prop.getValue();
1156       }
1157     }
1158     if (table == null) {
1159       throw new IOException("Snapshot table is not defined: " + desc.toString());
1160     }
1161     TableName tableName = TableName.valueOf(table);
1162     builder.setTable(tableName.getNameAsString());
1163     builder.setName(snapshotName);
1164     builder.setType(SnapshotDescription.Type.FLUSH);
1165     return builder.build();
1166   }
1167 }