View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.snapshot;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.ConcurrentMap;
32  import java.util.concurrent.ThreadPoolExecutor;
33  import java.util.concurrent.locks.ReentrantReadWriteLock;
34
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FSDataInputStream;
39  import org.apache.hadoop.fs.FileStatus;
40  import org.apache.hadoop.fs.FileSystem;
41  import org.apache.hadoop.fs.Path;
42  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.MetaTableAccessor;
46  import org.apache.hadoop.hbase.Stoppable;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.classification.InterfaceAudience;
49  import org.apache.hadoop.hbase.classification.InterfaceStability;
50  import org.apache.hadoop.hbase.client.TableState;
51  import org.apache.hadoop.hbase.errorhandling.ForeignException;
52  import org.apache.hadoop.hbase.executor.ExecutorService;
53  import org.apache.hadoop.hbase.ipc.RpcServer;
54  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
55  import org.apache.hadoop.hbase.master.MasterFileSystem;
56  import org.apache.hadoop.hbase.master.MasterServices;
57  import org.apache.hadoop.hbase.master.MetricsMaster;
58  import org.apache.hadoop.hbase.master.SnapshotSentinel;
59  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
60  import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
61  import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
62  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
63  import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
64  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
65  import org.apache.hadoop.hbase.procedure.Procedure;
66  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
67  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
68  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
69  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
70  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
71  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
72  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
73  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
74  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
75  import org.apache.hadoop.hbase.security.AccessDeniedException;
76  import org.apache.hadoop.hbase.security.User;
77  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
78  import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
79  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
80  import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
81  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
82  import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
83  import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
84  import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
85  import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
86  import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
87  import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
88  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
89  import org.apache.hadoop.hbase.util.FSUtils;
90  import org.apache.hadoop.hbase.util.KeyLocker;
91  import org.apache.hadoop.hbase.wal.WAL;
92  import org.apache.zookeeper.KeeperException;
93
94  /**
95   * This class manages the procedure of taking and restoring snapshots. There is only one
96   * SnapshotManager for the master.
97   * <p>
98   * The class provides methods for monitoring in-progress snapshot actions.
99   * <p>
100  * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
101  * simplification in the current implementation.
102  */
103 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
104 @InterfaceStability.Unstable
105 public class SnapshotManager extends MasterProcedureManager implements Stoppable {
private static final Log LOG = LogFactory.getLog(SnapshotManager.class);

/** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;

/**
 * Wait time before removing a finished sentinel from the in-progress map.
 *
 * NOTE: This is used as a safety auto cleanup.
 * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
 * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
 * In case something fails on the client side and the snapshot/restore state is not reclaimed
 * after a default timeout, the entry is removed from the in-progress map.
 * At this point, if the user asks for the snapshot/restore status, the result will be
 * "snapshot done" if it exists, or "failed" if it doesn't exist.
 */
private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;

/** Conf key used to enable or disable snapshot support */
public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

/**
 * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
 * completion.
 */
private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

/** Name of the operation to use in the controller */
public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

/** Conf key for # of threads used by the SnapshotManager thread pool */
private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

/**
 * Default thread count; presumably the fallback for {@link #SNAPSHOT_POOL_THREADS_KEY}
 * (its use is outside this view -- confirm).
 */
private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;

private boolean stopped;
private MasterServices master;  // Needed by TableEventHandlers
private ProcedureCoordinator coordinator;

// Is snapshot feature enabled?
private boolean isSnapshotSupported = false;

// Snapshot handlers map, with table name as key.
// The map is always accessed and modified under the object lock using synchronized.
// snapshotTable() will insert a handler in the map.
// isSnapshotDone() will remove the requested handler if the operation is finished.
private Map<TableName, SnapshotSentinel> snapshotHandlers =
    new HashMap<TableName, SnapshotSentinel>();

// Restore map, with table name as key, procedure ID as value.
// The map is always accessed and modified under the object lock using synchronized.
// restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
//
// TODO: just as in the Apache HBase 1.x implementation, this map would not survive a master
// restart/failover. This is just a stopgap implementation until snapshots are taken
// using Procedure-V2.
private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<TableName, Long>();

// Root directory, taken from the master file system in the 4-arg constructor.
private Path rootDir;
// Executor that snapshot handlers are submitted to (see snapshotTable()).
private ExecutorService executorService;

/**
 *  Locks for snapshot operations.
 *  Key is the filename of the snapshot in progress, value is the related lock.
 *    - create snapshot
 *    - SnapshotCleaner
 * */
private KeyLocker<String> locks = new KeyLocker<String>();
175
176
177
/**
 * Creates a manager without setting any field; everything keeps its declared default.
 * NOTE(review): presumably the instance is populated later by an initialization call that is
 * outside this view -- confirm before relying on any field after using this constructor.
 */
public SnapshotManager() {}
179
/**
 * Fully specify all necessary components of a snapshot manager. Exposed for testing.
 * @param master services for the master where the manager is running
 * @param metricsMaster master metrics; not referenced by this constructor
 * @param coordinator procedure coordinator instance.  exposed for testing.
 * @param pool HBase ExecutorService instance, exposed for testing.
 * @throws IOException if the snapshot-support check or the working-directory cleanup fails
 * @throws UnsupportedOperationException declared by this constructor; presumably raised by
 *           the snapshot-support check (defined outside this view -- confirm)
 */
public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
    ProcedureCoordinator coordinator, ExecutorService pool)
    throws IOException, UnsupportedOperationException {
  this.master = master;

  // rootDir is assigned before the support check and before resetTempDir(), which reads it.
  this.rootDir = master.getMasterFileSystem().getRootDir();
  checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());

  this.coordinator = coordinator;
  this.executorService = pool;
  // Remove leftovers of failed snapshot attempts from the working directory.
  resetTempDir();
}
198
199   /**
200    * Gets the list of all completed snapshots.
201    * @return list of SnapshotDescriptions
202    * @throws IOException File system exception
203    */
204   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
205     return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
206   }
207
208   /**
209    * Gets the list of all completed snapshots.
210    * @param snapshotDir snapshot directory
211    * @return list of SnapshotDescriptions
212    * @throws IOException File system exception
213    */
214   private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
215     List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
216     // first create the snapshot root path and check to see if it exists
217     FileSystem fs = master.getMasterFileSystem().getFileSystem();
218     if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
219
220     // if there are no snapshots, return an empty list
221     if (!fs.exists(snapshotDir)) {
222       return snapshotDescs;
223     }
224
225     // ignore all the snapshots in progress
226     FileStatus[] snapshots = fs.listStatus(snapshotDir,
227       new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
228     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
229     // loop through all the completed snapshots
230     for (FileStatus snapshot : snapshots) {
231       Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
232       // if the snapshot is bad
233       if (!fs.exists(info)) {
234         LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
235         continue;
236       }
237       FSDataInputStream in = null;
238       try {
239         in = fs.open(info);
240         SnapshotDescription desc = SnapshotDescription.parseFrom(in);
241         if (cpHost != null) {
242           try {
243             cpHost.preListSnapshot(desc);
244           } catch (AccessDeniedException e) {
245             LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
246                 + "Either you should be owner of this snapshot or admin user.");
247             // Skip this and try for next snapshot
248             continue;
249           }
250         }
251         snapshotDescs.add(desc);
252
253         // call coproc post hook
254         if (cpHost != null) {
255           cpHost.postListSnapshot(desc);
256         }
257       } catch (IOException e) {
258         LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
259       } finally {
260         if (in != null) {
261           in.close();
262         }
263       }
264     }
265     return snapshotDescs;
266   }
267
268   /**
269    * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
270    * snapshot attempts.
271    *
272    * @throws IOException if we can't reach the filesystem
273    */
274   void resetTempDir() throws IOException {
275     // cleanup any existing snapshots.
276     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
277     if (master.getMasterFileSystem().getFileSystem().exists(tmpdir)) {
278       if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
279         LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
280       }
281     }
282   }
283
284   /**
285    * Delete the specified snapshot
286    * @param snapshot
287    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
288    * @throws IOException For filesystem IOExceptions
289    */
290   public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
291     // check to see if it is completed
292     if (!isSnapshotCompleted(snapshot)) {
293       throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
294     }
295
296     String snapshotName = snapshot.getName();
297     // first create the snapshot description and check to see if it exists
298     FileSystem fs = master.getMasterFileSystem().getFileSystem();
299     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
300     // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
301     // just the "name" and it does not contains the "real" snapshot information
302     snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
303
304     // call coproc pre hook
305     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
306     if (cpHost != null) {
307       cpHost.preDeleteSnapshot(snapshot);
308     }
309
310     LOG.debug("Deleting snapshot: " + snapshotName);
311     // delete the existing snapshot
312     if (!fs.delete(snapshotDir, true)) {
313       throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
314     }
315
316     // call coproc post hook
317     if (cpHost != null) {
318       cpHost.postDeleteSnapshot(snapshot);
319     }
320
321   }
322
323   /**
324    * Check if the specified snapshot is done
325    *
326    * @param expected
327    * @return true if snapshot is ready to be restored, false if it is still being taken.
328    * @throws IOException IOException if error from HDFS or RPC
329    * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
330    */
331   public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
332     // check the request to make sure it has a snapshot
333     if (expected == null) {
334       throw new UnknownSnapshotException(
335          "No snapshot name passed in request, can't figure out which snapshot you want to check.");
336     }
337
338     String ssString = ClientSnapshotDescriptionUtils.toString(expected);
339
340     // check to see if the sentinel exists,
341     // and if the task is complete removes it from the in-progress snapshots map.
342     SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
343
344     // stop tracking "abandoned" handlers
345     cleanupSentinels();
346
347     if (handler == null) {
348       // If there's no handler in the in-progress map, it means one of the following:
349       //   - someone has already requested the snapshot state
350       //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
351       //   - the snapshot was never requested
352       // In those cases returns to the user the "done state" if the snapshots exists on disk,
353       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
354       if (!isSnapshotCompleted(expected)) {
355         throw new UnknownSnapshotException("Snapshot " + ssString
356             + " is not currently running or one of the known completed snapshots.");
357       }
358       // was done, return true;
359       return true;
360     }
361
362     // pass on any failure we find in the sentinel
363     try {
364       handler.rethrowExceptionIfFailed();
365     } catch (ForeignException e) {
366       // Give some procedure info on an exception.
367       String status;
368       Procedure p = coordinator.getProcedure(expected.getName());
369       if (p != null) {
370         status = p.getStatus();
371       } else {
372         status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
373       }
374       throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
375         ProtobufUtil.createSnapshotDesc(expected));
376     }
377
378     // check to see if we are done
379     if (handler.isFinished()) {
380       LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
381       return true;
382     } else if (LOG.isDebugEnabled()) {
383       LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
384     }
385     return false;
386   }
387
388   /**
389    * Check to see if there is a snapshot in progress with the same name or on the same table.
390    * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
391    * don't allow snapshot with the same name.
392    * @param snapshot description of the snapshot being checked.
393    * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
394    *         table.
395    */
396   synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
397     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
398     if (isTakingSnapshot(snapshotTable)) {
399       return true;
400     }
401     Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
402     while (it.hasNext()) {
403       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
404       SnapshotSentinel sentinel = entry.getValue();
405       if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
406         return true;
407       }
408     }
409     return false;
410   }
411
412   /**
413    * Check to see if the specified table has a snapshot in progress.  Currently we have a
414    * limitation only allowing a single snapshot per table at a time.
415    * @param tableName name of the table being snapshotted.
416    * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
417    */
418   synchronized boolean isTakingSnapshot(final TableName tableName) {
419     SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
420     return handler != null && !handler.isFinished();
421   }
422
423   /**
424    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
425    * aren't already running a snapshot or restore on the requested table.
426    * @param snapshot description of the snapshot we want to start
427    * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
428    */
429   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
430       throws HBaseSnapshotException {
431     FileSystem fs = master.getMasterFileSystem().getFileSystem();
432     Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
433     TableName snapshotTable =
434         TableName.valueOf(snapshot.getTable());
435
436     // make sure we aren't already running a snapshot
437     if (isTakingSnapshot(snapshot)) {
438       SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
439       throw new SnapshotCreationException("Rejected taking "
440           + ClientSnapshotDescriptionUtils.toString(snapshot)
441           + " because we are already running another snapshot "
442           + (handler != null ? ("on the same table " +
443               ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
444               : "with the same name"), ProtobufUtil.createSnapshotDesc(snapshot));
445     }
446
447     // make sure we aren't running a restore on the same table
448     if (isRestoringTable(snapshotTable)) {
449       throw new SnapshotCreationException("Rejected taking "
450           + ClientSnapshotDescriptionUtils.toString(snapshot)
451           + " because we are already have a restore in progress on the same snapshot.");
452     }
453
454     try {
455       // delete the working directory, since we aren't running the snapshot. Likely leftovers
456       // from a failed attempt.
457       fs.delete(workingDir, true);
458
459       // recreate the working directory for the snapshot
460       if (!fs.mkdirs(workingDir)) {
461         throw new SnapshotCreationException(
462             "Couldn't create working directory (" + workingDir + ") for snapshot",
463             ProtobufUtil.createSnapshotDesc(snapshot));
464       }
465     } catch (HBaseSnapshotException e) {
466       throw e;
467     } catch (IOException e) {
468       throw new SnapshotCreationException(
469           "Exception while checking to see if snapshot could be started.", e,
470           ProtobufUtil.createSnapshotDesc(snapshot));
471     }
472   }
473
474   /**
475    * Take a snapshot of a disabled table.
476    * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
477    * @throws HBaseSnapshotException if the snapshot could not be started
478    */
479   private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
480       throws HBaseSnapshotException {
481     // setup the snapshot
482     prepareToTakeSnapshot(snapshot);
483
484     // set the snapshot to be a disabled snapshot, since the client doesn't know about that
485     snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
486
487     // Take the snapshot of the disabled table
488     DisabledTableSnapshotHandler handler =
489         new DisabledTableSnapshotHandler(snapshot, master, this);
490     snapshotTable(snapshot, handler);
491   }
492
493   /**
494    * Take a snapshot of an enabled table.
495    * @param snapshot description of the snapshot to take.
496    * @throws HBaseSnapshotException if the snapshot could not be started
497    */
498   private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
499       throws HBaseSnapshotException {
500     // setup the snapshot
501     prepareToTakeSnapshot(snapshot);
502
503     // Take the snapshot of the enabled table
504     EnabledTableSnapshotHandler handler =
505         new EnabledTableSnapshotHandler(snapshot, master, this);
506     snapshotTable(snapshot, handler);
507   }
508
509   /**
510    * Take a snapshot using the specified handler.
511    * On failure the snapshot temporary working directory is removed.
512    * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
513    *       snapshot request if the table is busy with another snapshot/restore operation.
514    * @param snapshot the snapshot description
515    * @param handler the snapshot handler
516    */
517   private synchronized void snapshotTable(SnapshotDescription snapshot,
518       final TakeSnapshotHandler handler) throws HBaseSnapshotException {
519     try {
520       handler.prepare();
521       this.executorService.submit(handler);
522       this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
523     } catch (Exception e) {
524       // cleanup the working directory by trying to delete it from the fs.
525       Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
526       try {
527         if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
528           LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
529               ClientSnapshotDescriptionUtils.toString(snapshot));
530         }
531       } catch (IOException e1) {
532         LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
533             ClientSnapshotDescriptionUtils.toString(snapshot));
534       }
535       // fail the snapshot
536       throw new SnapshotCreationException("Could not build snapshot handler", e,
537         ProtobufUtil.createSnapshotDesc(snapshot));
538     }
539   }
540
541   /**
542    * Take a snapshot based on the enabled/disabled state of the table.
543    *
544    * @param snapshot
545    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
546    * @throws IOException when some sort of generic IO exception occurs.
547    */
548   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
549     // check to see if we already completed the snapshot
550     if (isSnapshotCompleted(snapshot)) {
551       throw new SnapshotExistsException(
552           "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
553           ProtobufUtil.createSnapshotDesc(snapshot));
554     }
555
556     LOG.debug("No existing snapshot, attempting snapshot...");
557
558     // stop tracking "abandoned" handlers
559     cleanupSentinels();
560
561     // check to see if the table exists
562     HTableDescriptor desc = null;
563     try {
564       desc = master.getTableDescriptors().get(
565           TableName.valueOf(snapshot.getTable()));
566     } catch (FileNotFoundException e) {
567       String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
568       LOG.error(msg);
569       throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
570     } catch (IOException e) {
571       throw new SnapshotCreationException(
572           "Error while geting table description for table " + snapshot.getTable(), e,
573           ProtobufUtil.createSnapshotDesc(snapshot));
574     }
575     if (desc == null) {
576       throw new SnapshotCreationException(
577           "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
578           ProtobufUtil.createSnapshotDesc(snapshot));
579     }
580     SnapshotDescription.Builder builder = snapshot.toBuilder();
581     // if not specified, set the snapshot format
582     if (!snapshot.hasVersion()) {
583       builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
584     }
585     User user = RpcServer.getRequestUser();
586     if (User.isHBaseSecurityEnabled(master.getConfiguration()) && user != null) {
587       builder.setOwner(user.getShortName());
588     }
589     snapshot = builder.build();
590
591     // call pre coproc hook
592     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
593     if (cpHost != null) {
594       cpHost.preSnapshot(snapshot, desc);
595     }
596
597     // if the table is enabled, then have the RS run actually the snapshot work
598     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
599     if (master.getTableStateManager().isTableState(snapshotTable,
600         TableState.State.ENABLED)) {
601       LOG.debug("Table enabled, starting distributed snapshot.");
602       snapshotEnabledTable(snapshot);
603       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
604     }
605     // For disabled table, snapshot is created by the master
606     else if (master.getTableStateManager().isTableState(snapshotTable,
607         TableState.State.DISABLED)) {
608       LOG.debug("Table is disabled, running snapshot entirely on master.");
609       snapshotDisabledTable(snapshot);
610       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
611     } else {
612       LOG.error("Can't snapshot table '" + snapshot.getTable()
613           + "', isn't open or closed, we don't know what to do!");
614       TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
615           + " isn't fully open.");
616       throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
617         ProtobufUtil.createSnapshotDesc(snapshot));
618     }
619
620     // call post coproc hook
621     if (cpHost != null) {
622       cpHost.postSnapshot(snapshot, desc);
623     }
624   }
625
626   /**
627    * Set the handler for the current snapshot
628    * <p>
629    * Exposed for TESTING
630    * @param tableName
631    * @param handler handler the master should use
632    *
633    * TODO get rid of this if possible, repackaging, modify tests.
634    */
635   public synchronized void setSnapshotHandlerForTesting(
636       final TableName tableName,
637       final SnapshotSentinel handler) {
638     if (handler != null) {
639       this.snapshotHandlers.put(tableName, handler);
640     } else {
641       this.snapshotHandlers.remove(tableName);
642     }
643   }
644
645   /**
646    * @return distributed commit coordinator for all running snapshots
647    */
648   ProcedureCoordinator getCoordinator() {
649     return coordinator;
650   }
651
652   /**
653    * Check to see if the snapshot is one of the currently completed snapshots
654    * Returns true if the snapshot exists in the "completed snapshots folder".
655    *
656    * @param snapshot expected snapshot to check
657    * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
658    *         not stored
659    * @throws IOException if the filesystem throws an unexpected exception,
660    * @throws IllegalArgumentException if snapshot name is invalid.
661    */
662   private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
663     try {
664       final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
665       FileSystem fs = master.getMasterFileSystem().getFileSystem();
666       // check to see if the snapshot already exists
667       return fs.exists(snapshotDir);
668     } catch (IllegalArgumentException iae) {
669       throw new UnknownSnapshotException("Unexpected exception thrown", iae);
670     }
671   }
672
673   /**
674    * Clone the specified snapshot.
675    * The clone will fail if the destination table has a snapshot or restore in progress.
676    *
677    * @param reqSnapshot Snapshot Descriptor from request
678    * @param tableName table to clone
679    * @param snapshot Snapshot Descriptor
680    * @param snapshotTableDesc Table Descriptor
681    * @param nonceGroup unique value to prevent duplicated RPC
682    * @param nonce unique value to prevent duplicated RPC
683    * @return procId the ID of the clone snapshot procedure
684    * @throws IOException
685    */
686   private long cloneSnapshot(
687       final SnapshotDescription reqSnapshot,
688       final TableName tableName,
689       final SnapshotDescription snapshot,
690       final HTableDescriptor snapshotTableDesc,
691       final long nonceGroup,
692       final long nonce) throws IOException {
693     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
694     HTableDescriptor htd = new HTableDescriptor(tableName, snapshotTableDesc);
695     if (cpHost != null) {
696       cpHost.preCloneSnapshot(reqSnapshot, htd);
697     }
698     long procId;
699     try {
700       procId = cloneSnapshot(snapshot, htd, nonceGroup, nonce);
701     } catch (IOException e) {
702       LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
703         + " as table " + tableName.getNameAsString(), e);
704       throw e;
705     }
706     LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
707
708     if (cpHost != null) {
709       cpHost.postCloneSnapshot(reqSnapshot, htd);
710     }
711     return procId;
712   }
713
714   /**
715    * Clone the specified snapshot into a new table.
716    * The operation will fail if the destination table has a snapshot or restore in progress.
717    *
718    * @param snapshot Snapshot Descriptor
719    * @param hTableDescriptor Table Descriptor of the table to create
720    * @param nonceGroup unique value to prevent duplicated RPC
721    * @param nonce unique value to prevent duplicated RPC
722    * @return procId the ID of the clone snapshot procedure
723    */
724   synchronized long cloneSnapshot(
725       final SnapshotDescription snapshot,
726       final HTableDescriptor hTableDescriptor,
727       final long nonceGroup,
728       final long nonce) throws HBaseSnapshotException {
729     TableName tableName = hTableDescriptor.getTableName();
730
731     // make sure we aren't running a snapshot on the same table
732     if (isTakingSnapshot(tableName)) {
733       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
734     }
735
736     // make sure we aren't running a restore on the same table
737     if (isRestoringTable(tableName)) {
738       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
739     }
740
741     try {
742       long procId = master.getMasterProcedureExecutor().submitProcedure(
743         new CloneSnapshotProcedure(
744           master.getMasterProcedureExecutor().getEnvironment(), hTableDescriptor, snapshot),
745         nonceGroup,
746         nonce);
747       this.restoreTableToProcIdMap.put(tableName, procId);
748       return procId;
749     } catch (Exception e) {
750       String msg = "Couldn't clone the snapshot="
751         + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
752       LOG.error(msg, e);
753       throw new RestoreSnapshotException(msg, e);
754     }
755   }
756
757   /**
758    * Restore or Clone the specified snapshot
759    * @param reqSnapshot
760    * @param nonceGroup unique value to prevent duplicated RPC
761    * @param nonce unique value to prevent duplicated RPC
762    * @throws IOException
763    */
764   public long restoreOrCloneSnapshot(
765       SnapshotDescription reqSnapshot,
766       final long nonceGroup,
767       final long nonce) throws IOException {
768     FileSystem fs = master.getMasterFileSystem().getFileSystem();
769     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
770
771     // check if the snapshot exists
772     if (!fs.exists(snapshotDir)) {
773       LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
774       throw new SnapshotDoesNotExistException(
775         ProtobufUtil.createSnapshotDesc(reqSnapshot));
776     }
777
778     // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
779     // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
780     // information.
781     SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
782     SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
783         snapshotDir, snapshot);
784     HTableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
785     TableName tableName = TableName.valueOf(reqSnapshot.getTable());
786
787     // stop tracking "abandoned" handlers
788     cleanupSentinels();
789
790     // Verify snapshot validity
791     SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
792
793     // Execute the restore/clone operation
794     long procId;
795     if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) {
796       procId = restoreSnapshot(
797         reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceGroup, nonce);
798     } else {
799       procId = cloneSnapshot(
800         reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceGroup, nonce);
801     }
802     return procId;
803   }
804
805   /**
806    * Restore the specified snapshot.
807    * The restore will fail if the destination table has a snapshot or restore in progress.
808    *
809    * @param reqSnapshot Snapshot Descriptor from request
810    * @param tableName table to restore
811    * @param snapshot Snapshot Descriptor
812    * @param snapshotTableDesc Table Descriptor
813    * @param nonceGroup unique value to prevent duplicated RPC
814    * @param nonce unique value to prevent duplicated RPC
815    * @return procId the ID of the restore snapshot procedure
816    * @throws IOException
817    */
818   private long restoreSnapshot(
819       final SnapshotDescription reqSnapshot,
820       final TableName tableName,
821       final SnapshotDescription snapshot,
822       final HTableDescriptor snapshotTableDesc,
823       final long nonceGroup,
824       final long nonce) throws IOException {
825     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
826
827     if (master.getTableStateManager().isTableState(
828       TableName.valueOf(snapshot.getTable()), TableState.State.ENABLED)) {
829       throw new UnsupportedOperationException("Table '" +
830         TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
831         "perform a restore operation.");
832     }
833
834     // call Coprocessor pre hook
835     if (cpHost != null) {
836       cpHost.preRestoreSnapshot(reqSnapshot, snapshotTableDesc);
837     }
838
839     long procId;
840     try {
841       procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceGroup, nonce);
842     } catch (IOException e) {
843       LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
844         + " as table " + tableName.getNameAsString(), e);
845       throw e;
846     }
847     LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
848
849     if (cpHost != null) {
850       cpHost.postRestoreSnapshot(reqSnapshot, snapshotTableDesc);
851     }
852
853     return procId;
854   }
855
856   /**
857    * Restore the specified snapshot.
858    * The restore will fail if the destination table has a snapshot or restore in progress.
859    *
860    * @param snapshot Snapshot Descriptor
861    * @param hTableDescriptor Table Descriptor
862    * @param nonceGroup unique value to prevent duplicated RPC
863    * @param nonce unique value to prevent duplicated RPC
864    * @return procId the ID of the restore snapshot procedure
865    */
866   private synchronized long restoreSnapshot(
867       final SnapshotDescription snapshot,
868       final HTableDescriptor hTableDescriptor,
869       final long nonceGroup,
870       final long nonce) throws HBaseSnapshotException {
871     TableName tableName = hTableDescriptor.getTableName();
872
873     // make sure we aren't running a snapshot on the same table
874     if (isTakingSnapshot(tableName)) {
875       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
876     }
877
878     // make sure we aren't running a restore on the same table
879     if (isRestoringTable(tableName)) {
880       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
881     }
882
883     try {
884       long procId = master.getMasterProcedureExecutor().submitProcedure(
885         new RestoreSnapshotProcedure(
886           master.getMasterProcedureExecutor().getEnvironment(), hTableDescriptor, snapshot),
887         nonceGroup,
888         nonce);
889       this.restoreTableToProcIdMap.put(tableName, procId);
890       return procId;
891     } catch (Exception e) {
892       String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
893           snapshot)  +
894           " on table=" + tableName;
895       LOG.error(msg, e);
896       throw new RestoreSnapshotException(msg, e);
897     }
898   }
899
900   /**
901    * Verify if the restore of the specified table is in progress.
902    *
903    * @param tableName table under restore
904    * @return <tt>true</tt> if there is a restore in progress of the specified table.
905    */
906   private synchronized boolean isRestoringTable(final TableName tableName) {
907     Long procId = this.restoreTableToProcIdMap.get(tableName);
908     if (procId == null) {
909       return false;
910     }
911     ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
912     if (procExec.isRunning() && !procExec.isFinished(procId)) {
913       return true;
914     } else {
915       this.restoreTableToProcIdMap.remove(tableName);
916       return false;
917     }
918
919   }
920
921   /**
922    * Return the handler if it is currently live and has the same snapshot target name.
923    * The handler is removed from the sentinels map if completed.
924    * @param sentinels live handlers
925    * @param snapshot snapshot description
926    * @return null if doesn't match, else a live handler.
927    */
928   private synchronized SnapshotSentinel removeSentinelIfFinished(
929       final Map<TableName, SnapshotSentinel> sentinels,
930       final SnapshotDescription snapshot) {
931     if (!snapshot.hasTable()) {
932       return null;
933     }
934
935     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
936     SnapshotSentinel h = sentinels.get(snapshotTable);
937     if (h == null) {
938       return null;
939     }
940
941     if (!h.getSnapshot().getName().equals(snapshot.getName())) {
942       // specified snapshot is to the one currently running
943       return null;
944     }
945
946     // Remove from the "in-progress" list once completed
947     if (h.isFinished()) {
948       sentinels.remove(snapshotTable);
949     }
950
951     return h;
952   }
953
954   /**
955    * Removes "abandoned" snapshot/restore requests.
956    * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
957    * and the in-progress maps are cleaned up when the status of a completed task is requested.
958    * To avoid having sentinels staying around for long time if something client side is failed,
959    * each operation tries to clean up the in-progress maps sentinels finished from a long time.
960    */
961   private void cleanupSentinels() {
962     cleanupSentinels(this.snapshotHandlers);
963     cleanupCompletedRestoreInMap();
964   }
965
966   /**
967    * Remove the sentinels that are marked as finished and the completion time
968    * has exceeded the removal timeout.
969    * @param sentinels map of sentinels to clean
970    */
971   private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
972     long currentTime = EnvironmentEdgeManager.currentTime();
973     Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
974         sentinels.entrySet().iterator();
975     while (it.hasNext()) {
976       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
977       SnapshotSentinel sentinel = entry.getValue();
978       if (sentinel.isFinished() &&
979           (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
980       {
981         it.remove();
982       }
983     }
984   }
985
986   /**
987    * Remove the procedures that are marked as finished
988    */
989   private synchronized void cleanupCompletedRestoreInMap() {
990     ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
991     Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
992     while (it.hasNext()) {
993       Map.Entry<TableName, Long> entry = it.next();
994       Long procId = entry.getValue();
995       if (procExec.isRunning() && procExec.isFinished(procId)) {
996         it.remove();
997       }
998     }
999   }
1000
1001   //
1002   // Implementing Stoppable interface
1003   //
1004
1005   @Override
1006   public void stop(String why) {
1007     // short circuit
1008     if (this.stopped) return;
1009     // make sure we get stop
1010     this.stopped = true;
1011     // pass the stop onto take snapshot handlers
1012     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
1013       snapshotHandler.cancel(why);
1014     }
1015
1016     try {
1017       if (coordinator != null) {
1018         coordinator.close();
1019       }
1020     } catch (IOException e) {
1021       LOG.error("stop ProcedureCoordinator error", e);
1022     }
1023   }
1024
1025   @Override
1026   public boolean isStopped() {
1027     return this.stopped;
1028   }
1029
1030   /**
1031    * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1032    * Called at the beginning of snapshot() and restoreSnapshot() methods.
1033    * @throws UnsupportedOperationException if snapshot are not supported
1034    */
1035   public void checkSnapshotSupport() throws UnsupportedOperationException {
1036     if (!this.isSnapshotSupported) {
1037       throw new UnsupportedOperationException(
1038         "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
1039           HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1040     }
1041   }
1042
1043   /**
1044    * Called at startup, to verify if snapshot operation is supported, and to avoid
1045    * starting the master if there're snapshots present but the cleaners needed are missing.
1046    * Otherwise we can end up with snapshot data loss.
1047    * @param conf The {@link Configuration} object to use
1048    * @param mfs The MasterFileSystem to use
1049    * @throws IOException in case of file-system operation failure
1050    * @throws UnsupportedOperationException in case cleaners are missing and
1051    *         there're snapshot in the system
1052    */
1053   private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1054       throws IOException, UnsupportedOperationException {
1055     // Verify if snapshot is disabled by the user
1056     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1057     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1058     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1059
1060     // Extract cleaners from conf
1061     Set<String> hfileCleaners = new HashSet<String>();
1062     String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1063     if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1064
1065     Set<String> logCleaners = new HashSet<String>();
1066     cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1067     if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1068
1069     // check if an older version of snapshot directory was present
1070     Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1071     FileSystem fs = mfs.getFileSystem();
1072     List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
1073     if (ss != null && !ss.isEmpty()) {
1074       LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1075       LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1076     }
1077
1078     // If the user has enabled the snapshot, we force the cleaners to be present
1079     // otherwise we still need to check if cleaners are enabled or not and verify
1080     // that there're no snapshot in the .snapshot folder.
1081     if (snapshotEnabled) {
1082       // Inject snapshot cleaners, if snapshot.enable is true
1083       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1084       hfileCleaners.add(HFileLinkCleaner.class.getName());
1085
1086       // Set cleaners conf
1087       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1088         hfileCleaners.toArray(new String[hfileCleaners.size()]));
1089       conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1090         logCleaners.toArray(new String[logCleaners.size()]));
1091     } else {
1092       // Verify if cleaners are present
1093       snapshotEnabled =
1094         hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
1095         hfileCleaners.contains(HFileLinkCleaner.class.getName());
1096
1097       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1098       if (snapshotEnabled) {
1099         LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
1100           "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
1101           (userDisabled ? "is set to 'false'." : "is not set."));
1102       }
1103     }
1104
1105     // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1106     this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1107
1108     // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1109     // otherwise we end up with snapshot data loss.
1110     if (!snapshotEnabled) {
1111       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1112       Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1113       if (fs.exists(snapshotDir)) {
1114         FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
1115           new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1116         if (snapshots != null) {
1117           LOG.error("Snapshots are present, but cleaners are not enabled.");
1118           checkSnapshotSupport();
1119         }
1120       }
1121     }
1122   }
1123
1124   @Override
1125   public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1126       IOException, UnsupportedOperationException {
1127     this.master = master;
1128
1129     this.rootDir = master.getMasterFileSystem().getRootDir();
1130     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1131
1132     // get the configuration for the coordinator
1133     Configuration conf = master.getConfiguration();
1134     long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1135     long timeoutMillis = Math.max(conf.getLong(SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_KEY,
1136                     SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT),
1137             conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1138                     SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1139     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1140
1141     // setup the default procedure coordinator
1142     String name = master.getServerName().toString();
1143     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1144     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
1145         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1146
1147     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1148     this.executorService = master.getExecutorService();
1149     resetTempDir();
1150   }
1151
1152   @Override
1153   public String getProcedureSignature() {
1154     return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1155   }
1156
1157   @Override
1158   public void execProcedure(ProcedureDescription desc) throws IOException {
1159     takeSnapshot(toSnapshotDescription(desc));
1160   }
1161
1162   @Override
1163   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1164     return isSnapshotDone(toSnapshotDescription(desc));
1165   }
1166
1167   private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1168       throws IOException {
1169     SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1170     if (!desc.hasInstance()) {
1171       throw new IOException("Snapshot name is not defined: " + desc.toString());
1172     }
1173     String snapshotName = desc.getInstance();
1174     List<NameStringPair> props = desc.getConfigurationList();
1175     String table = null;
1176     for (NameStringPair prop : props) {
1177       if ("table".equalsIgnoreCase(prop.getName())) {
1178         table = prop.getValue();
1179       }
1180     }
1181     if (table == null) {
1182       throw new IOException("Snapshot table is not defined: " + desc.toString());
1183     }
1184     TableName tableName = TableName.valueOf(table);
1185     builder.setTable(tableName.getNameAsString());
1186     builder.setName(snapshotName);
1187     builder.setType(SnapshotDescription.Type.FLUSH);
1188     return builder.build();
1189   }
1190
1191   public KeyLocker<String> getLocks() {
1192     return locks;
1193   }
1194
1195 }