View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.snapshot;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.ThreadPoolExecutor;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.FSDataInputStream;
36  import org.apache.hadoop.fs.FileStatus;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40  import org.apache.hadoop.hbase.HConstants;
41  import org.apache.hadoop.hbase.HTableDescriptor;
42  import org.apache.hadoop.hbase.MetaTableAccessor;
43  import org.apache.hadoop.hbase.Stoppable;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.classification.InterfaceStability;
47  import org.apache.hadoop.hbase.client.TableState;
48  import org.apache.hadoop.hbase.errorhandling.ForeignException;
49  import org.apache.hadoop.hbase.executor.ExecutorService;
50  import org.apache.hadoop.hbase.ipc.RpcServer;
51  import org.apache.hadoop.hbase.master.AssignmentManager;
52  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
53  import org.apache.hadoop.hbase.master.MasterFileSystem;
54  import org.apache.hadoop.hbase.master.MasterServices;
55  import org.apache.hadoop.hbase.master.MetricsMaster;
56  import org.apache.hadoop.hbase.master.SnapshotSentinel;
57  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
58  import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
59  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
60  import org.apache.hadoop.hbase.procedure.Procedure;
61  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
62  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
63  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
64  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
65  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
66  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
67  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
68  import org.apache.hadoop.hbase.security.AccessDeniedException;
69  import org.apache.hadoop.hbase.security.User;
70  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
71  import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
72  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
73  import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
74  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
75  import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
76  import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
77  import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
78  import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
79  import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
80  import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
81  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
82  import org.apache.hadoop.hbase.util.FSUtils;
83  import org.apache.zookeeper.KeeperException;
84  
85  /**
86   * This class manages the procedure of taking and restoring snapshots. There is only one
87   * SnapshotManager for the master.
88   * <p>
89   * The class provides methods for monitoring in-progress snapshot actions.
90   * <p>
91   * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
92   * simplification in the current implementation.
93   */
94  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
95  @InterfaceStability.Unstable
96  public class SnapshotManager extends MasterProcedureManager implements Stoppable {
97    private static final Log LOG = LogFactory.getLog(SnapshotManager.class);
98  
99    /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
100   private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
101 
102   /**
103    * Wait time before removing a finished sentinel from the in-progress map
104    *
105    * NOTE: This is used as a safety auto cleanup.
106    * The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
107    * restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
108    * In case something fails on the client side and the snapshot/restore state is not reclaimed
109    * after a default timeout, the entry is removed from the in-progress map.
110    * At this point, if the user asks for the snapshot/restore status, the result will be
111    * snapshot done if exists or failed if it doesn't exists.
112    */
113   private static final int SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT = 60 * 1000;
114 
115   /** Enable or disable snapshot support */
116   public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
117 
118   /**
119    * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
120    * completion.
121    */
122   private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
123 
124   /** Name of the operation to use in the controller */
125   public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
126 
127   /** Conf key for # of threads used by the SnapshotManager thread pool */
128   private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
129 
130   /** number of current operations running on the master */
131   private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
132 
133   private boolean stopped;
134   private MasterServices master;  // Needed by TableEventHandlers
135   private ProcedureCoordinator coordinator;
136 
137   // Is snapshot feature enabled?
138   private boolean isSnapshotSupported = false;
139 
140   // Snapshot handlers map, with table name as key.
141   // The map is always accessed and modified under the object lock using synchronized.
142   // snapshotTable() will insert an Handler in the table.
143   // isSnapshotDone() will remove the handler requested if the operation is finished.
144   private Map<TableName, SnapshotSentinel> snapshotHandlers =
145       new HashMap<TableName, SnapshotSentinel>();
146 
147   // Restore Sentinels map, with table name as key.
148   // The map is always accessed and modified under the object lock using synchronized.
149   // restoreSnapshot()/cloneSnapshot() will insert an Handler in the table.
150   // isRestoreDone() will remove the handler requested if the operation is finished.
151   private Map<TableName, SnapshotSentinel> restoreHandlers =
152       new HashMap<TableName, SnapshotSentinel>();
153 
154   private Path rootDir;
155   private ExecutorService executorService;
156 
157   public SnapshotManager() {}
158 
159   /**
160    * Fully specify all necessary components of a snapshot manager. Exposed for testing.
161    * @param master services for the master where the manager is running
162    * @param coordinator procedure coordinator instance.  exposed for testing.
163    * @param pool HBase ExecutorServcie instance, exposed for testing.
164    */
165   public SnapshotManager(final MasterServices master, final MetricsMaster metricsMaster,
166       ProcedureCoordinator coordinator, ExecutorService pool)
167       throws IOException, UnsupportedOperationException {
168     this.master = master;
169 
170     this.rootDir = master.getMasterFileSystem().getRootDir();
171     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
172 
173     this.coordinator = coordinator;
174     this.executorService = pool;
175     resetTempDir();
176   }
177 
178   /**
179    * Gets the list of all completed snapshots.
180    * @return list of SnapshotDescriptions
181    * @throws IOException File system exception
182    */
183   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
184     return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
185   }
186 
187   /**
188    * Gets the list of all completed snapshots.
189    * @param snapshotDir snapshot directory
190    * @return list of SnapshotDescriptions
191    * @throws IOException File system exception
192    */
193   private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
194     List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
195     // first create the snapshot root path and check to see if it exists
196     FileSystem fs = master.getMasterFileSystem().getFileSystem();
197     if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
198 
199     // if there are no snapshots, return an empty list
200     if (!fs.exists(snapshotDir)) {
201       return snapshotDescs;
202     }
203 
204     // ignore all the snapshots in progress
205     FileStatus[] snapshots = fs.listStatus(snapshotDir,
206       new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
207     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
208     // loop through all the completed snapshots
209     for (FileStatus snapshot : snapshots) {
210       Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
211       // if the snapshot is bad
212       if (!fs.exists(info)) {
213         LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
214         continue;
215       }
216       FSDataInputStream in = null;
217       try {
218         in = fs.open(info);
219         SnapshotDescription desc = SnapshotDescription.parseFrom(in);
220         if (cpHost != null) {
221           try {
222             cpHost.preListSnapshot(desc);
223           } catch (AccessDeniedException e) {
224             LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
225                 + "Either you should be owner of this snapshot or admin user.");
226             // Skip this and try for next snapshot
227             continue;
228           }
229         }
230         snapshotDescs.add(desc);
231 
232         // call coproc post hook
233         if (cpHost != null) {
234           cpHost.postListSnapshot(desc);
235         }
236       } catch (IOException e) {
237         LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
238       } finally {
239         if (in != null) {
240           in.close();
241         }
242       }
243     }
244     return snapshotDescs;
245   }
246 
247   /**
248    * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
249    * snapshot attempts.
250    *
251    * @throws IOException if we can't reach the filesystem
252    */
253   void resetTempDir() throws IOException {
254     // cleanup any existing snapshots.
255     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
256     if (master.getMasterFileSystem().getFileSystem().exists(tmpdir)) {
257       if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
258         LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
259       }
260     }
261   }
262 
263   /**
264    * Delete the specified snapshot
265    * @param snapshot
266    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
267    * @throws IOException For filesystem IOExceptions
268    */
269   public void deleteSnapshot(SnapshotDescription snapshot) throws SnapshotDoesNotExistException, IOException {
270     // check to see if it is completed
271     if (!isSnapshotCompleted(snapshot)) {
272       throw new SnapshotDoesNotExistException(snapshot);
273     }
274 
275     String snapshotName = snapshot.getName();
276     // first create the snapshot description and check to see if it exists
277     FileSystem fs = master.getMasterFileSystem().getFileSystem();
278     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
279     // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
280     // just the "name" and it does not contains the "real" snapshot information
281     snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
282 
283     // call coproc pre hook
284     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
285     if (cpHost != null) {
286       cpHost.preDeleteSnapshot(snapshot);
287     }
288 
289     LOG.debug("Deleting snapshot: " + snapshotName);
290     // delete the existing snapshot
291     if (!fs.delete(snapshotDir, true)) {
292       throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
293     }
294 
295     // call coproc post hook
296     if (cpHost != null) {
297       cpHost.postDeleteSnapshot(snapshot);
298     }
299 
300   }
301 
302   /**
303    * Check if the specified snapshot is done
304    *
305    * @param expected
306    * @return true if snapshot is ready to be restored, false if it is still being taken.
307    * @throws IOException IOException if error from HDFS or RPC
308    * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
309    */
310   public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
311     // check the request to make sure it has a snapshot
312     if (expected == null) {
313       throw new UnknownSnapshotException(
314          "No snapshot name passed in request, can't figure out which snapshot you want to check.");
315     }
316 
317     String ssString = ClientSnapshotDescriptionUtils.toString(expected);
318 
319     // check to see if the sentinel exists,
320     // and if the task is complete removes it from the in-progress snapshots map.
321     SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
322 
323     // stop tracking "abandoned" handlers
324     cleanupSentinels();
325 
326     if (handler == null) {
327       // If there's no handler in the in-progress map, it means one of the following:
328       //   - someone has already requested the snapshot state
329       //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
330       //   - the snapshot was never requested
331       // In those cases returns to the user the "done state" if the snapshots exists on disk,
332       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
333       if (!isSnapshotCompleted(expected)) {
334         throw new UnknownSnapshotException("Snapshot " + ssString
335             + " is not currently running or one of the known completed snapshots.");
336       }
337       // was done, return true;
338       return true;
339     }
340 
341     // pass on any failure we find in the sentinel
342     try {
343       handler.rethrowExceptionIfFailed();
344     } catch (ForeignException e) {
345       // Give some procedure info on an exception.
346       String status;
347       Procedure p = coordinator.getProcedure(expected.getName());
348       if (p != null) {
349         status = p.getStatus();
350       } else {
351         status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
352       }
353       throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
354           expected);
355     }
356 
357     // check to see if we are done
358     if (handler.isFinished()) {
359       LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
360       return true;
361     } else if (LOG.isDebugEnabled()) {
362       LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
363     }
364     return false;
365   }
366 
367   /**
368    * Check to see if there is a snapshot in progress with the same name or on the same table.
369    * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
370    * don't allow snapshot with the same name.
371    * @param snapshot description of the snapshot being checked.
372    * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
373    *         table.
374    */
375   synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
376     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
377     if (isTakingSnapshot(snapshotTable)) {
378       return true;
379     }
380     Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
381     while (it.hasNext()) {
382       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
383       SnapshotSentinel sentinel = entry.getValue();
384       if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
385         return true;
386       }
387     }
388     return false;
389   }
390 
391   /**
392    * Check to see if the specified table has a snapshot in progress.  Currently we have a
393    * limitation only allowing a single snapshot per table at a time.
394    * @param tableName name of the table being snapshotted.
395    * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
396    */
397   synchronized boolean isTakingSnapshot(final TableName tableName) {
398     SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
399     return handler != null && !handler.isFinished();
400   }
401 
402   /**
403    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
404    * aren't already running a snapshot or restore on the requested table.
405    * @param snapshot description of the snapshot we want to start
406    * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
407    */
408   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
409       throws HBaseSnapshotException {
410     FileSystem fs = master.getMasterFileSystem().getFileSystem();
411     Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
412     TableName snapshotTable =
413         TableName.valueOf(snapshot.getTable());
414 
415     // make sure we aren't already running a snapshot
416     if (isTakingSnapshot(snapshot)) {
417       SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
418       throw new SnapshotCreationException("Rejected taking "
419           + ClientSnapshotDescriptionUtils.toString(snapshot)
420           + " because we are already running another snapshot "
421           + (handler != null ? ("on the same table " +
422               ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
423               : "with the same name"), snapshot);
424     }
425 
426     // make sure we aren't running a restore on the same table
427     if (isRestoringTable(snapshotTable)) {
428       SnapshotSentinel handler = restoreHandlers.get(snapshotTable);
429       throw new SnapshotCreationException("Rejected taking "
430           + ClientSnapshotDescriptionUtils.toString(snapshot)
431           + " because we are already have a restore in progress on the same snapshot "
432           + ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()), snapshot);
433     }
434 
435     try {
436       // delete the working directory, since we aren't running the snapshot. Likely leftovers
437       // from a failed attempt.
438       fs.delete(workingDir, true);
439 
440       // recreate the working directory for the snapshot
441       if (!fs.mkdirs(workingDir)) {
442         throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
443             + ") for snapshot" , snapshot);
444       }
445     } catch (HBaseSnapshotException e) {
446       throw e;
447     } catch (IOException e) {
448       throw new SnapshotCreationException(
449           "Exception while checking to see if snapshot could be started.", e, snapshot);
450     }
451   }
452 
453   /**
454    * Take a snapshot of a disabled table.
455    * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
456    * @throws HBaseSnapshotException if the snapshot could not be started
457    */
458   private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
459       throws HBaseSnapshotException {
460     // setup the snapshot
461     prepareToTakeSnapshot(snapshot);
462 
463     // set the snapshot to be a disabled snapshot, since the client doesn't know about that
464     snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
465 
466     // Take the snapshot of the disabled table
467     DisabledTableSnapshotHandler handler =
468         new DisabledTableSnapshotHandler(snapshot, master);
469     snapshotTable(snapshot, handler);
470   }
471 
472   /**
473    * Take a snapshot of an enabled table.
474    * @param snapshot description of the snapshot to take.
475    * @throws HBaseSnapshotException if the snapshot could not be started
476    */
477   private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
478       throws HBaseSnapshotException {
479     // setup the snapshot
480     prepareToTakeSnapshot(snapshot);
481 
482     // Take the snapshot of the enabled table
483     EnabledTableSnapshotHandler handler =
484         new EnabledTableSnapshotHandler(snapshot, master, this);
485     snapshotTable(snapshot, handler);
486   }
487 
488   /**
489    * Take a snapshot using the specified handler.
490    * On failure the snapshot temporary working directory is removed.
491    * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
492    *       snapshot request if the table is busy with another snapshot/restore operation.
493    * @param snapshot the snapshot description
494    * @param handler the snapshot handler
495    */
496   private synchronized void snapshotTable(SnapshotDescription snapshot,
497       final TakeSnapshotHandler handler) throws HBaseSnapshotException {
498     try {
499       handler.prepare();
500       this.executorService.submit(handler);
501       this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
502     } catch (Exception e) {
503       // cleanup the working directory by trying to delete it from the fs.
504       Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
505       try {
506         if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
507           LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
508               ClientSnapshotDescriptionUtils.toString(snapshot));
509         }
510       } catch (IOException e1) {
511         LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
512             ClientSnapshotDescriptionUtils.toString(snapshot));
513       }
514       // fail the snapshot
515       throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
516     }
517   }
518 
519   /**
520    * Take a snapshot based on the enabled/disabled state of the table.
521    *
522    * @param snapshot
523    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
524    * @throws IOException when some sort of generic IO exception occurs.
525    */
526   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
527     // check to see if we already completed the snapshot
528     if (isSnapshotCompleted(snapshot)) {
529       throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
530           + "' already stored on the filesystem.", snapshot);
531     }
532 
533     LOG.debug("No existing snapshot, attempting snapshot...");
534 
535     // stop tracking "abandoned" handlers
536     cleanupSentinels();
537 
538     // check to see if the table exists
539     HTableDescriptor desc = null;
540     try {
541       desc = master.getTableDescriptors().get(
542           TableName.valueOf(snapshot.getTable()));
543     } catch (FileNotFoundException e) {
544       String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
545       LOG.error(msg);
546       throw new SnapshotCreationException(msg, e, snapshot);
547     } catch (IOException e) {
548       throw new SnapshotCreationException("Error while geting table description for table "
549           + snapshot.getTable(), e, snapshot);
550     }
551     if (desc == null) {
552       throw new SnapshotCreationException("Table '" + snapshot.getTable()
553           + "' doesn't exist, can't take snapshot.", snapshot);
554     }
555     SnapshotDescription.Builder builder = snapshot.toBuilder();
556     // if not specified, set the snapshot format
557     if (!snapshot.hasVersion()) {
558       builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
559     }
560     User user = RpcServer.getRequestUser();
561     if (User.isHBaseSecurityEnabled(master.getConfiguration()) && user != null) {
562       builder.setOwner(user.getShortName());
563     }
564     snapshot = builder.build();
565 
566     // call pre coproc hook
567     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
568     if (cpHost != null) {
569       cpHost.preSnapshot(snapshot, desc);
570     }
571 
572     // if the table is enabled, then have the RS run actually the snapshot work
573     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
574     AssignmentManager assignmentMgr = master.getAssignmentManager();
575     if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
576         TableState.State.ENABLED)) {
577       LOG.debug("Table enabled, starting distributed snapshot.");
578       snapshotEnabledTable(snapshot);
579       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
580     }
581     // For disabled table, snapshot is created by the master
582     else if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
583         TableState.State.DISABLED)) {
584       LOG.debug("Table is disabled, running snapshot entirely on master.");
585       snapshotDisabledTable(snapshot);
586       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
587     } else {
588       LOG.error("Can't snapshot table '" + snapshot.getTable()
589           + "', isn't open or closed, we don't know what to do!");
590       TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
591           + " isn't fully open.");
592       throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
593     }
594 
595     // call post coproc hook
596     if (cpHost != null) {
597       cpHost.postSnapshot(snapshot, desc);
598     }
599   }
600 
601   /**
602    * Set the handler for the current snapshot
603    * <p>
604    * Exposed for TESTING
605    * @param tableName
606    * @param handler handler the master should use
607    *
608    * TODO get rid of this if possible, repackaging, modify tests.
609    */
610   public synchronized void setSnapshotHandlerForTesting(
611       final TableName tableName,
612       final SnapshotSentinel handler) {
613     if (handler != null) {
614       this.snapshotHandlers.put(tableName, handler);
615     } else {
616       this.snapshotHandlers.remove(tableName);
617     }
618   }
619 
620   /**
621    * @return distributed commit coordinator for all running snapshots
622    */
623   ProcedureCoordinator getCoordinator() {
624     return coordinator;
625   }
626 
627   /**
628    * Check to see if the snapshot is one of the currently completed snapshots
629    * Returns true if the snapshot exists in the "completed snapshots folder".
630    *
631    * @param snapshot expected snapshot to check
632    * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
633    *         not stored
634    * @throws IOException if the filesystem throws an unexpected exception,
635    * @throws IllegalArgumentException if snapshot name is invalid.
636    */
637   private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
638     try {
639       final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
640       FileSystem fs = master.getMasterFileSystem().getFileSystem();
641       // check to see if the snapshot already exists
642       return fs.exists(snapshotDir);
643     } catch (IllegalArgumentException iae) {
644       throw new UnknownSnapshotException("Unexpected exception thrown", iae);
645     }
646   }
647 
648   /**
649    * Clone the specified snapshot into a new table.
650    * The operation will fail if the destination table has a snapshot or restore in progress.
651    *
652    * @param snapshot Snapshot Descriptor
653    * @param hTableDescriptor Table Descriptor of the table to create
654    */
655   synchronized void cloneSnapshot(final SnapshotDescription snapshot,
656       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
657     TableName tableName = hTableDescriptor.getTableName();
658 
659     // make sure we aren't running a snapshot on the same table
660     if (isTakingSnapshot(tableName)) {
661       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
662     }
663 
664     // make sure we aren't running a restore on the same table
665     if (isRestoringTable(tableName)) {
666       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
667     }
668 
669     try {
670       CloneSnapshotHandler handler =
671         new CloneSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
672       this.executorService.submit(handler);
673       this.restoreHandlers.put(tableName, handler);
674     } catch (Exception e) {
675       String msg = "Couldn't clone the snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
676         " on table=" + tableName;
677       LOG.error(msg, e);
678       throw new RestoreSnapshotException(msg, e);
679     }
680   }
681 
682   /**
683    * Restore the specified snapshot
684    * @param reqSnapshot
685    * @throws IOException
686    */
687   public void restoreSnapshot(SnapshotDescription reqSnapshot) throws IOException {
688     FileSystem fs = master.getMasterFileSystem().getFileSystem();
689     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
690     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
691 
692     // check if the snapshot exists
693     if (!fs.exists(snapshotDir)) {
694       LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
695       throw new SnapshotDoesNotExistException(reqSnapshot);
696     }
697 
698     // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
699     // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
700     // information.
701     SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
702     SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
703         snapshotDir, snapshot);
704     HTableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
705     TableName tableName = TableName.valueOf(reqSnapshot.getTable());
706 
707     // stop tracking "abandoned" handlers
708     cleanupSentinels();
709 
710     // Verify snapshot validity
711     SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
712 
713     // Execute the restore/clone operation
714     if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) {
715       if (master.getTableStateManager().isTableState(
716           TableName.valueOf(snapshot.getTable()), TableState.State.ENABLED)) {
717         throw new UnsupportedOperationException("Table '" +
718             TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
719             "perform a restore operation" +
720             ".");
721       }
722 
723       // call coproc pre hook
724       if (cpHost != null) {
725         cpHost.preRestoreSnapshot(reqSnapshot, snapshotTableDesc);
726       }
727       try {
728         // Table already exist. Check and update the region quota for this table namespace
729         checkAndUpdateNamespaceRegionQuota(manifest, tableName);
730         restoreSnapshot(snapshot, snapshotTableDesc);
731       } catch (IOException e) {
732         this.master.getMasterQuotaManager().removeTableFromNamespaceQuota(tableName);
733         LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
734             + " as table " + tableName.getNameAsString(), e);
735         throw e;
736       }
737       LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
738 
739       if (cpHost != null) {
740         cpHost.postRestoreSnapshot(reqSnapshot, snapshotTableDesc);
741       }
742     } else {
743       HTableDescriptor htd = new HTableDescriptor(tableName, snapshotTableDesc);
744       if (cpHost != null) {
745         cpHost.preCloneSnapshot(reqSnapshot, htd);
746       }
747       try {
748         checkAndUpdateNamespaceQuota(manifest, tableName);
749         cloneSnapshot(snapshot, htd);
750       } catch (IOException e) {
751         this.master.getMasterQuotaManager().removeTableFromNamespaceQuota(tableName);
752         LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
753             + " as table " + tableName.getNameAsString(), e);
754         throw e;
755       }
756       LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
757 
758       if (cpHost != null) {
759         cpHost.postCloneSnapshot(reqSnapshot, htd);
760       }
761     }
762   }
763 
764   private void checkAndUpdateNamespaceQuota(SnapshotManifest manifest, TableName tableName)
765       throws IOException {
766     if (this.master.getMasterQuotaManager().isQuotaEnabled()) {
767       this.master.getMasterQuotaManager().checkNamespaceTableAndRegionQuota(tableName,
768         manifest.getRegionManifestsMap().size());
769     }
770   }
771 
772   private void checkAndUpdateNamespaceRegionQuota(SnapshotManifest manifest, TableName tableName)
773       throws IOException {
774     if (this.master.getMasterQuotaManager().isQuotaEnabled()) {
775       this.master.getMasterQuotaManager().checkAndUpdateNamespaceRegionQuota(tableName,
776         manifest.getRegionManifestsMap().size());
777     }
778   }
779 
780   /**
781    * Restore the specified snapshot.
782    * The restore will fail if the destination table has a snapshot or restore in progress.
783    *
784    * @param snapshot Snapshot Descriptor
785    * @param hTableDescriptor Table Descriptor
786    */
787   private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
788       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
789     TableName tableName = hTableDescriptor.getTableName();
790 
791     // make sure we aren't running a snapshot on the same table
792     if (isTakingSnapshot(tableName)) {
793       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
794     }
795 
796     // make sure we aren't running a restore on the same table
797     if (isRestoringTable(tableName)) {
798       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
799     }
800 
801     try {
802       RestoreSnapshotHandler handler =
803         new RestoreSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
804       this.executorService.submit(handler);
805       restoreHandlers.put(tableName, handler);
806     } catch (Exception e) {
807       String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
808           snapshot)  +
809           " on table=" + tableName;
810       LOG.error(msg, e);
811       throw new RestoreSnapshotException(msg, e);
812     }
813   }
814 
815   /**
816    * Verify if the restore of the specified table is in progress.
817    *
818    * @param tableName table under restore
819    * @return <tt>true</tt> if there is a restore in progress of the specified table.
820    */
821   private synchronized boolean isRestoringTable(final TableName tableName) {
822     SnapshotSentinel sentinel = this.restoreHandlers.get(tableName);
823     return(sentinel != null && !sentinel.isFinished());
824   }
825 
826   /**
827    * Returns the status of a restore operation.
828    * If the in-progress restore is failed throws the exception that caused the failure.
829    *
830    * @param snapshot
831    * @return false if in progress, true if restore is completed or not requested.
832    * @throws IOException if there was a failure during the restore
833    */
834   public boolean isRestoreDone(final SnapshotDescription snapshot) throws IOException {
835     // check to see if the sentinel exists,
836     // and if the task is complete removes it from the in-progress restore map.
837     SnapshotSentinel sentinel = removeSentinelIfFinished(this.restoreHandlers, snapshot);
838 
839     // stop tracking "abandoned" handlers
840     cleanupSentinels();
841 
842     if (sentinel == null) {
843       // there is no sentinel so restore is not in progress.
844       return true;
845     }
846 
847     LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
848         + sentinel.getSnapshot().getName() + " table=" +
849         TableName.valueOf(snapshot.getTable()));
850 
851     // If the restore is failed, rethrow the exception
852     sentinel.rethrowExceptionIfFailed();
853 
854     // check to see if we are done
855     if (sentinel.isFinished()) {
856       LOG.debug("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
857           " has completed. Notifying the client.");
858       return true;
859     }
860 
861     if (LOG.isDebugEnabled()) {
862       LOG.debug("Sentinel is not yet finished with restoring snapshot=" +
863           ClientSnapshotDescriptionUtils.toString(snapshot));
864     }
865     return false;
866   }
867 
868   /**
869    * Return the handler if it is currently live and has the same snapshot target name.
870    * The handler is removed from the sentinels map if completed.
871    * @param sentinels live handlers
872    * @param snapshot snapshot description
873    * @return null if doesn't match, else a live handler.
874    */
875   private synchronized SnapshotSentinel removeSentinelIfFinished(
876       final Map<TableName, SnapshotSentinel> sentinels,
877       final SnapshotDescription snapshot) {
878     if (!snapshot.hasTable()) {
879       return null;
880     }
881 
882     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
883     SnapshotSentinel h = sentinels.get(snapshotTable);
884     if (h == null) {
885       return null;
886     }
887 
888     if (!h.getSnapshot().getName().equals(snapshot.getName())) {
889       // specified snapshot is to the one currently running
890       return null;
891     }
892 
893     // Remove from the "in-progress" list once completed
894     if (h.isFinished()) {
895       sentinels.remove(snapshotTable);
896     }
897 
898     return h;
899   }
900 
901   /**
902    * Removes "abandoned" snapshot/restore requests.
903    * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
904    * and the in-progress maps are cleaned up when the status of a completed task is requested.
905    * To avoid having sentinels staying around for long time if something client side is failed,
906    * each operation tries to clean up the in-progress maps sentinels finished from a long time.
907    */
908   private void cleanupSentinels() {
909     cleanupSentinels(this.snapshotHandlers);
910     cleanupSentinels(this.restoreHandlers);
911   }
912 
913   /**
914    * Remove the sentinels that are marked as finished and the completion time
915    * has exceeded the removal timeout.
916    * @param sentinels map of sentinels to clean
917    */
918   private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
919     long currentTime = EnvironmentEdgeManager.currentTime();
920     Iterator<Map.Entry<TableName, SnapshotSentinel>> it =
921         sentinels.entrySet().iterator();
922     while (it.hasNext()) {
923       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
924       SnapshotSentinel sentinel = entry.getValue();
925       if (sentinel.isFinished() &&
926           (currentTime - sentinel.getCompletionTimestamp()) > SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT)
927       {
928         it.remove();
929       }
930     }
931   }
932 
933   //
934   // Implementing Stoppable interface
935   //
936 
937   @Override
938   public void stop(String why) {
939     // short circuit
940     if (this.stopped) return;
941     // make sure we get stop
942     this.stopped = true;
943     // pass the stop onto take snapshot handlers
944     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
945       snapshotHandler.cancel(why);
946     }
947 
948     // pass the stop onto all the restore handlers
949     for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
950       restoreHandler.cancel(why);
951     }
952     try {
953       if (coordinator != null) {
954         coordinator.close();
955       }
956     } catch (IOException e) {
957       LOG.error("stop ProcedureCoordinator error", e);
958     }
959   }
960 
961   @Override
962   public boolean isStopped() {
963     return this.stopped;
964   }
965 
966   /**
967    * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
968    * Called at the beginning of snapshot() and restoreSnapshot() methods.
969    * @throws UnsupportedOperationException if snapshot are not supported
970    */
971   public void checkSnapshotSupport() throws UnsupportedOperationException {
972     if (!this.isSnapshotSupported) {
973       throw new UnsupportedOperationException(
974         "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
975           HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
976     }
977   }
978 
979   /**
980    * Called at startup, to verify if snapshot operation is supported, and to avoid
981    * starting the master if there're snapshots present but the cleaners needed are missing.
982    * Otherwise we can end up with snapshot data loss.
983    * @param conf The {@link Configuration} object to use
984    * @param mfs The MasterFileSystem to use
985    * @throws IOException in case of file-system operation failure
986    * @throws UnsupportedOperationException in case cleaners are missing and
987    *         there're snapshot in the system
988    */
989   private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
990       throws IOException, UnsupportedOperationException {
991     // Verify if snapshot is disabled by the user
992     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
993     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
994     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
995 
996     // Extract cleaners from conf
997     Set<String> hfileCleaners = new HashSet<String>();
998     String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
999     if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1000 
1001     Set<String> logCleaners = new HashSet<String>();
1002     cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1003     if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1004 
1005     // check if an older version of snapshot directory was present
1006     Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1007     FileSystem fs = mfs.getFileSystem();
1008     List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
1009     if (ss != null && !ss.isEmpty()) {
1010       LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1011       LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1012     }
1013 
1014     // If the user has enabled the snapshot, we force the cleaners to be present
1015     // otherwise we still need to check if cleaners are enabled or not and verify
1016     // that there're no snapshot in the .snapshot folder.
1017     if (snapshotEnabled) {
1018       // Inject snapshot cleaners, if snapshot.enable is true
1019       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1020       hfileCleaners.add(HFileLinkCleaner.class.getName());
1021 
1022       // Set cleaners conf
1023       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1024         hfileCleaners.toArray(new String[hfileCleaners.size()]));
1025       conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1026         logCleaners.toArray(new String[logCleaners.size()]));
1027     } else {
1028       // Verify if cleaners are present
1029       snapshotEnabled =
1030         hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
1031         hfileCleaners.contains(HFileLinkCleaner.class.getName());
1032 
1033       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1034       if (snapshotEnabled) {
1035         LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
1036           "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
1037           (userDisabled ? "is set to 'false'." : "is not set."));
1038       }
1039     }
1040 
1041     // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1042     this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1043 
1044     // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1045     // otherwise we end up with snapshot data loss.
1046     if (!snapshotEnabled) {
1047       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1048       Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1049       if (fs.exists(snapshotDir)) {
1050         FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
1051           new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1052         if (snapshots != null) {
1053           LOG.error("Snapshots are present, but cleaners are not enabled.");
1054           checkSnapshotSupport();
1055         }
1056       }
1057     }
1058   }
1059 
1060   @Override
1061   public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1062       IOException, UnsupportedOperationException {
1063     this.master = master;
1064 
1065     this.rootDir = master.getMasterFileSystem().getRootDir();
1066     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1067 
1068     // get the configuration for the coordinator
1069     Configuration conf = master.getConfiguration();
1070     long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1071     long timeoutMillis = Math.max(conf.getLong(SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_KEY,
1072                     SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT),
1073             conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1074                     SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1075     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1076 
1077     // setup the default procedure coordinator
1078     String name = master.getServerName().toString();
1079     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1080     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
1081         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1082 
1083     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1084     this.executorService = master.getExecutorService();
1085     resetTempDir();
1086   }
1087 
1088   @Override
1089   public String getProcedureSignature() {
1090     return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1091   }
1092 
1093   @Override
1094   public void execProcedure(ProcedureDescription desc) throws IOException {
1095     takeSnapshot(toSnapshotDescription(desc));
1096   }
1097 
1098   @Override
1099   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1100     return isSnapshotDone(toSnapshotDescription(desc));
1101   }
1102 
1103   private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1104       throws IOException {
1105     SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1106     if (!desc.hasInstance()) {
1107       throw new IOException("Snapshot name is not defined: " + desc.toString());
1108     }
1109     String snapshotName = desc.getInstance();
1110     List<NameStringPair> props = desc.getConfigurationList();
1111     String table = null;
1112     for (NameStringPair prop : props) {
1113       if ("table".equalsIgnoreCase(prop.getName())) {
1114         table = prop.getValue();
1115       }
1116     }
1117     if (table == null) {
1118       throw new IOException("Snapshot table is not defined: " + desc.toString());
1119     }
1120     TableName tableName = TableName.valueOf(table);
1121     builder.setTable(tableName.getNameAsString());
1122     builder.setName(snapshotName);
1123     builder.setType(SnapshotDescription.Type.FLUSH);
1124     return builder.build();
1125   }
1126 }