View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.snapshot;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ThreadPoolExecutor;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.FSDataInputStream;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.MetaTableAccessor;
43  import org.apache.hadoop.hbase.Stoppable;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.classification.InterfaceStability;
47  import org.apache.hadoop.hbase.client.TableState;
48  import org.apache.hadoop.hbase.errorhandling.ForeignException;
49  import org.apache.hadoop.hbase.executor.ExecutorService;
50  import org.apache.hadoop.hbase.ipc.RpcServer;
51  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
52  import org.apache.hadoop.hbase.master.MasterFileSystem;
53  import org.apache.hadoop.hbase.master.MasterServices;
54  import org.apache.hadoop.hbase.master.MetricsMaster;
55  import org.apache.hadoop.hbase.master.SnapshotSentinel;
56  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
57  import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
58  import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
59  import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
60  import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
61  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
62  import org.apache.hadoop.hbase.procedure.Procedure;
63  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
64  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
65  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
66  import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
67  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
68  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
69  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
70  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
71  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
72  import org.apache.hadoop.hbase.security.AccessDeniedException;
73  import org.apache.hadoop.hbase.security.User;
74  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
75  import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
76  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
77  import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
78  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
79  import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
80  import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
81  import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
82  import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
83  import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
84  import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
85  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
86  import org.apache.hadoop.hbase.util.FSUtils;
87  import org.apache.zookeeper.KeeperException;
88
89  /**
90   * This class manages the procedure of taking and restoring snapshots. There is only one
91   * SnapshotManager for the master.
92   * <p>
93   * The class provides methods for monitoring in-progress snapshot actions.
94   * <p>
95   * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
96   * simplification in the current implementation.
97   */
98  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
99  @InterfaceStability.Unstable
100 public class SnapshotManager extends MasterProcedureManager implements Stoppable {
  private static final Log LOG = LogFactory.getLog(SnapshotManager.class);

  /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;

  /**
   * Wait time before removing a finished sentinel from the in-progress map
   * (60 * 1000; presumably milliseconds, i.e. one minute -- confirm against the cleanup code).
   *
   * NOTE: This is used as a safety auto cleanup.
   * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
   * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
   * In case something fails on the client side and the snapshot/restore state is not reclaimed
   * after a default timeout, the entry is removed from the in-progress map.
   * At this point, if the user asks for the snapshot/restore status, the result will be
   * "snapshot done" if the snapshot exists, or "failed" if it doesn't exist.
   */
  private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;

  /** Enable or disable snapshot support */
  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

  /**
   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
   * completion.
   */
  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

  /** Name of the operation to use in the controller */
  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

  /** Conf key for # of threads used by the SnapshotManager thread pool */
  private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

  /**
   * Presumably the default for {@link #SNAPSHOT_POOL_THREADS_KEY}, i.e. the number of snapshot
   * operations that may run concurrently on the master -- confirm where the pool is created.
   */
  private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;

  // NOTE(review): presumably backs the Stoppable contract; its readers and writers are not
  // visible in this part of the file.
  private boolean stopped;
  private MasterServices master;  // Needed by TableEventHandlers
  // Coordinator consulted for procedure status when a snapshot sentinel reports a failure.
  private ProcedureCoordinator coordinator;

  // Is snapshot feature enabled?
  private boolean isSnapshotSupported = false;

  // Snapshot handlers map, with table name as key.
  // The map is always accessed and modified under the object lock using synchronized.
  // snapshotTable() will insert a handler into the map.
  // isSnapshotDone() will remove the requested handler if the operation is finished.
  private Map<TableName, SnapshotSentinel> snapshotHandlers =
      new HashMap<TableName, SnapshotSentinel>();

  // Restore map, with table name as key, procedure ID as value.
  // The map is always accessed and modified under the object lock using synchronized.
  // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
  //
  // TODO: just as the Apache HBase 1.x implementation, this map would not survive master
  // restart/failover. This is just a stopgap implementation until implementation of taking
  // snapshot using Procedure-V2.
  private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<TableName, Long>();

  // Root directory under which the snapshot directories are resolved.
  private Path rootDir;
  // Executor that snapshot handlers are submitted to.
  private ExecutorService executorService;

  /**
   * No-arg constructor; leaves every field at its default value.
   * NOTE(review): presumably used for reflective instantiation followed by a separate
   * initialization step -- confirm against the rest of the class.
   */
  public SnapshotManager() {}
164
165   /**
166    * Fully specify all necessary components of a snapshot manager. Exposed for testing.
167    * @param master services for the master where the manager is running
168    * @param coordinator procedure coordinator instance.  exposed for testing.
169    * @param pool HBase ExecutorServcie instance, exposed for testing.
170    */
171   public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
172       ProcedureCoordinator coordinator, ExecutorService pool)
173       throws IOException, UnsupportedOperationException {
174     this.master = master;
175
176     this.rootDir = master.getMasterFileSystem().getRootDir();
177     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
178
179     this.coordinator = coordinator;
180     this.executorService = pool;
181     resetTempDir();
182   }
183
184   /**
185    * Gets the list of all completed snapshots.
186    * @return list of SnapshotDescriptions
187    * @throws IOException File system exception
188    */
189   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
190     return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
191   }
192
193   /**
194    * Gets the list of all completed snapshots.
195    * @param snapshotDir snapshot directory
196    * @return list of SnapshotDescriptions
197    * @throws IOException File system exception
198    */
199   private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
200     List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
201     // first create the snapshot root path and check to see if it exists
202     FileSystem fs = master.getMasterFileSystem().getFileSystem();
203     if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
204
205     // if there are no snapshots, return an empty list
206     if (!fs.exists(snapshotDir)) {
207       return snapshotDescs;
208     }
209
210     // ignore all the snapshots in progress
211     FileStatus[] snapshots = fs.listStatus(snapshotDir,
212       new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
213     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
214     // loop through all the completed snapshots
215     for (FileStatus snapshot : snapshots) {
216       Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
217       // if the snapshot is bad
218       if (!fs.exists(info)) {
219         LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
220         continue;
221       }
222       FSDataInputStream in = null;
223       try {
224         in = fs.open(info);
225         SnapshotDescription desc = SnapshotDescription.parseFrom(in);
226         if (cpHost != null) {
227           try {
228             cpHost.preListSnapshot(desc);
229           } catch (AccessDeniedException e) {
230             LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
231                 + "Either you should be owner of this snapshot or admin user.");
232             // Skip this and try for next snapshot
233             continue;
234           }
235         }
236         snapshotDescs.add(desc);
237
238         // call coproc post hook
239         if (cpHost != null) {
240           cpHost.postListSnapshot(desc);
241         }
242       } catch (IOException e) {
243         LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
244       } finally {
245         if (in != null) {
246           in.close();
247         }
248       }
249     }
250     return snapshotDescs;
251   }
252
253   /**
254    * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
255    * snapshot attempts.
256    *
257    * @throws IOException if we can't reach the filesystem
258    */
259   void resetTempDir() throws IOException {
260     // cleanup any existing snapshots.
261     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
262     if (master.getMasterFileSystem().getFileSystem().exists(tmpdir)) {
263       if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
264         LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
265       }
266     }
267   }
268
269   /**
270    * Delete the specified snapshot
271    * @param snapshot
272    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
273    * @throws IOException For filesystem IOExceptions
274    */
275   public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
276     // check to see if it is completed
277     if (!isSnapshotCompleted(snapshot)) {
278       throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
279     }
280
281     String snapshotName = snapshot.getName();
282     // first create the snapshot description and check to see if it exists
283     FileSystem fs = master.getMasterFileSystem().getFileSystem();
284     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
285     // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
286     // just the "name" and it does not contains the "real" snapshot information
287     snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
288
289     // call coproc pre hook
290     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
291     if (cpHost != null) {
292       cpHost.preDeleteSnapshot(snapshot);
293     }
294
295     LOG.debug("Deleting snapshot: " + snapshotName);
296     // delete the existing snapshot
297     if (!fs.delete(snapshotDir, true)) {
298       throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
299     }
300
301     // call coproc post hook
302     if (cpHost != null) {
303       cpHost.postDeleteSnapshot(snapshot);
304     }
305
306   }
307
308   /**
309    * Check if the specified snapshot is done
310    *
311    * @param expected
312    * @return true if snapshot is ready to be restored, false if it is still being taken.
313    * @throws IOException IOException if error from HDFS or RPC
314    * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
315    */
316   public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
317     // check the request to make sure it has a snapshot
318     if (expected == null) {
319       throw new UnknownSnapshotException(
320          "No snapshot name passed in request, can't figure out which snapshot you want to check.");
321     }
322
323     String ssString = ClientSnapshotDescriptionUtils.toString(expected);
324
325     // check to see if the sentinel exists,
326     // and if the task is complete removes it from the in-progress snapshots map.
327     SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
328
329     // stop tracking "abandoned" handlers
330     cleanupSentinels();
331
332     if (handler == null) {
333       // If there's no handler in the in-progress map, it means one of the following:
334       //   - someone has already requested the snapshot state
335       //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
336       //   - the snapshot was never requested
337       // In those cases returns to the user the "done state" if the snapshots exists on disk,
338       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
339       if (!isSnapshotCompleted(expected)) {
340         throw new UnknownSnapshotException("Snapshot " + ssString
341             + " is not currently running or one of the known completed snapshots.");
342       }
343       // was done, return true;
344       return true;
345     }
346
347     // pass on any failure we find in the sentinel
348     try {
349       handler.rethrowExceptionIfFailed();
350     } catch (ForeignException e) {
351       // Give some procedure info on an exception.
352       String status;
353       Procedure p = coordinator.getProcedure(expected.getName());
354       if (p != null) {
355         status = p.getStatus();
356       } else {
357         status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
358       }
359       throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
360         ProtobufUtil.createSnapshotDesc(expected));
361     }
362
363     // check to see if we are done
364     if (handler.isFinished()) {
365       LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
366       return true;
367     } else if (LOG.isDebugEnabled()) {
368       LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
369     }
370     return false;
371   }
372
373   /**
374    * Check to see if there is a snapshot in progress with the same name or on the same table.
375    * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
376    * don't allow snapshot with the same name.
377    * @param snapshot description of the snapshot being checked.
378    * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
379    *         table.
380    */
381   synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
382     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
383     if (isTakingSnapshot(snapshotTable)) {
384       return true;
385     }
386     Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
387     while (it.hasNext()) {
388       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
389       SnapshotSentinel sentinel = entry.getValue();
390       if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
391         return true;
392       }
393     }
394     return false;
395   }
396
397   /**
398    * Check to see if the specified table has a snapshot in progress.  Currently we have a
399    * limitation only allowing a single snapshot per table at a time.
400    * @param tableName name of the table being snapshotted.
401    * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
402    */
403   synchronized boolean isTakingSnapshot(final TableName tableName) {
404     SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
405     return handler != null && !handler.isFinished();
406   }
407
408   /**
409    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
410    * aren't already running a snapshot or restore on the requested table.
411    * @param snapshot description of the snapshot we want to start
412    * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
413    */
414   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
415       throws HBaseSnapshotException {
416     FileSystem fs = master.getMasterFileSystem().getFileSystem();
417     Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
418     TableName snapshotTable =
419         TableName.valueOf(snapshot.getTable());
420
421     // make sure we aren't already running a snapshot
422     if (isTakingSnapshot(snapshot)) {
423       SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
424       throw new SnapshotCreationException("Rejected taking "
425           + ClientSnapshotDescriptionUtils.toString(snapshot)
426           + " because we are already running another snapshot "
427           + (handler != null ? ("on the same table " +
428               ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
429               : "with the same name"), ProtobufUtil.createSnapshotDesc(snapshot));
430     }
431
432     // make sure we aren't running a restore on the same table
433     if (isRestoringTable(snapshotTable)) {
434       throw new SnapshotCreationException("Rejected taking "
435           + ClientSnapshotDescriptionUtils.toString(snapshot)
436           + " because we are already have a restore in progress on the same snapshot.");
437     }
438
439     try {
440       // delete the working directory, since we aren't running the snapshot. Likely leftovers
441       // from a failed attempt.
442       fs.delete(workingDir, true);
443
444       // recreate the working directory for the snapshot
445       if (!fs.mkdirs(workingDir)) {
446         throw new SnapshotCreationException(
447             "Couldn't create working directory (" + workingDir + ") for snapshot",
448             ProtobufUtil.createSnapshotDesc(snapshot));
449       }
450     } catch (HBaseSnapshotException e) {
451       throw e;
452     } catch (IOException e) {
453       throw new SnapshotCreationException(
454           "Exception while checking to see if snapshot could be started.", e,
455           ProtobufUtil.createSnapshotDesc(snapshot));
456     }
457   }
458
459   /**
460    * Take a snapshot of a disabled table.
461    * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
462    * @throws HBaseSnapshotException if the snapshot could not be started
463    */
464   private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
465       throws HBaseSnapshotException {
466     // setup the snapshot
467     prepareToTakeSnapshot(snapshot);
468
469     // set the snapshot to be a disabled snapshot, since the client doesn't know about that
470     snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
471
472     // Take the snapshot of the disabled table
473     DisabledTableSnapshotHandler handler =
474         new DisabledTableSnapshotHandler(snapshot, master);
475     snapshotTable(snapshot, handler);
476   }
477
478   /**
479    * Take a snapshot of an enabled table.
480    * @param snapshot description of the snapshot to take.
481    * @throws HBaseSnapshotException if the snapshot could not be started
482    */
483   private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
484       throws HBaseSnapshotException {
485     // setup the snapshot
486     prepareToTakeSnapshot(snapshot);
487
488     // Take the snapshot of the enabled table
489     EnabledTableSnapshotHandler handler =
490         new EnabledTableSnapshotHandler(snapshot, master, this);
491     snapshotTable(snapshot, handler);
492   }
493
494   /**
495    * Take a snapshot using the specified handler.
496    * On failure the snapshot temporary working directory is removed.
497    * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
498    *       snapshot request if the table is busy with another snapshot/restore operation.
499    * @param snapshot the snapshot description
500    * @param handler the snapshot handler
501    */
502   private synchronized void snapshotTable(SnapshotDescription snapshot,
503       final TakeSnapshotHandler handler) throws HBaseSnapshotException {
504     try {
505       handler.prepare();
506       this.executorService.submit(handler);
507       this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
508     } catch (Exception e) {
509       // cleanup the working directory by trying to delete it from the fs.
510       Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
511       try {
512         if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
513           LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
514               ClientSnapshotDescriptionUtils.toString(snapshot));
515         }
516       } catch (IOException e1) {
517         LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
518             ClientSnapshotDescriptionUtils.toString(snapshot));
519       }
520       // fail the snapshot
521       throw new SnapshotCreationException("Could not build snapshot handler", e,
522         ProtobufUtil.createSnapshotDesc(snapshot));
523     }
524   }
525
526   /**
527    * Take a snapshot based on the enabled/disabled state of the table.
528    *
529    * @param snapshot
530    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
531    * @throws IOException when some sort of generic IO exception occurs.
532    */
533   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
534     // check to see if we already completed the snapshot
535     if (isSnapshotCompleted(snapshot)) {
536       throw new SnapshotExistsException(
537           "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
538           ProtobufUtil.createSnapshotDesc(snapshot));
539     }
540
541     LOG.debug("No existing snapshot, attempting snapshot...");
542
543     // stop tracking "abandoned" handlers
544     cleanupSentinels();
545
546     // check to see if the table exists
547     HTableDescriptor desc = null;
548     try {
549       desc = master.getTableDescriptors().get(
550           TableName.valueOf(snapshot.getTable()));
551     } catch (FileNotFoundException e) {
552       String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
553       LOG.error(msg);
554       throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
555     } catch (IOException e) {
556       throw new SnapshotCreationException(
557           "Error while geting table description for table " + snapshot.getTable(), e,
558           ProtobufUtil.createSnapshotDesc(snapshot));
559     }
560     if (desc == null) {
561       throw new SnapshotCreationException(
562           "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
563           ProtobufUtil.createSnapshotDesc(snapshot));
564     }
565     SnapshotDescription.Builder builder = snapshot.toBuilder();
566     // if not specified, set the snapshot format
567     if (!snapshot.hasVersion()) {
568       builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
569     }
570     User user = RpcServer.getRequestUser();
571     if (User.isHBaseSecurityEnabled(master.getConfiguration()) && user != null) {
572       builder.setOwner(user.getShortName());
573     }
574     snapshot = builder.build();
575
576     // call pre coproc hook
577     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
578     if (cpHost != null) {
579       cpHost.preSnapshot(snapshot, desc);
580     }
581
582     // if the table is enabled, then have the RS run actually the snapshot work
583     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
584     if (master.getTableStateManager().isTableState(snapshotTable,
585         TableState.State.ENABLED)) {
586       LOG.debug("Table enabled, starting distributed snapshot.");
587       snapshotEnabledTable(snapshot);
588       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
589     }
590     // For disabled table, snapshot is created by the master
591     else if (master.getTableStateManager().isTableState(snapshotTable,
592         TableState.State.DISABLED)) {
593       LOG.debug("Table is disabled, running snapshot entirely on master.");
594       snapshotDisabledTable(snapshot);
595       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
596     } else {
597       LOG.error("Can't snapshot table '" + snapshot.getTable()
598           + "', isn't open or closed, we don't know what to do!");
599       TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
600           + " isn't fully open.");
601       throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
602         ProtobufUtil.createSnapshotDesc(snapshot));
603     }
604
605     // call post coproc hook
606     if (cpHost != null) {
607       cpHost.postSnapshot(snapshot, desc);
608     }
609   }
610
611   /**
612    * Set the handler for the current snapshot
613    * <p>
614    * Exposed for TESTING
615    * @param tableName
616    * @param handler handler the master should use
617    *
618    * TODO get rid of this if possible, repackaging, modify tests.
619    */
620   public synchronized void setSnapshotHandlerForTesting(
621       final TableName tableName,
622       final SnapshotSentinel handler) {
623     if (handler != null) {
624       this.snapshotHandlers.put(tableName, handler);
625     } else {
626       this.snapshotHandlers.remove(tableName);
627     }
628   }
629
630   /**
631    * @return distributed commit coordinator for all running snapshots
632    */
633   ProcedureCoordinator getCoordinator() {
634     return coordinator;
635   }
636
637   /**
638    * Check to see if the snapshot is one of the currently completed snapshots
639    * Returns true if the snapshot exists in the "completed snapshots folder".
640    *
641    * @param snapshot expected snapshot to check
642    * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
643    *         not stored
644    * @throws IOException if the filesystem throws an unexpected exception,
645    * @throws IllegalArgumentException if snapshot name is invalid.
646    */
647   private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
648     try {
649       final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
650       FileSystem fs = master.getMasterFileSystem().getFileSystem();
651       // check to see if the snapshot already exists
652       return fs.exists(snapshotDir);
653     } catch (IllegalArgumentException iae) {
654       throw new UnknownSnapshotException("Unexpected exception thrown", iae);
655     }
656   }
657
658   /**
659    * Clone the specified snapshot.
660    * The clone will fail if the destination table has a snapshot or restore in progress.
661    *
662    * @param reqSnapshot Snapshot Descriptor from request
663    * @param tableName table to clone
664    * @param snapshot Snapshot Descriptor
665    * @param snapshotTableDesc Table Descriptor
666    * @param nonceGroup unique value to prevent duplicated RPC
667    * @param nonce unique value to prevent duplicated RPC
668    * @return procId the ID of the clone snapshot procedure
669    * @throws IOException
670    */
671   private long cloneSnapshot(
672       final SnapshotDescription reqSnapshot,
673       final TableName tableName,
674       final SnapshotDescription snapshot,
675       final HTableDescriptor snapshotTableDesc,
676       final long nonceGroup,
677       final long nonce) throws IOException {
678     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
679     HTableDescriptor htd = new HTableDescriptor(tableName, snapshotTableDesc);
680     if (cpHost != null) {
681       cpHost.preCloneSnapshot(reqSnapshot, htd);
682     }
683     long procId;
684     try {
685       procId = cloneSnapshot(snapshot, htd, nonceGroup, nonce);
686     } catch (IOException e) {
687       LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
688         + " as table " + tableName.getNameAsString(), e);
689       throw e;
690     }
691     LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
692
693     if (cpHost != null) {
694       cpHost.postCloneSnapshot(reqSnapshot, htd);
695     }
696     return procId;
697   }
698
699   /**
700    * Clone the specified snapshot into a new table.
701    * The operation will fail if the destination table has a snapshot or restore in progress.
702    *
703    * @param snapshot Snapshot Descriptor
704    * @param hTableDescriptor Table Descriptor of the table to create
705    * @param nonceGroup unique value to prevent duplicated RPC
706    * @param nonce unique value to prevent duplicated RPC
707    * @return procId the ID of the clone snapshot procedure
708    */
709   synchronized long cloneSnapshot(
710       final SnapshotDescription snapshot,
711       final HTableDescriptor hTableDescriptor,
712       final long nonceGroup,
713       final long nonce) throws HBaseSnapshotException {
714     TableName tableName = hTableDescriptor.getTableName();
715
716     // make sure we aren't running a snapshot on the same table
717     if (isTakingSnapshot(tableName)) {
718       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
719     }
720
721     // make sure we aren't running a restore on the same table
722     if (isRestoringTable(tableName)) {
723       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
724     }
725
726     try {
727       long procId = master.getMasterProcedureExecutor().submitProcedure(
728         new CloneSnapshotProcedure(
729           master.getMasterProcedureExecutor().getEnvironment(), hTableDescriptor, snapshot),
730         nonceGroup,
731         nonce);
732       this.restoreTableToProcIdMap.put(tableName, procId);
733       return procId;
734     } catch (Exception e) {
735       String msg = "Couldn't clone the snapshot="
736         + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
737       LOG.error(msg, e);
738       throw new RestoreSnapshotException(msg, e);
739     }
740   }
741
742   /**
743    * Restore or Clone the specified snapshot
744    * @param reqSnapshot
745    * @param nonceGroup unique value to prevent duplicated RPC
746    * @param nonce unique value to prevent duplicated RPC
747    * @throws IOException
748    */
749   public long restoreOrCloneSnapshot(
750       SnapshotDescription reqSnapshot,
751       final long nonceGroup,
752       final long nonce) throws IOException {
753     FileSystem fs = master.getMasterFileSystem().getFileSystem();
754     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
755
756     // check if the snapshot exists
757     if (!fs.exists(snapshotDir)) {
758       LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
759       throw new SnapshotDoesNotExistException(
760         ProtobufUtil.createSnapshotDesc(reqSnapshot));
761     }
762
763     // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
764     // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
765     // information.
766     SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
767     SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
768         snapshotDir, snapshot);
769     HTableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
770     TableName tableName = TableName.valueOf(reqSnapshot.getTable());
771
772     // stop tracking "abandoned" handlers
773     cleanupSentinels();
774
775     // Verify snapshot validity
776     SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
777
778     // Execute the restore/clone operation
779     long procId;
780     if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) {
781       procId = restoreSnapshot(
782         reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceGroup, nonce);
783     } else {
784       procId = cloneSnapshot(
785         reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceGroup, nonce);
786     }
787     return procId;
788   }
789
790   /**
791    * Restore the specified snapshot.
792    * The restore will fail if the destination table has a snapshot or restore in progress.
793    *
794    * @param reqSnapshot Snapshot Descriptor from request
795    * @param tableName table to restore
796    * @param snapshot Snapshot Descriptor
797    * @param snapshotTableDesc Table Descriptor
798    * @param nonceGroup unique value to prevent duplicated RPC
799    * @param nonce unique value to prevent duplicated RPC
800    * @return procId the ID of the restore snapshot procedure
801    * @throws IOException
802    */
803   private long restoreSnapshot(
804       final SnapshotDescription reqSnapshot,
805       final TableName tableName,
806       final SnapshotDescription snapshot,
807       final HTableDescriptor snapshotTableDesc,
808       final long nonceGroup,
809       final long nonce) throws IOException {
810     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
811
812     if (master.getTableStateManager().isTableState(
813       TableName.valueOf(snapshot.getTable()), TableState.State.ENABLED)) {
814       throw new UnsupportedOperationException("Table '" +
815         TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
816         "perform a restore operation.");
817     }
818
819     // call Coprocessor pre hook
820     if (cpHost != null) {
821       cpHost.preRestoreSnapshot(reqSnapshot, snapshotTableDesc);
822     }
823
824     long procId;
825     try {
826       procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceGroup, nonce);
827     } catch (IOException e) {
828       LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
829         + " as table " + tableName.getNameAsString(), e);
830       throw e;
831     }
832     LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
833
834     if (cpHost != null) {
835       cpHost.postRestoreSnapshot(reqSnapshot, snapshotTableDesc);
836     }
837
838     return procId;
839   }
840
841   /**
842    * Restore the specified snapshot.
843    * The restore will fail if the destination table has a snapshot or restore in progress.
844    *
845    * @param snapshot Snapshot Descriptor
846    * @param hTableDescriptor Table Descriptor
847    * @param nonceGroup unique value to prevent duplicated RPC
848    * @param nonce unique value to prevent duplicated RPC
849    * @return procId the ID of the restore snapshot procedure
850    */
851   private synchronized long restoreSnapshot(
852       final SnapshotDescription snapshot,
853       final HTableDescriptor hTableDescriptor,
854       final long nonceGroup,
855       final long nonce) throws HBaseSnapshotException {
856     TableName tableName = hTableDescriptor.getTableName();
857
858     // make sure we aren't running a snapshot on the same table
859     if (isTakingSnapshot(tableName)) {
860       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
861     }
862
863     // make sure we aren't running a restore on the same table
864     if (isRestoringTable(tableName)) {
865       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
866     }
867
868     try {
869       long procId = master.getMasterProcedureExecutor().submitProcedure(
870         new RestoreSnapshotProcedure(
871           master.getMasterProcedureExecutor().getEnvironment(), hTableDescriptor, snapshot),
872         nonceGroup,
873         nonce);
874       this.restoreTableToProcIdMap.put(tableName, procId);
875       return procId;
876     } catch (Exception e) {
877       String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
878           snapshot)  +
879           " on table=" + tableName;
880       LOG.error(msg, e);
881       throw new RestoreSnapshotException(msg, e);
882     }
883   }
884
885   /**
886    * Verify if the restore of the specified table is in progress.
887    *
888    * @param tableName table under restore
889    * @return <tt>true</tt> if there is a restore in progress of the specified table.
890    */
891   private synchronized boolean isRestoringTable(final TableName tableName) {
892     Long procId = this.restoreTableToProcIdMap.get(tableName);
893     if (procId == null) {
894       return false;
895     }
896     ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
897     if (procExec.isRunning() && !procExec.isFinished(procId)) {
898       return true;
899     } else {
900       this.restoreTableToProcIdMap.remove(tableName);
901       return false;
902     }
903
904   }
905
906   /**
907    * Return the handler if it is currently live and has the same snapshot target name.
908    * The handler is removed from the sentinels map if completed.
909    * @param sentinels live handlers
910    * @param snapshot snapshot description
911    * @return null if doesn't match, else a live handler.
912    */
913   private synchronized SnapshotSentinel removeSentinelIfFinished(
914       final Map<TableName, SnapshotSentinel> sentinels,
915       final SnapshotDescription snapshot) {
916     if (!snapshot.hasTable()) {
917       return null;
918     }
919
920     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
921     SnapshotSentinel h = sentinels.get(snapshotTable);
922     if (h == null) {
923       return null;
924     }
925
926     if (!h.getSnapshot().getName().equals(snapshot.getName())) {
927       // specified snapshot is to the one currently running
928       return null;
929     }
930
931     // Remove from the "in-progress" list once completed
932     if (h.isFinished()) {
933       sentinels.remove(snapshotTable);
934     }
935
936     return h;
937   }
938
939   /**
940    * Removes "abandoned" snapshot/restore requests.
941    * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
942    * and the in-progress maps are cleaned up when the status of a completed task is requested.
943    * To avoid having sentinels staying around for long time if something client side is failed,
944    * each operation tries to clean up the in-progress maps sentinels finished from a long time.
945    */
946   private void cleanupSentinels() {
947     cleanupSentinels(this.snapshotHandlers);
948     cleanupCompletedRestoreInMap();
949   }
950
951   /**
952    * Remove the sentinels that are marked as finished and the completion time
953    * has exceeded the removal timeout.
954    * @param sentinels map of sentinels to clean
955    */
956   private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
957     long currentTime = EnvironmentEdgeManager.currentTime();
958     Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
959         sentinels.entrySet().iterator();
960     while (it.hasNext()) {
961       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
962       SnapshotSentinel sentinel = entry.getValue();
963       if (sentinel.isFinished() &&
964           (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
965       {
966         it.remove();
967       }
968     }
969   }
970
971   /**
972    * Remove the procedures that are marked as finished
973    */
974   private synchronized void cleanupCompletedRestoreInMap() {
975     ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
976     Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
977     while (it.hasNext()) {
978       Map.Entry<TableName, Long> entry = it.next();
979       Long procId = entry.getValue();
980       if (procExec.isRunning() && procExec.isFinished(procId)) {
981         it.remove();
982       }
983     }
984   }
985
986   //
987   // Implementing Stoppable interface
988   //
989
990   @Override
991   public void stop(String why) {
992     // short circuit
993     if (this.stopped) return;
994     // make sure we get stop
995     this.stopped = true;
996     // pass the stop onto take snapshot handlers
997     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
998       snapshotHandler.cancel(why);
999     }
1000
1001     try {
1002       if (coordinator != null) {
1003         coordinator.close();
1004       }
1005     } catch (IOException e) {
1006       LOG.error("stop ProcedureCoordinator error", e);
1007     }
1008   }
1009
1010   @Override
1011   public boolean isStopped() {
1012     return this.stopped;
1013   }
1014
1015   /**
1016    * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1017    * Called at the beginning of snapshot() and restoreSnapshot() methods.
1018    * @throws UnsupportedOperationException if snapshot are not supported
1019    */
1020   public void checkSnapshotSupport() throws UnsupportedOperationException {
1021     if (!this.isSnapshotSupported) {
1022       throw new UnsupportedOperationException(
1023         "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
1024           HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1025     }
1026   }
1027
1028   /**
1029    * Called at startup, to verify if snapshot operation is supported, and to avoid
1030    * starting the master if there're snapshots present but the cleaners needed are missing.
1031    * Otherwise we can end up with snapshot data loss.
1032    * @param conf The {@link Configuration} object to use
1033    * @param mfs The MasterFileSystem to use
1034    * @throws IOException in case of file-system operation failure
1035    * @throws UnsupportedOperationException in case cleaners are missing and
1036    *         there're snapshot in the system
1037    */
1038   private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1039       throws IOException, UnsupportedOperationException {
1040     // Verify if snapshot is disabled by the user
1041     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1042     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1043     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1044
1045     // Extract cleaners from conf
1046     Set<String> hfileCleaners = new HashSet<String>();
1047     String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1048     if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1049
1050     Set<String> logCleaners = new HashSet<String>();
1051     cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1052     if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1053
1054     // check if an older version of snapshot directory was present
1055     Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1056     FileSystem fs = mfs.getFileSystem();
1057     List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
1058     if (ss != null && !ss.isEmpty()) {
1059       LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1060       LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1061     }
1062
1063     // If the user has enabled the snapshot, we force the cleaners to be present
1064     // otherwise we still need to check if cleaners are enabled or not and verify
1065     // that there're no snapshot in the .snapshot folder.
1066     if (snapshotEnabled) {
1067       // Inject snapshot cleaners, if snapshot.enable is true
1068       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1069       hfileCleaners.add(HFileLinkCleaner.class.getName());
1070
1071       // Set cleaners conf
1072       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1073         hfileCleaners.toArray(new String[hfileCleaners.size()]));
1074       conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1075         logCleaners.toArray(new String[logCleaners.size()]));
1076     } else {
1077       // Verify if cleaners are present
1078       snapshotEnabled =
1079         hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
1080         hfileCleaners.contains(HFileLinkCleaner.class.getName());
1081
1082       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1083       if (snapshotEnabled) {
1084         LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
1085           "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
1086           (userDisabled ? "is set to 'false'." : "is not set."));
1087       }
1088     }
1089
1090     // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1091     this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1092
1093     // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1094     // otherwise we end up with snapshot data loss.
1095     if (!snapshotEnabled) {
1096       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1097       Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1098       if (fs.exists(snapshotDir)) {
1099         FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
1100           new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1101         if (snapshots != null) {
1102           LOG.error("Snapshots are present, but cleaners are not enabled.");
1103           checkSnapshotSupport();
1104         }
1105       }
1106     }
1107   }
1108
1109   @Override
1110   public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1111       IOException, UnsupportedOperationException {
1112     this.master = master;
1113
1114     this.rootDir = master.getMasterFileSystem().getRootDir();
1115     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1116
1117     // get the configuration for the coordinator
1118     Configuration conf = master.getConfiguration();
1119     long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1120     long timeoutMillis = Math.max(conf.getLong(SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_KEY,
1121                     SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT),
1122             conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1123                     SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1124     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1125
1126     // setup the default procedure coordinator
1127     String name = master.getServerName().toString();
1128     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1129     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
1130         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1131
1132     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1133     this.executorService = master.getExecutorService();
1134     resetTempDir();
1135   }
1136
1137   @Override
1138   public String getProcedureSignature() {
1139     return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1140   }
1141
1142   @Override
1143   public void execProcedure(ProcedureDescription desc) throws IOException {
1144     takeSnapshot(toSnapshotDescription(desc));
1145   }
1146
1147   @Override
1148   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1149     return isSnapshotDone(toSnapshotDescription(desc));
1150   }
1151
1152   private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1153       throws IOException {
1154     SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1155     if (!desc.hasInstance()) {
1156       throw new IOException("Snapshot name is not defined: " + desc.toString());
1157     }
1158     String snapshotName = desc.getInstance();
1159     List<NameStringPair> props = desc.getConfigurationList();
1160     String table = null;
1161     for (NameStringPair prop : props) {
1162       if ("table".equalsIgnoreCase(prop.getName())) {
1163         table = prop.getValue();
1164       }
1165     }
1166     if (table == null) {
1167       throw new IOException("Snapshot table is not defined: " + desc.toString());
1168     }
1169     TableName tableName = TableName.valueOf(table);
1170     builder.setTable(tableName.getNameAsString());
1171     builder.setName(snapshotName);
1172     builder.setType(SnapshotDescription.Type.FLUSH);
1173     return builder.build();
1174   }
1175 }