View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.snapshot;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ThreadPoolExecutor;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.classification.InterfaceStability;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FSDataInputStream;
38  import org.apache.hadoop.fs.FileStatus;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.TableName;
42  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.Stoppable;
46  import org.apache.hadoop.hbase.MetaTableAccessor;
47  import org.apache.hadoop.hbase.errorhandling.ForeignException;
48  import org.apache.hadoop.hbase.executor.ExecutorService;
49  import org.apache.hadoop.hbase.ipc.RpcServer;
50  import org.apache.hadoop.hbase.master.AssignmentManager;
51  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
52  import org.apache.hadoop.hbase.master.MasterFileSystem;
53  import org.apache.hadoop.hbase.master.MasterServices;
54  import org.apache.hadoop.hbase.master.MetricsMaster;
55  import org.apache.hadoop.hbase.master.SnapshotSentinel;
56  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
57  import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
58  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
59  import org.apache.hadoop.hbase.procedure.Procedure;
60  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
61  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
62  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
63  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
64  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
65  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
66  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
67  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
68  import org.apache.hadoop.hbase.quotas.QuotaExceededException;
69  import org.apache.hadoop.hbase.security.AccessDeniedException;
70  import org.apache.hadoop.hbase.security.User;
71  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
72  import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
73  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
74  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
75  import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
76  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
77  import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
78  import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
79  import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
80  import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
81  import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
82  import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
83  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
84  import org.apache.hadoop.hbase.util.FSUtils;
85  import org.apache.zookeeper.KeeperException;
86  
87  /**
88   * This class manages the procedure of taking and restoring snapshots. There is only one
89   * SnapshotManager for the master.
90   * <p>
91   * The class provides methods for monitoring in-progress snapshot actions.
92   * <p>
93   * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
94   * simplification in the current implementation.
95   */
96  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
97  @InterfaceStability.Unstable
98  public class SnapshotManager extends MasterProcedureManager implements Stoppable {
99    private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
100 
101   /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
102   private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
103 
104   /**
105    * Wait time before removing a finished sentinel from the in-progress map
106    *
107    * NOTE: This is used as a safety auto cleanup.
108    * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
109    * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
110    * In case something fails on the client side and the snapshot/restore state is not reclaimed
111    * after a default timeout, the entry is removed from the in-progress map.
112    * At this point, if the user asks for the snapshot/restore status, the result will be
113    * snapshot done if it exists or failed if it doesn't exist.
114    */
115   private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;
116 
117   /** Enable or disable snapshot support */
118   public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
119 
120   /**
121    * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
122    * completion.
123    */
124   private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
125 
126   /** Name of the operation to use in the controller */
127   public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
128 
129   /** Conf key for # of threads used by the SnapshotManager thread pool */
130   private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
131 
132   /** Default number of concurrent snapshot operations (pool threads) running on the master */
133   private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
134 
135   private boolean stopped;
136   private MasterServices master;  // Needed by TableEventHandlers
137   private ProcedureCoordinator coordinator;
138 
139   // Is snapshot feature enabled?
140   private boolean isSnapshotSupported = false;
141 
142   // Snapshot handlers map, with table name as key.
143   // The map is always accessed and modified under the object lock using synchronized.
144   // snapshotTable() will insert a handler in the map.
145   // isSnapshotDone() will remove the handler requested if the operation is finished.
146   private Map<TableName, SnapshotSentinel> snapshotHandlers =
147       new HashMap<TableName, SnapshotSentinel>();
148 
149   // Restore Sentinels map, with table name as key.
150   // The map is always accessed and modified under the object lock using synchronized.
151   // restoreSnapshot()/cloneSnapshot() will insert a handler in the map.
152   // isRestoreDone() will remove the handler requested if the operation is finished.
153   private Map<TableName, SnapshotSentinel> restoreHandlers =
154       new HashMap<TableName, SnapshotSentinel>();
155 
156   private Path rootDir;
157   private ExecutorService executorService;
158 
159   public SnapshotManager() {}
160 
161   /**
162    * Fully specify all necessary components of a snapshot manager. Exposed for testing.
163    * @param master services for the master where the manager is running
164    * @param coordinator procedure coordinator instance.  exposed for testing.
165    * @param pool HBase ExecutorServcie instance, exposed for testing.
166    */
167   public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
168       ProcedureCoordinator coordinator, ExecutorService pool)
169       throws IOException, UnsupportedOperationException {
170     this.master = master;
171 
172     this.rootDir = master.getMasterFileSystem().getRootDir();
173     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
174 
175     this.coordinator = coordinator;
176     this.executorService = pool;
177     resetTempDir();
178   }
179 
180   /**
181    * Gets the list of all completed snapshots.
182    * @return list of SnapshotDescriptions
183    * @throws IOException File system exception
184    */
185   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
186     return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
187   }
188 
189   /**
190    * Gets the list of all completed snapshots.
191    * @param snapshotDir snapshot directory
192    * @return list of SnapshotDescriptions
193    * @throws IOException File system exception
194    */
195   private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
196     List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
197     // first create the snapshot root path and check to see if it exists
198     FileSystem fs = master.getMasterFileSystem().getFileSystem();
199     if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
200 
201     // if there are no snapshots, return an empty list
202     if (!fs.exists(snapshotDir)) {
203       return snapshotDescs;
204     }
205 
206     // ignore all the snapshots in progress
207     FileStatus[] snapshots = fs.listStatus(snapshotDir,
208       new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
209     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
210     // loop through all the completed snapshots
211     for (FileStatus snapshot : snapshots) {
212       Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
213       // if the snapshot is bad
214       if (!fs.exists(info)) {
215         LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
216         continue;
217       }
218       FSDataInputStream in = null;
219       try {
220         in = fs.open(info);
221         SnapshotDescription desc = SnapshotDescription.parseFrom(in);
222         if (cpHost != null) {
223           try {
224             cpHost.preListSnapshot(desc);
225           } catch (AccessDeniedException e) {
226             LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
227                 + "Either you should be owner of this snapshot or admin user.");
228             // Skip this and try for next snapshot
229             continue;
230           }
231         }
232         snapshotDescs.add(desc);
233 
234         // call coproc post hook
235         if (cpHost != null) {
236           cpHost.postListSnapshot(desc);
237         }
238       } catch (IOException e) {
239         LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
240       } finally {
241         if (in != null) {
242           in.close();
243         }
244       }
245     }
246     return snapshotDescs;
247   }
248 
249   /**
250    * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
251    * snapshot attempts.
252    *
253    * @throws IOException if we can't reach the filesystem
254    */
255   void resetTempDir() throws IOException {
256     // cleanup any existing snapshots.
257     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
258     if (master.getMasterFileSystem().getFileSystem().exists(tmpdir)) {
259       if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
260         LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
261       }
262     }
263   }
264 
265   /**
266    * Delete the specified snapshot
267    * @param snapshot
268    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
269    * @throws IOException For filesystem IOExceptions
270    */
271   public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
272     // check to see if it is completed
273     if (!isSnapshotCompleted(snapshot)) {
274       throw new SnapshotDoesNotExistException(snapshot);
275     }
276 
277     String snapshotName = snapshot.getName();
278     // first create the snapshot description and check to see if it exists
279     FileSystem fs = master.getMasterFileSystem().getFileSystem();
280     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
281     // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
282     // just the "name" and it does not contains the "real" snapshot information
283     snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
284 
285     // call coproc pre hook
286     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
287     if (cpHost != null) {
288       cpHost.preDeleteSnapshot(snapshot);
289     }
290 
291     LOG.debug("Deleting snapshot: " + snapshotName);
292     // delete the existing snapshot
293     if (!fs.delete(snapshotDir, true)) {
294       throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
295     }
296 
297     // call coproc post hook
298     if (cpHost != null) {
299       cpHost.postDeleteSnapshot(snapshot);
300     }
301 
302   }
303 
304   /**
305    * Check if the specified snapshot is done
306    *
307    * @param expected
308    * @return true if snapshot is ready to be restored, false if it is still being taken.
309    * @throws IOException IOException if error from HDFS or RPC
310    * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
311    */
312   public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
313     // check the request to make sure it has a snapshot
314     if (expected == null) {
315       throw new UnknownSnapshotException(
316          "No snapshot name passed in request, can't figure out which snapshot you want to check.");
317     }
318 
319     String ssString = ClientSnapshotDescriptionUtils.toString(expected);
320 
321     // check to see if the sentinel exists,
322     // and if the task is complete removes it from the in-progress snapshots map.
323     SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
324 
325     // stop tracking "abandoned" handlers
326     cleanupSentinels();
327 
328     if (handler == null) {
329       // If there's no handler in the in-progress map, it means one of the following:
330       //   - someone has already requested the snapshot state
331       //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
332       //   - the snapshot was never requested
333       // In those cases returns to the user the "done state" if the snapshots exists on disk,
334       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
335       if (!isSnapshotCompleted(expected)) {
336         throw new UnknownSnapshotException("Snapshot " + ssString
337             + " is not currently running or one of the known completed snapshots.");
338       }
339       // was done, return true;
340       return true;
341     }
342 
343     // pass on any failure we find in the sentinel
344     try {
345       handler.rethrowExceptionIfFailed();
346     } catch (ForeignException e) {
347       // Give some procedure info on an exception.
348       String status;
349       Procedure p = coordinator.getProcedure(expected.getName());
350       if (p != null) {
351         status = p.getStatus();
352       } else {
353         status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
354       }
355       throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
356           expected);
357     }
358 
359     // check to see if we are done
360     if (handler.isFinished()) {
361       LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
362       return true;
363     } else if (LOG.isDebugEnabled()) {
364       LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
365     }
366     return false;
367   }
368 
369   /**
370    * Check to see if there is a snapshot in progress with the same name or on the same table.
371    * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
372    * don't allow snapshot with the same name.
373    * @param snapshot description of the snapshot being checked.
374    * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
375    *         table.
376    */
377   synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
378     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
379     if (isTakingSnapshot(snapshotTable)) {
380       return true;
381     }
382     Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
383     while (it.hasNext()) {
384       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
385       SnapshotSentinel sentinel = entry.getValue();
386       if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
387         return true;
388       }
389     }
390     return false;
391   }
392 
393   /**
394    * Check to see if the specified table has a snapshot in progress.  Currently we have a
395    * limitation only allowing a single snapshot per table at a time.
396    * @param tableName name of the table being snapshotted.
397    * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
398    */
399   synchronized boolean isTakingSnapshot(final TableName tableName) {
400     SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
401     return handler != null && !handler.isFinished();
402   }
403 
404   /**
405    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
406    * aren't already running a snapshot or restore on the requested table.
407    * @param snapshot description of the snapshot we want to start
408    * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
409    */
410   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
411       throws HBaseSnapshotException {
412     FileSystem fs = master.getMasterFileSystem().getFileSystem();
413     Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
414     TableName snapshotTable =
415         TableName.valueOf(snapshot.getTable());
416 
417     // make sure we aren't already running a snapshot
418     if (isTakingSnapshot(snapshot)) {
419       SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
420       throw new SnapshotCreationException("Rejected taking "
421           + ClientSnapshotDescriptionUtils.toString(snapshot)
422           + " because we are already running another snapshot "
423           + (handler != null ? ("on the same table " +
424               ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
425               : "with the same name"), snapshot);
426     }
427 
428     // make sure we aren't running a restore on the same table
429     if (isRestoringTable(snapshotTable)) {
430       SnapshotSentinel handler = restoreHandlers.get(snapshotTable);
431       throw new SnapshotCreationException("Rejected taking "
432           + ClientSnapshotDescriptionUtils.toString(snapshot)
433           + " because we are already have a restore in progress on the same snapshot "
434           + ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()), snapshot);
435     }
436 
437     try {
438       // delete the working directory, since we aren't running the snapshot. Likely leftovers
439       // from a failed attempt.
440       fs.delete(workingDir, true);
441 
442       // recreate the working directory for the snapshot
443       if (!fs.mkdirs(workingDir)) {
444         throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
445             + ") for snapshot" , snapshot);
446       }
447     } catch (HBaseSnapshotException e) {
448       throw e;
449     } catch (IOException e) {
450       throw new SnapshotCreationException(
451           "Exception while checking to see if snapshot could be started.", e, snapshot);
452     }
453   }
454 
455   /**
456    * Take a snapshot of a disabled table.
457    * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
458    * @throws HBaseSnapshotException if the snapshot could not be started
459    */
460   private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
461       throws HBaseSnapshotException {
462     // setup the snapshot
463     prepareToTakeSnapshot(snapshot);
464 
465     // set the snapshot to be a disabled snapshot, since the client doesn't know about that
466     snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
467 
468     // Take the snapshot of the disabled table
469     DisabledTableSnapshotHandler handler =
470         new DisabledTableSnapshotHandler(snapshot, master);
471     snapshotTable(snapshot, handler);
472   }
473 
474   /**
475    * Take a snapshot of an enabled table.
476    * @param snapshot description of the snapshot to take.
477    * @throws HBaseSnapshotException if the snapshot could not be started
478    */
479   private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
480       throws HBaseSnapshotException {
481     // setup the snapshot
482     prepareToTakeSnapshot(snapshot);
483 
484     // Take the snapshot of the enabled table
485     EnabledTableSnapshotHandler handler =
486         new EnabledTableSnapshotHandler(snapshot, master, this);
487     snapshotTable(snapshot, handler);
488   }
489 
490   /**
491    * Take a snapshot using the specified handler.
492    * On failure the snapshot temporary working directory is removed.
493    * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
494    *       snapshot request if the table is busy with another snapshot/restore operation.
495    * @param snapshot the snapshot description
496    * @param handler the snapshot handler
497    */
498   private synchronized void snapshotTable(SnapshotDescription snapshot,
499       final TakeSnapshotHandler handler) throws HBaseSnapshotException {
500     try {
501       handler.prepare();
502       this.executorService.submit(handler);
503       this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
504     } catch (Exception e) {
505       // cleanup the working directory by trying to delete it from the fs.
506       Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
507       try {
508         if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
509           LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
510               ClientSnapshotDescriptionUtils.toString(snapshot));
511         }
512       } catch (IOException e1) {
513         LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
514             ClientSnapshotDescriptionUtils.toString(snapshot));
515       }
516       // fail the snapshot
517       throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
518     }
519   }
520 
521   /**
522    * Take a snapshot based on the enabled/disabled state of the table.
523    *
524    * @param snapshot
525    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
526    * @throws IOException when some sort of generic IO exception occurs.
527    */
528   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
529     // check to see if we already completed the snapshot
530     if (isSnapshotCompleted(snapshot)) {
531       throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
532           + "' already stored on the filesystem.", snapshot);
533     }
534 
535     LOG.debug("No existing snapshot, attempting snapshot...");
536 
537     // stop tracking "abandoned" handlers
538     cleanupSentinels();
539 
540     // check to see if the table exists
541     HTableDescriptor desc = null;
542     try {
543       desc = master.getTableDescriptors().get(
544           TableName.valueOf(snapshot.getTable()));
545     } catch (FileNotFoundException e) {
546       String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
547       LOG.error(msg);
548       throw new SnapshotCreationException(msg, e, snapshot);
549     } catch (IOException e) {
550       throw new SnapshotCreationException("Error while geting table description for table "
551           + snapshot.getTable(), e, snapshot);
552     }
553     if (desc == null) {
554       throw new SnapshotCreationException("Table '" + snapshot.getTable()
555           + "' doesn't exist, can't take snapshot.", snapshot);
556     }
557     SnapshotDescription.Builder builder = snapshot.toBuilder();
558     // if not specified, set the snapshot format
559     if (!snapshot.hasVersion()) {
560       builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
561     }
562     User user = RpcServer.getRequestUser();
563     if (User.isHBaseSecurityEnabled(master.getConfiguration()) && user != null) {
564       builder.setOwner(user.getShortName());
565     }
566     snapshot = builder.build();
567 
568     // call pre coproc hook
569     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
570     if (cpHost != null) {
571       cpHost.preSnapshot(snapshot, desc);
572     }
573 
574     // if the table is enabled, then have the RS run actually the snapshot work
575     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
576     AssignmentManager assignmentMgr = master.getAssignmentManager();
577     if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
578         ZooKeeperProtos.Table.State.ENABLED)) {
579       LOG.debug("Table enabled, starting distributed snapshot.");
580       snapshotEnabledTable(snapshot);
581       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
582     }
583     // For disabled table, snapshot is created by the master
584     else if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
585         ZooKeeperProtos.Table.State.DISABLED)) {
586       LOG.debug("Table is disabled, running snapshot entirely on master.");
587       snapshotDisabledTable(snapshot);
588       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
589     } else {
590       LOG.error("Can't snapshot table '" + snapshot.getTable()
591           + "', isn't open or closed, we don't know what to do!");
592       TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
593           + " isn't fully open.");
594       throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
595     }
596 
597     // call post coproc hook
598     if (cpHost != null) {
599       cpHost.postSnapshot(snapshot, desc);
600     }
601   }
602 
603   /**
604    * Set the handler for the current snapshot
605    * <p>
606    * Exposed for TESTING
607    * @param tableName
608    * @param handler handler the master should use
609    *
610    * TODO get rid of this if possible, repackaging, modify tests.
611    */
612   public synchronized void setSnapshotHandlerForTesting(
613       final TableName tableName,
614       final SnapshotSentinel handler) {
615     if (handler != null) {
616       this.snapshotHandlers.put(tableName, handler);
617     } else {
618       this.snapshotHandlers.remove(tableName);
619     }
620   }
621 
622   /**
623    * @return distributed commit coordinator for all running snapshots
624    */
625   ProcedureCoordinator getCoordinator() {
626     return coordinator;
627   }
628 
629   /**
630    * Check to see if the snapshot is one of the currently completed snapshots
631    * Returns true if the snapshot exists in the "completed snapshots folder".
632    *
633    * @param snapshot expected snapshot to check
634    * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
635    *         not stored
636    * @throws IOException if the filesystem throws an unexpected exception,
637    * @throws IllegalArgumentException if snapshot name is invalid.
638    */
639   private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
640     try {
641       final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
642       FileSystem fs = master.getMasterFileSystem().getFileSystem();
643       // check to see if the snapshot already exists
644       return fs.exists(snapshotDir);
645     } catch (IllegalArgumentException iae) {
646       throw new UnknownSnapshotException("Unexpected exception thrown", iae);
647     }
648   }
649 
650   /**
651    * Clone the specified snapshot into a new table.
652    * The operation will fail if the destination table has a snapshot or restore in progress.
653    *
654    * @param snapshot Snapshot Descriptor
655    * @param hTableDescriptor Table Descriptor of the table to create
656    */
657   synchronized void cloneSnapshot(final SnapshotDescription snapshot,
658       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
659     TableName tableName = hTableDescriptor.getTableName();
660 
661     // make sure we aren't running a snapshot on the same table
662     if (isTakingSnapshot(tableName)) {
663       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
664     }
665 
666     // make sure we aren't running a restore on the same table
667     if (isRestoringTable(tableName)) {
668       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
669     }
670 
671     try {
672       CloneSnapshotHandler handler =
673         new CloneSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
674       this.executorService.submit(handler);
675       this.restoreHandlers.put(tableName, handler);
676     } catch (Exception e) {
677       String msg = "Couldn't clone the snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
678         " on table=" + tableName;
679       LOG.error(msg, e);
680       throw new RestoreSnapshotException(msg, e);
681     }
682   }
683 
  /**
   * Restore the specified snapshot, or clone it into a new table when the requested table is
   * not found in meta.
   * <p>
   * The completed snapshot directory is located from the request, the real snapshot
   * description and manifest are read back from the file system and verified, and then:
   * <ul>
   *   <li>if the requested table exists, the restore path runs (pre hook, region quota
   *       adjustment, handler submission, post hook);</li>
   *   <li>otherwise the clone path runs (pre hook, namespace quota check, handler submission,
   *       post hook).</li>
   * </ul>
   * NOTE(review): the restore/clone overloads called here submit a handler to the executor
   * service, so the "post" coprocessor hooks appear to run once the work is scheduled rather
   * than once it has completed -- confirm against the handlers.
   *
   * @param reqSnapshot request descriptor; only its name and table are used to locate the
   *          snapshot and choose the target table
   * @throws SnapshotDoesNotExistException if the completed snapshot directory does not exist
   * @throws UnsupportedOperationException if the table is in the ENABLED state on the restore
   *           path
   * @throws IOException on file system, quota, or handler submission failure
   */
  public void restoreSnapshot(SnapshotDescription reqSnapshot) throws IOException {
    FileSystem fs = master.getMasterFileSystem().getFileSystem();
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();

    // check if the snapshot exists
    if (!fs.exists(snapshotDir)) {
      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
      throw new SnapshotDoesNotExistException(reqSnapshot);
    }

    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
    // just the snapshot "name" and table name to restore. It does not contain the "real"
    // snapshot information.
    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
        snapshotDir, snapshot);
    HTableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
    // Target table comes from the request, not from the stored snapshot description.
    TableName tableName = TableName.valueOf(reqSnapshot.getTable());

    // stop tracking "abandoned" handlers
    cleanupSentinels();

    // Verify snapshot validity
    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);

    // Execute the restore/clone operation
    if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) {
      // NOTE(review): existence is checked for the requested table, while the enabled-state
      // check below uses the table recorded in the snapshot -- confirm these always match.
      if (master.getAssignmentManager().getTableStateManager().isTableState(
          TableName.valueOf(snapshot.getTable()), ZooKeeperProtos.Table.State.ENABLED)) {
        throw new UnsupportedOperationException("Table '" +
            TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
            "perform a restore operation" +
            ".");
      }

      // call coproc pre hook
      if (cpHost != null) {
        cpHost.preRestoreSnapshot(snapshot, snapshotTableDesc);
      }

      // -1 means "unknown / quota disabled"; in that case no quota bookkeeping is done below.
      int tableRegionCount = -1;
      try {
        // Table already exists. Check and update the region quota for this table namespace.
        // The region quota may not be updated correctly if there are concurrent restore snapshot
        // requests for the same table

        tableRegionCount = getRegionCountOfTable(tableName);
        int snapshotRegionCount = manifest.getRegionManifestsMap().size();

        // Update region quota when snapshotRegionCount is larger. If we updated the region count
        // to a smaller value before restoreSnapshot and the restoreSnapshot fails, we may fail to
        // reset the region count to its original value if the region quota is consumed by other
        // tables in the namespace
        if (tableRegionCount > 0 && tableRegionCount < snapshotRegionCount) {
          checkAndUpdateNamespaceRegionQuota(snapshotRegionCount, tableName);
        }
        restoreSnapshot(snapshot, snapshotTableDesc);
        // Update the region quota if snapshotRegionCount is smaller. This step should not fail
        // because we have reserved enough region quota before hand
        if (tableRegionCount > 0 && tableRegionCount > snapshotRegionCount) {
          checkAndUpdateNamespaceRegionQuota(snapshotRegionCount, tableName);
        }
      } catch (QuotaExceededException e) {
        LOG.error("Region quota exceeded while restoring the snapshot " + snapshot.getName()
          + " as table " + tableName.getNameAsString(), e);
        // If QEE is thrown before restoreSnapshot, quota information is not updated, so we
        // should throw the exception directly. If QEE is thrown after restoreSnapshot, there
        // must be unexpected reasons, we also throw the exception directly
        throw e;
      } catch (IOException e) {
        if (tableRegionCount > 0) {
          // reset the region count for table
          // NOTE(review): if this reset itself throws, the original exception 'e' is lost.
          checkAndUpdateNamespaceRegionQuota(tableRegionCount, tableName);
        }
        LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
            + " as table " + tableName.getNameAsString(), e);
        throw e;
      }
      LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);

      // call coproc post hook
      if (cpHost != null) {
        cpHost.postRestoreSnapshot(snapshot, snapshotTableDesc);
      }
    } else {
      // Table does not exist: clone the snapshot schema under the requested table name.
      HTableDescriptor htd = RestoreSnapshotHelper.cloneTableSchema(snapshotTableDesc, tableName);
      if (cpHost != null) {
        cpHost.preCloneSnapshot(snapshot, htd);
      }
      try {
        checkAndUpdateNamespaceQuota(manifest, tableName);
        cloneSnapshot(snapshot, htd);
      } catch (IOException e) {
        // Undo the namespace quota accounting for the table that could not be cloned.
        this.master.getMasterQuotaManager().removeTableFromNamespaceQuota(tableName);
        LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
            + " as table " + tableName.getNameAsString(), e);
        throw e;
      }
      LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);

      if (cpHost != null) {
        cpHost.postCloneSnapshot(snapshot, htd);
      }
    }
  }
794   
795   private void checkAndUpdateNamespaceQuota(SnapshotManifest manifest, TableName tableName)
796       throws IOException {
797     if (this.master.getMasterQuotaManager().isQuotaEnabled()) {
798       this.master.getMasterQuotaManager().checkNamespaceTableAndRegionQuota(tableName,
799         manifest.getRegionManifestsMap().size());
800     }
801   }
802 
803   private void checkAndUpdateNamespaceRegionQuota(int updatedRegionCount, TableName tableName)
804       throws IOException {
805     if (this.master.getMasterQuotaManager().isQuotaEnabled()) {
806       this.master.getMasterQuotaManager().checkAndUpdateNamespaceRegionQuota(tableName,
807         updatedRegionCount);
808     }
809   }
810 
811   /**
812    * @return cached region count, or -1 if quota manager is disabled or table status not found
813   */
814   private int getRegionCountOfTable(TableName tableName) throws IOException {
815     if (this.master.getMasterQuotaManager().isQuotaEnabled()) {
816       return this.master.getMasterQuotaManager().getRegionCountOfTable(tableName);
817     }
818     return -1;
819   }
820 
821   /**
822    * Restore the specified snapshot.
823    * The restore will fail if the destination table has a snapshot or restore in progress.
824    *
825    * @param snapshot Snapshot Descriptor
826    * @param hTableDescriptor Table Descriptor
827    */
828   private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
829       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
830     TableName tableName = hTableDescriptor.getTableName();
831 
832     // make sure we aren't running a snapshot on the same table
833     if (isTakingSnapshot(tableName)) {
834       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
835     }
836 
837     // make sure we aren't running a restore on the same table
838     if (isRestoringTable(tableName)) {
839       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
840     }
841 
842     try {
843       RestoreSnapshotHandler handler =
844         new RestoreSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
845       this.executorService.submit(handler);
846       restoreHandlers.put(tableName, handler);
847     } catch (Exception e) {
848       String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
849           snapshot)  +
850           " on table=" + tableName;
851       LOG.error(msg, e);
852       throw new RestoreSnapshotException(msg, e);
853     }
854   }
855 
856   /**
857    * Verify if the restore of the specified table is in progress.
858    *
859    * @param tableName table under restore
860    * @return <tt>true</tt> if there is a restore in progress of the specified table.
861    */
862   private synchronized boolean isRestoringTable(final TableName tableName) {
863     SnapshotSentinel sentinel = this.restoreHandlers.get(tableName);
864     return(sentinel != null && !sentinel.isFinished());
865   }
866 
867   /**
868    * Returns the status of a restore operation.
869    * If the in-progress restore is failed throws the exception that caused the failure.
870    *
871    * @param snapshot
872    * @return false if in progress, true if restore is completed or not requested.
873    * @throws IOException if there was a failure during the restore
874    */
875   public boolean isRestoreDone(final SnapshotDescription snapshot) throws IOException {
876     // check to see if the sentinel exists,
877     // and if the task is complete removes it from the in-progress restore map.
878     SnapshotSentinel sentinel = removeSentinelIfFinished(this.restoreHandlers, snapshot);
879 
880     // stop tracking "abandoned" handlers
881     cleanupSentinels();
882 
883     if (sentinel == null) {
884       // there is no sentinel so restore is not in progress.
885       return true;
886     }
887 
888     LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
889         + sentinel.getSnapshot().getName() + " table=" +
890         TableName.valueOf(snapshot.getTable()));
891 
892     // If the restore is failed, rethrow the exception
893     sentinel.rethrowExceptionIfFailed();
894 
895     // check to see if we are done
896     if (sentinel.isFinished()) {
897       LOG.debug("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
898           " has completed. Notifying the client.");
899       return true;
900     }
901 
902     if (LOG.isDebugEnabled()) {
903       LOG.debug("Sentinel is not yet finished with restoring snapshot=" +
904           ClientSnapshotDescriptionUtils.toString(snapshot));
905     }
906     return false;
907   }
908 
909   /**
910    * Return the handler if it is currently live and has the same snapshot target name.
911    * The handler is removed from the sentinels map if completed.
912    * @param sentinels live handlers
913    * @param snapshot snapshot description
914    * @return null if doesn't match, else a live handler.
915    */
916   private synchronized SnapshotSentinel removeSentinelIfFinished(
917       final Map<TableName, SnapshotSentinel> sentinels,
918       final SnapshotDescription snapshot) {
919     if (!snapshot.hasTable()) {
920       return null;
921     }
922 
923     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
924     SnapshotSentinel h = sentinels.get(snapshotTable);
925     if (h == null) {
926       return null;
927     }
928 
929     if (!h.getSnapshot().getName().equals(snapshot.getName())) {
930       // specified snapshot is to the one currently running
931       return null;
932     }
933 
934     // Remove from the "in-progress" list once completed
935     if (h.isFinished()) {
936       sentinels.remove(snapshotTable);
937     }
938 
939     return h;
940   }
941 
942   /**
943    * Removes "abandoned" snapshot/restore requests.
944    * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
945    * and the in-progress maps are cleaned up when the status of a completed task is requested.
946    * To avoid having sentinels staying around for long time if something client side is failed,
947    * each operation tries to clean up the in-progress maps sentinels finished from a long time.
948    */
949   private void cleanupSentinels() {
950     cleanupSentinels(this.snapshotHandlers);
951     cleanupSentinels(this.restoreHandlers);
952   }
953 
954   /**
955    * Remove the sentinels that are marked as finished and the completion time
956    * has exceeded the removal timeout.
957    * @param sentinels map of sentinels to clean
958    */
959   private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
960     long currentTime = EnvironmentEdgeManager.currentTime();
961     Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
962         sentinels.entrySet().iterator();
963     while (it.hasNext()) {
964       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
965       SnapshotSentinel sentinel = entry.getValue();
966       if (sentinel.isFinished() &&
967           (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
968       {
969         it.remove();
970       }
971     }
972   }
973 
974   //
975   // Implementing Stoppable interface
976   //
977 
978   @Override
979   public void stop(String why) {
980     // short circuit
981     if (this.stopped) return;
982     // make sure we get stop
983     this.stopped = true;
984     // pass the stop onto take snapshot handlers
985     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
986       snapshotHandler.cancel(why);
987     }
988 
989     // pass the stop onto all the restore handlers
990     for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
991       restoreHandler.cancel(why);
992     }
993     try {
994       if (coordinator != null) {
995         coordinator.close();
996       }
997     } catch (IOException e) {
998       LOG.error("stop ProcedureCoordinator error", e);
999     }
1000   }
1001 
1002   @Override
1003   public boolean isStopped() {
1004     return this.stopped;
1005   }
1006 
1007   /**
1008    * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1009    * Called at the beginning of snapshot() and restoreSnapshot() methods.
1010    * @throws UnsupportedOperationException if snapshot are not supported
1011    */
1012   public void checkSnapshotSupport() throws UnsupportedOperationException {
1013     if (!this.isSnapshotSupported) {
1014       throw new UnsupportedOperationException(
1015         "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
1016           HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1017     }
1018   }
1019 
1020   /**
1021    * Called at startup, to verify if snapshot operation is supported, and to avoid
1022    * starting the master if there're snapshots present but the cleaners needed are missing.
1023    * Otherwise we can end up with snapshot data loss.
1024    * @param conf The {@link Configuration} object to use
1025    * @param mfs The MasterFileSystem to use
1026    * @throws IOException in case of file-system operation failure
1027    * @throws UnsupportedOperationException in case cleaners are missing and
1028    *         there're snapshot in the system
1029    */
1030   private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1031       throws IOException, UnsupportedOperationException {
1032     // Verify if snapshot is disabled by the user
1033     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1034     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1035     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1036 
1037     // Extract cleaners from conf
1038     Set<String> hfileCleaners = new HashSet<String>();
1039     String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1040     if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1041 
1042     Set<String> logCleaners = new HashSet<String>();
1043     cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1044     if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1045 
1046     // check if an older version of snapshot directory was present
1047     Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1048     FileSystem fs = mfs.getFileSystem();
1049     List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
1050     if (ss != null && !ss.isEmpty()) {
1051       LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1052       LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1053     }
1054 
1055     // If the user has enabled the snapshot, we force the cleaners to be present
1056     // otherwise we still need to check if cleaners are enabled or not and verify
1057     // that there're no snapshot in the .snapshot folder.
1058     if (snapshotEnabled) {
1059       // Inject snapshot cleaners, if snapshot.enable is true
1060       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1061       hfileCleaners.add(HFileLinkCleaner.class.getName());
1062       logCleaners.add(SnapshotLogCleaner.class.getName());
1063 
1064       // Set cleaners conf
1065       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1066         hfileCleaners.toArray(new String[hfileCleaners.size()]));
1067       conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1068         logCleaners.toArray(new String[logCleaners.size()]));
1069     } else {
1070       // Verify if cleaners are present
1071       snapshotEnabled = logCleaners.contains(SnapshotLogCleaner.class.getName()) &&
1072         hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
1073         hfileCleaners.contains(HFileLinkCleaner.class.getName());
1074 
1075       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1076       if (snapshotEnabled) {
1077         LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
1078           "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
1079           (userDisabled ? "is set to 'false'." : "is not set."));
1080       }
1081     }
1082 
1083     // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1084     this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1085 
1086     // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1087     // otherwise we end up with snapshot data loss.
1088     if (!snapshotEnabled) {
1089       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1090       Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1091       if (fs.exists(snapshotDir)) {
1092         FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
1093           new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1094         if (snapshots != null) {
1095           LOG.error("Snapshots are present, but cleaners are not enabled.");
1096           checkSnapshotSupport();
1097         }
1098       }
1099     }
1100   }
1101 
1102   @Override
1103   public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1104       IOException, UnsupportedOperationException {
1105     this.master = master;
1106 
1107     this.rootDir = master.getMasterFileSystem().getRootDir();
1108     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1109 
1110     // get the configuration for the coordinator
1111     Configuration conf = master.getConfiguration();
1112     long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1113     long timeoutMillis = Math.max(conf.getLong(SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_KEY,
1114                     SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT),
1115             conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1116                     SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1117     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1118 
1119     // setup the default procedure coordinator
1120     String name = master.getServerName().toString();
1121     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1122     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
1123         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1124 
1125     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1126     this.executorService = master.getExecutorService();
1127     resetTempDir();
1128   }
1129 
1130   @Override
1131   public String getProcedureSignature() {
1132     return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1133   }
1134 
1135   @Override
1136   public void execProcedure(ProcedureDescription desc) throws IOException {
1137     takeSnapshot(toSnapshotDescription(desc));
1138   }
1139 
1140   @Override
1141   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1142     return isSnapshotDone(toSnapshotDescription(desc));
1143   }
1144 
1145   private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1146       throws IOException {
1147     SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1148     if (!desc.hasInstance()) {
1149       throw new IOException("Snapshot name is not defined: " + desc.toString());
1150     }
1151     String snapshotName = desc.getInstance();
1152     List<NameStringPair> props = desc.getConfigurationList();
1153     String table = null;
1154     for (NameStringPair prop : props) {
1155       if ("table".equalsIgnoreCase(prop.getName())) {
1156         table = prop.getValue();
1157       }
1158     }
1159     if (table == null) {
1160       throw new IOException("Snapshot table is not defined: " + desc.toString());
1161     }
1162     TableName tableName = TableName.valueOf(table);
1163     builder.setTable(tableName.getNameAsString());
1164     builder.setName(snapshotName);
1165     builder.setType(SnapshotDescription.Type.FLUSH);
1166     return builder.build();
1167   }
1168 }