View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.snapshot;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ThreadPoolExecutor;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.FSDataInputStream;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.MetaTableAccessor;
43  import org.apache.hadoop.hbase.Stoppable;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.classification.InterfaceStability;
47  import org.apache.hadoop.hbase.client.TableState;
48  import org.apache.hadoop.hbase.errorhandling.ForeignException;
49  import org.apache.hadoop.hbase.executor.ExecutorService;
50  import org.apache.hadoop.hbase.ipc.RpcServer;
51  import org.apache.hadoop.hbase.master.AssignmentManager;
52  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
53  import org.apache.hadoop.hbase.master.MasterFileSystem;
54  import org.apache.hadoop.hbase.master.MasterServices;
55  import org.apache.hadoop.hbase.master.MetricsMaster;
56  import org.apache.hadoop.hbase.master.SnapshotSentinel;
57  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
58  import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
59  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
60  import org.apache.hadoop.hbase.procedure.Procedure;
61  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
62  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
63  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
64  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
65  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
66  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
67  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
68  import org.apache.hadoop.hbase.security.AccessDeniedException;
69  import org.apache.hadoop.hbase.security.User;
70  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
71  import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
72  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
73  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
74  import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
75  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
76  import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
77  import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
78  import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
79  import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
80  import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
81  import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
82  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
83  import org.apache.hadoop.hbase.util.FSUtils;
84  import org.apache.zookeeper.KeeperException;
85  
86  /**
87   * This class manages the procedure of taking and restoring snapshots. There is only one
88   * SnapshotManager for the master.
89   * <p>
90   * The class provides methods for monitoring in-progress snapshot actions.
91   * <p>
92   * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
93   * simplification in the current implementation.
94   */
95  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
96  @InterfaceStability.Unstable
97  public class SnapshotManager extends MasterProcedureManager implements Stoppable {
98    private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
99  
100   /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
101   private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
102 
103   /**
104    * Wait time before removing a finished sentinel from the in-progress map
105    *
106    * NOTE: This is used as a safety auto cleanup.
107    * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
108    * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
109    * In case something fails on the client side and the snapshot/restore state is not reclaimed
110    * after a default timeout, the entry is removed from the in-progress map.
111    * At this point, if the user asks for the snapshot/restore status, the result will be
112    * snapshot done if exists or failed if it doesn't exists.
113    */
114   private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;
115 
116   /** Enable or disable snapshot support */
117   public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
118 
119   /**
120    * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
121    * completion.
122    */
123   private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
124 
125   /** By default, check to see if the snapshot is complete (ms) */
126   private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 60000;
127 
128   /**
129    * Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
130    * completion.
131    */
132   private static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.master.timeoutMillis";
133 
134   /** Name of the operation to use in the controller */
135   public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
136 
137   /** Conf key for # of threads used by the SnapshotManager thread pool */
138   private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
139 
140   /** number of current operations running on the master */
141   private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
142 
143   private boolean stopped;
144   private MasterServices master;  // Needed by TableEventHandlers
145   private ProcedureCoordinator coordinator;
146 
147   // Is snapshot feature enabled?
148   private boolean isSnapshotSupported = false;
149 
150   // Snapshot handlers map, with table name as key.
151   // The map is always accessed and modified under the object lock using synchronized.
152   // snapshotTable() will insert an Handler in the table.
153   // isSnapshotDone() will remove the handler requested if the operation is finished.
154   private Map<TableName, SnapshotSentinel> snapshotHandlers =
155       new HashMap<TableName, SnapshotSentinel>();
156 
157   // Restore Sentinels map, with table name as key.
158   // The map is always accessed and modified under the object lock using synchronized.
159   // restoreSnapshot()/cloneSnapshot() will insert an Handler in the table.
160   // isRestoreDone() will remove the handler requested if the operation is finished.
161   private Map<TableName, SnapshotSentinel> restoreHandlers =
162       new HashMap<TableName, SnapshotSentinel>();
163 
164   private Path rootDir;
165   private ExecutorService executorService;
166 
167   public SnapshotManager() {}
168 
169   /**
170    * Fully specify all necessary components of a snapshot manager. Exposed for testing.
171    * @param master services for the master where the manager is running
172    * @param coordinator procedure coordinator instance.  exposed for testing.
173    * @param pool HBase ExecutorServcie instance, exposed for testing.
174    */
175   public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
176       ProcedureCoordinator coordinator, ExecutorService pool)
177       throws IOException, UnsupportedOperationException {
178     this.master = master;
179 
180     this.rootDir = master.getMasterFileSystem().getRootDir();
181     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
182 
183     this.coordinator = coordinator;
184     this.executorService = pool;
185     resetTempDir();
186   }
187 
188   /**
189    * Gets the list of all completed snapshots.
190    * @return list of SnapshotDescriptions
191    * @throws IOException File system exception
192    */
193   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
194     return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
195   }
196 
197   /**
198    * Gets the list of all completed snapshots.
199    * @param snapshotDir snapshot directory
200    * @return list of SnapshotDescriptions
201    * @throws IOException File system exception
202    */
203   private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
204     List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
205     // first create the snapshot root path and check to see if it exists
206     FileSystem fs = master.getMasterFileSystem().getFileSystem();
207     if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
208 
209     // if there are no snapshots, return an empty list
210     if (!fs.exists(snapshotDir)) {
211       return snapshotDescs;
212     }
213 
214     // ignore all the snapshots in progress
215     FileStatus[] snapshots = fs.listStatus(snapshotDir,
216       new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
217     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
218     // loop through all the completed snapshots
219     for (FileStatus snapshot : snapshots) {
220       Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
221       // if the snapshot is bad
222       if (!fs.exists(info)) {
223         LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
224         continue;
225       }
226       FSDataInputStream in = null;
227       try {
228         in = fs.open(info);
229         SnapshotDescription desc = SnapshotDescription.parseFrom(in);
230         if (cpHost != null) {
231           try {
232             cpHost.preListSnapshot(desc);
233           } catch (AccessDeniedException e) {
234             LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
235                 + "Either you should be owner of this snapshot or admin user.");
236             // Skip this and try for next snapshot
237             continue;
238           }
239         }
240         snapshotDescs.add(desc);
241 
242         // call coproc post hook
243         if (cpHost != null) {
244           cpHost.postListSnapshot(desc);
245         }
246       } catch (IOException e) {
247         LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
248       } finally {
249         if (in != null) {
250           in.close();
251         }
252       }
253     }
254     return snapshotDescs;
255   }
256 
257   /**
258    * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
259    * snapshot attempts.
260    *
261    * @throws IOException if we can't reach the filesystem
262    */
263   void resetTempDir() throws IOException {
264     // cleanup any existing snapshots.
265     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
266     if (master.getMasterFileSystem().getFileSystem().exists(tmpdir)) {
267       if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
268         LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
269       }
270     }
271   }
272 
273   /**
274    * Delete the specified snapshot
275    * @param snapshot
276    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
277    * @throws IOException For filesystem IOExceptions
278    */
279   public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
280     // check to see if it is completed
281     if (!isSnapshotCompleted(snapshot)) {
282       throw new SnapshotDoesNotExistException(snapshot);
283     }
284 
285     String snapshotName = snapshot.getName();
286     // first create the snapshot description and check to see if it exists
287     FileSystem fs = master.getMasterFileSystem().getFileSystem();
288     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
289     // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
290     // just the "name" and it does not contains the "real" snapshot information
291     snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
292 
293     // call coproc pre hook
294     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
295     if (cpHost != null) {
296       cpHost.preDeleteSnapshot(snapshot);
297     }
298 
299     LOG.debug("Deleting snapshot: " + snapshotName);
300     // delete the existing snapshot
301     if (!fs.delete(snapshotDir, true)) {
302       throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
303     }
304 
305     // call coproc post hook
306     if (cpHost != null) {
307       cpHost.postDeleteSnapshot(snapshot);
308     }
309 
310   }
311 
312   /**
313    * Check if the specified snapshot is done
314    *
315    * @param expected
316    * @return true if snapshot is ready to be restored, false if it is still being taken.
317    * @throws IOException IOException if error from HDFS or RPC
318    * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
319    */
320   public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
321     // check the request to make sure it has a snapshot
322     if (expected == null) {
323       throw new UnknownSnapshotException(
324          "No snapshot name passed in request, can't figure out which snapshot you want to check.");
325     }
326 
327     String ssString = ClientSnapshotDescriptionUtils.toString(expected);
328 
329     // check to see if the sentinel exists,
330     // and if the task is complete removes it from the in-progress snapshots map.
331     SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
332 
333     // stop tracking "abandoned" handlers
334     cleanupSentinels();
335 
336     if (handler == null) {
337       // If there's no handler in the in-progress map, it means one of the following:
338       //   - someone has already requested the snapshot state
339       //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
340       //   - the snapshot was never requested
341       // In those cases returns to the user the "done state" if the snapshots exists on disk,
342       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
343       if (!isSnapshotCompleted(expected)) {
344         throw new UnknownSnapshotException("Snapshot " + ssString
345             + " is not currently running or one of the known completed snapshots.");
346       }
347       // was done, return true;
348       return true;
349     }
350 
351     // pass on any failure we find in the sentinel
352     try {
353       handler.rethrowExceptionIfFailed();
354     } catch (ForeignException e) {
355       // Give some procedure info on an exception.
356       String status;
357       Procedure p = coordinator.getProcedure(expected.getName());
358       if (p != null) {
359         status = p.getStatus();
360       } else {
361         status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
362       }
363       throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
364           expected);
365     }
366 
367     // check to see if we are done
368     if (handler.isFinished()) {
369       LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
370       return true;
371     } else if (LOG.isDebugEnabled()) {
372       LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
373     }
374     return false;
375   }
376 
377   /**
378    * Check to see if there is a snapshot in progress with the same name or on the same table.
379    * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
380    * don't allow snapshot with the same name.
381    * @param snapshot description of the snapshot being checked.
382    * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
383    *         table.
384    */
385   synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
386     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
387     if (isTakingSnapshot(snapshotTable)) {
388       return true;
389     }
390     Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
391     while (it.hasNext()) {
392       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
393       SnapshotSentinel sentinel = entry.getValue();
394       if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
395         return true;
396       }
397     }
398     return false;
399   }
400 
401   /**
402    * Check to see if the specified table has a snapshot in progress.  Currently we have a
403    * limitation only allowing a single snapshot per table at a time.
404    * @param tableName name of the table being snapshotted.
405    * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
406    */
407   synchronized boolean isTakingSnapshot(final TableName tableName) {
408     SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
409     return handler != null && !handler.isFinished();
410   }
411 
412   /**
413    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
414    * aren't already running a snapshot or restore on the requested table.
415    * @param snapshot description of the snapshot we want to start
416    * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
417    */
418   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
419       throws HBaseSnapshotException {
420     FileSystem fs = master.getMasterFileSystem().getFileSystem();
421     Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
422     TableName snapshotTable =
423         TableName.valueOf(snapshot.getTable());
424 
425     // make sure we aren't already running a snapshot
426     if (isTakingSnapshot(snapshot)) {
427       SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
428       throw new SnapshotCreationException("Rejected taking "
429           + ClientSnapshotDescriptionUtils.toString(snapshot)
430           + " because we are already running another snapshot "
431           + (handler != null ? ("on the same table " +
432               ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
433               : "with the same name"), snapshot);
434     }
435 
436     // make sure we aren't running a restore on the same table
437     if (isRestoringTable(snapshotTable)) {
438       SnapshotSentinel handler = restoreHandlers.get(snapshotTable);
439       throw new SnapshotCreationException("Rejected taking "
440           + ClientSnapshotDescriptionUtils.toString(snapshot)
441           + " because we are already have a restore in progress on the same snapshot "
442           + ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()), snapshot);
443     }
444 
445     try {
446       // delete the working directory, since we aren't running the snapshot. Likely leftovers
447       // from a failed attempt.
448       fs.delete(workingDir, true);
449 
450       // recreate the working directory for the snapshot
451       if (!fs.mkdirs(workingDir)) {
452         throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
453             + ") for snapshot" , snapshot);
454       }
455     } catch (HBaseSnapshotException e) {
456       throw e;
457     } catch (IOException e) {
458       throw new SnapshotCreationException(
459           "Exception while checking to see if snapshot could be started.", e, snapshot);
460     }
461   }
462 
463   /**
464    * Take a snapshot of a disabled table.
465    * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
466    * @throws HBaseSnapshotException if the snapshot could not be started
467    */
468   private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
469       throws HBaseSnapshotException {
470     // setup the snapshot
471     prepareToTakeSnapshot(snapshot);
472 
473     // set the snapshot to be a disabled snapshot, since the client doesn't know about that
474     snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
475 
476     // Take the snapshot of the disabled table
477     DisabledTableSnapshotHandler handler =
478         new DisabledTableSnapshotHandler(snapshot, master);
479     snapshotTable(snapshot, handler);
480   }
481 
482   /**
483    * Take a snapshot of an enabled table.
484    * @param snapshot description of the snapshot to take.
485    * @throws HBaseSnapshotException if the snapshot could not be started
486    */
487   private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
488       throws HBaseSnapshotException {
489     // setup the snapshot
490     prepareToTakeSnapshot(snapshot);
491 
492     // Take the snapshot of the enabled table
493     EnabledTableSnapshotHandler handler =
494         new EnabledTableSnapshotHandler(snapshot, master, this);
495     snapshotTable(snapshot, handler);
496   }
497 
498   /**
499    * Take a snapshot using the specified handler.
500    * On failure the snapshot temporary working directory is removed.
501    * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
502    *       snapshot request if the table is busy with another snapshot/restore operation.
503    * @param snapshot the snapshot description
504    * @param handler the snapshot handler
505    */
506   private synchronized void snapshotTable(SnapshotDescription snapshot,
507       final TakeSnapshotHandler handler) throws HBaseSnapshotException {
508     try {
509       handler.prepare();
510       this.executorService.submit(handler);
511       this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
512     } catch (Exception e) {
513       // cleanup the working directory by trying to delete it from the fs.
514       Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
515       try {
516         if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
517           LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
518               ClientSnapshotDescriptionUtils.toString(snapshot));
519         }
520       } catch (IOException e1) {
521         LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
522             ClientSnapshotDescriptionUtils.toString(snapshot));
523       }
524       // fail the snapshot
525       throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
526     }
527   }
528 
529   /**
530    * Take a snapshot based on the enabled/disabled state of the table.
531    *
532    * @param snapshot
533    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
534    * @throws IOException when some sort of generic IO exception occurs.
535    */
536   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
537     // check to see if we already completed the snapshot
538     if (isSnapshotCompleted(snapshot)) {
539       throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
540           + "' already stored on the filesystem.", snapshot);
541     }
542 
543     LOG.debug("No existing snapshot, attempting snapshot...");
544 
545     // stop tracking "abandoned" handlers
546     cleanupSentinels();
547 
548     // check to see if the table exists
549     HTableDescriptor desc = null;
550     try {
551       desc = master.getTableDescriptors().get(
552           TableName.valueOf(snapshot.getTable()));
553     } catch (FileNotFoundException e) {
554       String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
555       LOG.error(msg);
556       throw new SnapshotCreationException(msg, e, snapshot);
557     } catch (IOException e) {
558       throw new SnapshotCreationException("Error while geting table description for table "
559           + snapshot.getTable(), e, snapshot);
560     }
561     if (desc == null) {
562       throw new SnapshotCreationException("Table '" + snapshot.getTable()
563           + "' doesn't exist, can't take snapshot.", snapshot);
564     }
565     SnapshotDescription.Builder builder = snapshot.toBuilder();
566     // if not specified, set the snapshot format
567     if (!snapshot.hasVersion()) {
568       builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
569     }
570     User user = RpcServer.getRequestUser();
571     if (User.isHBaseSecurityEnabled(master.getConfiguration()) && user != null) {
572       builder.setOwner(user.getShortName());
573     }
574     snapshot = builder.build();
575 
576     // call pre coproc hook
577     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
578     if (cpHost != null) {
579       cpHost.preSnapshot(snapshot, desc);
580     }
581 
582     // if the table is enabled, then have the RS run actually the snapshot work
583     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
584     AssignmentManager assignmentMgr = master.getAssignmentManager();
585     if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
586         TableState.State.ENABLED)) {
587       LOG.debug("Table enabled, starting distributed snapshot.");
588       snapshotEnabledTable(snapshot);
589       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
590     }
591     // For disabled table, snapshot is created by the master
592     else if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
593         TableState.State.DISABLED)) {
594       LOG.debug("Table is disabled, running snapshot entirely on master.");
595       snapshotDisabledTable(snapshot);
596       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
597     } else {
598       LOG.error("Can't snapshot table '" + snapshot.getTable()
599           + "', isn't open or closed, we don't know what to do!");
600       TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
601           + " isn't fully open.");
602       throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
603     }
604 
605     // call post coproc hook
606     if (cpHost != null) {
607       cpHost.postSnapshot(snapshot, desc);
608     }
609   }
610 
611   /**
612    * Set the handler for the current snapshot
613    * <p>
614    * Exposed for TESTING
615    * @param tableName
616    * @param handler handler the master should use
617    *
618    * TODO get rid of this if possible, repackaging, modify tests.
619    */
620   public synchronized void setSnapshotHandlerForTesting(
621       final TableName tableName,
622       final SnapshotSentinel handler) {
623     if (handler != null) {
624       this.snapshotHandlers.put(tableName, handler);
625     } else {
626       this.snapshotHandlers.remove(tableName);
627     }
628   }
629 
630   /**
631    * @return distributed commit coordinator for all running snapshots
632    */
633   ProcedureCoordinator getCoordinator() {
634     return coordinator;
635   }
636 
637   /**
638    * Check to see if the snapshot is one of the currently completed snapshots
639    * Returns true if the snapshot exists in the "completed snapshots folder".
640    *
641    * @param snapshot expected snapshot to check
642    * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
643    *         not stored
644    * @throws IOException if the filesystem throws an unexpected exception,
645    * @throws IllegalArgumentException if snapshot name is invalid.
646    */
647   private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
648     try {
649       final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
650       FileSystem fs = master.getMasterFileSystem().getFileSystem();
651       // check to see if the snapshot already exists
652       return fs.exists(snapshotDir);
653     } catch (IllegalArgumentException iae) {
654       throw new UnknownSnapshotException("Unexpected exception thrown", iae);
655     }
656   }
657 
658   /**
659    * Clone the specified snapshot into a new table.
660    * The operation will fail if the destination table has a snapshot or restore in progress.
661    *
662    * @param snapshot Snapshot Descriptor
663    * @param hTableDescriptor Table Descriptor of the table to create
664    */
665   synchronized void cloneSnapshot(final SnapshotDescription snapshot,
666       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
667     TableName tableName = hTableDescriptor.getTableName();
668 
669     // make sure we aren't running a snapshot on the same table
670     if (isTakingSnapshot(tableName)) {
671       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
672     }
673 
674     // make sure we aren't running a restore on the same table
675     if (isRestoringTable(tableName)) {
676       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
677     }
678 
679     try {
680       CloneSnapshotHandler handler =
681         new CloneSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
682       this.executorService.submit(handler);
683       this.restoreHandlers.put(tableName, handler);
684     } catch (Exception e) {
685       String msg = "Couldn't clone the snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
686         " on table=" + tableName;
687       LOG.error(msg, e);
688       throw new RestoreSnapshotException(msg, e);
689     }
690   }
691 
692   /**
693    * Restore the specified snapshot
694    * @param reqSnapshot
695    * @throws IOException
696    */
697   public void restoreSnapshot(SnapshotDescription reqSnapshot) throws IOException {
698     FileSystem fs = master.getMasterFileSystem().getFileSystem();
699     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
700     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
701 
702     // check if the snapshot exists
703     if (!fs.exists(snapshotDir)) {
704       LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
705       throw new SnapshotDoesNotExistException(reqSnapshot);
706     }
707 
708     // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
709     // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
710     // information.
711     SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
712     SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
713         snapshotDir, snapshot);
714     HTableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
715     TableName tableName = TableName.valueOf(reqSnapshot.getTable());
716 
717     // stop tracking "abandoned" handlers
718     cleanupSentinels();
719 
720     // Verify snapshot validity
721     SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
722 
723     // Execute the restore/clone operation
724     if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) {
725       if (master.getTableStateManager().isTableState(
726           TableName.valueOf(snapshot.getTable()), TableState.State.ENABLED)) {
727         throw new UnsupportedOperationException("Table '" +
728             TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
729             "perform a restore operation" +
730             ".");
731       }
732 
733       // call coproc pre hook
734       if (cpHost != null) {
735         cpHost.preRestoreSnapshot(reqSnapshot, snapshotTableDesc);
736       }
737       try {
738         // Table already exist. Check and update the region quota for this table namespace
739         checkAndUpdateNamespaceRegionQuota(manifest, tableName);
740         restoreSnapshot(snapshot, snapshotTableDesc);
741       } catch (IOException e) {
742         this.master.getMasterQuotaManager().removeTableFromNamespaceQuota(tableName);
743         LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
744             + " as table " + tableName.getNameAsString(), e);
745         throw e;
746       }
747       LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
748 
749       if (cpHost != null) {
750         cpHost.postRestoreSnapshot(reqSnapshot, snapshotTableDesc);
751       }
752     } else {
753       HTableDescriptor htd = RestoreSnapshotHelper.cloneTableSchema(snapshotTableDesc, tableName);
754       if (cpHost != null) {
755         cpHost.preCloneSnapshot(reqSnapshot, htd);
756       }
757       try {
758         checkAndUpdateNamespaceQuota(manifest, tableName);
759         cloneSnapshot(snapshot, htd);
760       } catch (IOException e) {
761         this.master.getMasterQuotaManager().removeTableFromNamespaceQuota(tableName);
762         LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
763             + " as table " + tableName.getNameAsString(), e);
764         throw e;
765       }
766       LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
767 
768       if (cpHost != null) {
769         cpHost.postCloneSnapshot(reqSnapshot, htd);
770       }
771     }
772   }
773   
774   private void checkAndUpdateNamespaceQuota(SnapshotManifest manifest, TableName tableName)
775       throws IOException {
776     if (this.master.getMasterQuotaManager().isQuotaEnabled()) {
777       this.master.getMasterQuotaManager().checkNamespaceTableAndRegionQuota(tableName,
778         manifest.getRegionManifestsMap().size());
779     }
780   }
781 
782   private void checkAndUpdateNamespaceRegionQuota(SnapshotManifest manifest, TableName tableName)
783       throws IOException {
784     if (this.master.getMasterQuotaManager().isQuotaEnabled()) {
785       this.master.getMasterQuotaManager().checkAndUpdateNamespaceRegionQuota(tableName,
786         manifest.getRegionManifestsMap().size());
787     }
788   }
789 
790   /**
791    * Restore the specified snapshot.
792    * The restore will fail if the destination table has a snapshot or restore in progress.
793    *
794    * @param snapshot Snapshot Descriptor
795    * @param hTableDescriptor Table Descriptor
796    */
797   private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
798       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
799     TableName tableName = hTableDescriptor.getTableName();
800 
801     // make sure we aren't running a snapshot on the same table
802     if (isTakingSnapshot(tableName)) {
803       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
804     }
805 
806     // make sure we aren't running a restore on the same table
807     if (isRestoringTable(tableName)) {
808       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
809     }
810 
811     try {
812       RestoreSnapshotHandler handler =
813         new RestoreSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
814       this.executorService.submit(handler);
815       restoreHandlers.put(tableName, handler);
816     } catch (Exception e) {
817       String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
818           snapshot)  +
819           " on table=" + tableName;
820       LOG.error(msg, e);
821       throw new RestoreSnapshotException(msg, e);
822     }
823   }
824 
825   /**
826    * Verify if the restore of the specified table is in progress.
827    *
828    * @param tableName table under restore
829    * @return <tt>true</tt> if there is a restore in progress of the specified table.
830    */
831   private synchronized boolean isRestoringTable(final TableName tableName) {
832     SnapshotSentinel sentinel = this.restoreHandlers.get(tableName);
833     return(sentinel != null && !sentinel.isFinished());
834   }
835 
836   /**
837    * Returns the status of a restore operation.
838    * If the in-progress restore is failed throws the exception that caused the failure.
839    *
840    * @param snapshot
841    * @return false if in progress, true if restore is completed or not requested.
842    * @throws IOException if there was a failure during the restore
843    */
844   public boolean isRestoreDone(final SnapshotDescription snapshot) throws IOException {
845     // check to see if the sentinel exists,
846     // and if the task is complete removes it from the in-progress restore map.
847     SnapshotSentinel sentinel = removeSentinelIfFinished(this.restoreHandlers, snapshot);
848 
849     // stop tracking "abandoned" handlers
850     cleanupSentinels();
851 
852     if (sentinel == null) {
853       // there is no sentinel so restore is not in progress.
854       return true;
855     }
856 
857     LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
858         + sentinel.getSnapshot().getName() + " table=" +
859         TableName.valueOf(snapshot.getTable()));
860 
861     // If the restore is failed, rethrow the exception
862     sentinel.rethrowExceptionIfFailed();
863 
864     // check to see if we are done
865     if (sentinel.isFinished()) {
866       LOG.debug("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
867           " has completed. Notifying the client.");
868       return true;
869     }
870 
871     if (LOG.isDebugEnabled()) {
872       LOG.debug("Sentinel is not yet finished with restoring snapshot=" +
873           ClientSnapshotDescriptionUtils.toString(snapshot));
874     }
875     return false;
876   }
877 
878   /**
879    * Return the handler if it is currently live and has the same snapshot target name.
880    * The handler is removed from the sentinels map if completed.
881    * @param sentinels live handlers
882    * @param snapshot snapshot description
883    * @return null if doesn't match, else a live handler.
884    */
885   private synchronized SnapshotSentinel removeSentinelIfFinished(
886       final Map<TableName, SnapshotSentinel> sentinels,
887       final SnapshotDescription snapshot) {
888     if (!snapshot.hasTable()) {
889       return null;
890     }
891 
892     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
893     SnapshotSentinel h = sentinels.get(snapshotTable);
894     if (h == null) {
895       return null;
896     }
897 
898     if (!h.getSnapshot().getName().equals(snapshot.getName())) {
899       // specified snapshot is to the one currently running
900       return null;
901     }
902 
903     // Remove from the "in-progress" list once completed
904     if (h.isFinished()) {
905       sentinels.remove(snapshotTable);
906     }
907 
908     return h;
909   }
910 
911   /**
912    * Removes "abandoned" snapshot/restore requests.
913    * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
914    * and the in-progress maps are cleaned up when the status of a completed task is requested.
915    * To avoid having sentinels staying around for long time if something client side is failed,
916    * each operation tries to clean up the in-progress maps sentinels finished from a long time.
917    */
918   private void cleanupSentinels() {
919     cleanupSentinels(this.snapshotHandlers);
920     cleanupSentinels(this.restoreHandlers);
921   }
922 
923   /**
924    * Remove the sentinels that are marked as finished and the completion time
925    * has exceeded the removal timeout.
926    * @param sentinels map of sentinels to clean
927    */
928   private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
929     long currentTime = EnvironmentEdgeManager.currentTime();
930     Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
931         sentinels.entrySet().iterator();
932     while (it.hasNext()) {
933       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
934       SnapshotSentinel sentinel = entry.getValue();
935       if (sentinel.isFinished() &&
936           (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
937       {
938         it.remove();
939       }
940     }
941   }
942 
943   //
944   // Implementing Stoppable interface
945   //
946 
947   @Override
948   public void stop(String why) {
949     // short circuit
950     if (this.stopped) return;
951     // make sure we get stop
952     this.stopped = true;
953     // pass the stop onto take snapshot handlers
954     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
955       snapshotHandler.cancel(why);
956     }
957 
958     // pass the stop onto all the restore handlers
959     for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
960       restoreHandler.cancel(why);
961     }
962     try {
963       if (coordinator != null) {
964         coordinator.close();
965       }
966     } catch (IOException e) {
967       LOG.error("stop ProcedureCoordinator error", e);
968     }
969   }
970 
971   @Override
972   public boolean isStopped() {
973     return this.stopped;
974   }
975 
976   /**
977    * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
978    * Called at the beginning of snapshot() and restoreSnapshot() methods.
979    * @throws UnsupportedOperationException if snapshot are not supported
980    */
981   public void checkSnapshotSupport() throws UnsupportedOperationException {
982     if (!this.isSnapshotSupported) {
983       throw new UnsupportedOperationException(
984         "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
985           HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
986     }
987   }
988 
989   /**
990    * Called at startup, to verify if snapshot operation is supported, and to avoid
991    * starting the master if there're snapshots present but the cleaners needed are missing.
992    * Otherwise we can end up with snapshot data loss.
993    * @param conf The {@link Configuration} object to use
994    * @param mfs The MasterFileSystem to use
995    * @throws IOException in case of file-system operation failure
996    * @throws UnsupportedOperationException in case cleaners are missing and
997    *         there're snapshot in the system
998    */
999   private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1000       throws IOException, UnsupportedOperationException {
1001     // Verify if snapshot is disabled by the user
1002     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1003     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1004     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1005 
1006     // Extract cleaners from conf
1007     Set<String> hfileCleaners = new HashSet<String>();
1008     String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1009     if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1010 
1011     Set<String> logCleaners = new HashSet<String>();
1012     cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1013     if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1014 
1015     // check if an older version of snapshot directory was present
1016     Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1017     FileSystem fs = mfs.getFileSystem();
1018     List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
1019     if (ss != null && !ss.isEmpty()) {
1020       LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1021       LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1022     }
1023 
1024     // If the user has enabled the snapshot, we force the cleaners to be present
1025     // otherwise we still need to check if cleaners are enabled or not and verify
1026     // that there're no snapshot in the .snapshot folder.
1027     if (snapshotEnabled) {
1028       // Inject snapshot cleaners, if snapshot.enable is true
1029       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1030       hfileCleaners.add(HFileLinkCleaner.class.getName());
1031       logCleaners.add(SnapshotLogCleaner.class.getName());
1032 
1033       // Set cleaners conf
1034       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1035         hfileCleaners.toArray(new String[hfileCleaners.size()]));
1036       conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1037         logCleaners.toArray(new String[logCleaners.size()]));
1038     } else {
1039       // Verify if cleaners are present
1040       snapshotEnabled = logCleaners.contains(SnapshotLogCleaner.class.getName()) &&
1041         hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
1042         hfileCleaners.contains(HFileLinkCleaner.class.getName());
1043 
1044       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1045       if (snapshotEnabled) {
1046         LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
1047           "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
1048           (userDisabled ? "is set to 'false'." : "is not set."));
1049       }
1050     }
1051 
1052     // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1053     this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1054 
1055     // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1056     // otherwise we end up with snapshot data loss.
1057     if (!snapshotEnabled) {
1058       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1059       Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1060       if (fs.exists(snapshotDir)) {
1061         FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
1062           new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1063         if (snapshots != null) {
1064           LOG.error("Snapshots are present, but cleaners are not enabled.");
1065           checkSnapshotSupport();
1066         }
1067       }
1068     }
1069   }
1070 
1071   @Override
1072   public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1073       IOException, UnsupportedOperationException {
1074     this.master = master;
1075 
1076     this.rootDir = master.getMasterFileSystem().getRootDir();
1077     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1078 
1079     // get the configuration for the coordinator
1080     Configuration conf = master.getConfiguration();
1081     long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1082     long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
1083     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1084 
1085     // setup the default procedure coordinator
1086     String name = master.getServerName().toString();
1087     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1088     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
1089         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1090 
1091     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1092     this.executorService = master.getExecutorService();
1093     resetTempDir();
1094   }
1095 
1096   @Override
1097   public String getProcedureSignature() {
1098     return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1099   }
1100 
1101   @Override
1102   public void execProcedure(ProcedureDescription desc) throws IOException {
1103     takeSnapshot(toSnapshotDescription(desc));
1104   }
1105 
1106   @Override
1107   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1108     return isSnapshotDone(toSnapshotDescription(desc));
1109   }
1110 
1111   private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1112       throws IOException {
1113     SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1114     if (!desc.hasInstance()) {
1115       throw new IOException("Snapshot name is not defined: " + desc.toString());
1116     }
1117     String snapshotName = desc.getInstance();
1118     List<NameStringPair> props = desc.getConfigurationList();
1119     String table = null;
1120     for (NameStringPair prop : props) {
1121       if ("table".equalsIgnoreCase(prop.getName())) {
1122         table = prop.getValue();
1123       }
1124     }
1125     if (table == null) {
1126       throw new IOException("Snapshot table is not defined: " + desc.toString());
1127     }
1128     TableName tableName = TableName.valueOf(table);
1129     builder.setTable(tableName.getNameAsString());
1130     builder.setName(snapshotName);
1131     builder.setType(SnapshotDescription.Type.FLUSH);
1132     return builder.build();
1133   }
1134 }