View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master.snapshot;
19  
20  import java.io.FileNotFoundException;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.ScheduledExecutorService;
32  import java.util.concurrent.ScheduledFuture;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.locks.ReadWriteLock;
36  import java.util.concurrent.locks.ReentrantReadWriteLock;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.classification.InterfaceStability;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FSDataInputStream;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.hbase.TableName;
48  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HTableDescriptor;
51  import org.apache.hadoop.hbase.Stoppable;
52  import org.apache.hadoop.hbase.MetaTableAccessor;
53  import org.apache.hadoop.hbase.errorhandling.ForeignException;
54  import org.apache.hadoop.hbase.executor.ExecutorService;
55  import org.apache.hadoop.hbase.ipc.RpcServer;
56  import org.apache.hadoop.hbase.master.AssignmentManager;
57  import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
58  import org.apache.hadoop.hbase.master.MasterFileSystem;
59  import org.apache.hadoop.hbase.master.MasterServices;
60  import org.apache.hadoop.hbase.master.MetricsMaster;
61  import org.apache.hadoop.hbase.master.SnapshotSentinel;
62  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
63  import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
64  import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
65  import org.apache.hadoop.hbase.procedure.Procedure;
66  import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
67  import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
68  import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
69  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
70  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
71  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
72  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
73  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
74  import org.apache.hadoop.hbase.quotas.QuotaExceededException;
75  import org.apache.hadoop.hbase.security.AccessDeniedException;
76  import org.apache.hadoop.hbase.security.User;
77  import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
78  import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
79  import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
80  import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
81  import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
82  import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
83  import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
84  import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
85  import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
86  import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
87  import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
88  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
89  import org.apache.hadoop.hbase.util.FSUtils;
90  import org.apache.zookeeper.KeeperException;
91  
92  import com.google.common.annotations.VisibleForTesting;
93  import com.google.common.util.concurrent.ThreadFactoryBuilder;
94  
95  /**
96   * This class manages the procedure of taking and restoring snapshots. There is only one
97   * SnapshotManager for the master.
98   * <p>
99   * The class provides methods for monitoring in-progress snapshot actions.
100  * <p>
101  * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
102  * simplification in the current implementation.
103  */
104 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
105 @InterfaceStability.Unstable
106 public class SnapshotManager extends MasterProcedureManager implements Stoppable {
  /** Logger shared by all instances of this class. */
  private static final Log LOG = LogFactory.getLog(SnapshotManager.class);

  /** By default, check to see if the snapshot is complete every 500 milliseconds. */
  private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;

  /**
   * Conf key for the wait time before removing a finished sentinel from the in-progress map.
   * <p>
   * NOTE: This is used as a safety auto cleanup.
   * The snapshot and restore handler map entries are removed when a user asks whether a snapshot
   * or restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
   * In case something fails on the client side and the snapshot/restore state is not reclaimed
   * after a default timeout, the entry is removed from the in-progress map.
   * At this point, if the user asks for the snapshot/restore status, the result will be
   * "snapshot done" if it exists or "failed" if it doesn't exist.
   */
  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
      "hbase.snapshot.sentinels.cleanup.timeoutMillis";
  /** Default sentinel cleanup timeout: 60 seconds, expressed in milliseconds. */
  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;

  /** Conf key to enable or disable snapshot support. */
  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

  /**
   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for
   * completion.
   */
  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

  /** Name of the operation to use in the controller */
  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

  /** Conf key for # of threads used by the SnapshotManager thread pool */
  private static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

  /**
   * Number of concurrent operations running on the master. Presumably the default used with
   * {@link #SNAPSHOT_POOL_THREADS_KEY}; the place where it is read is not part of this section.
   */
  private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
144 
  // Stop flag; presumably backs the Stoppable contract (the accessors are outside this section).
  private boolean stopped;
  private MasterServices master;  // Needed by TableEventHandlers
  // Procedure coordinator; isSnapshotDone() queries it for procedure status after a failure.
  private ProcedureCoordinator coordinator;

  // Is snapshot feature enabled?
  private boolean isSnapshotSupported = false;

  // Snapshot handlers map, with table name as key.
  // The map is always accessed and modified under the object lock using synchronized.
  // snapshotTable() will insert a handler in the map.
  // isSnapshotDone() will remove the requested handler if the operation is finished.
  private final Map<TableName, SnapshotSentinel> snapshotHandlers =
      new HashMap<TableName, SnapshotSentinel>();
  // Single-threaded scheduler, backed by a daemon thread, that runs the sentinel cleanup task.
  private final ScheduledExecutorService scheduleThreadPool =
        Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
              .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
  // Handle of the periodic cleanup task created by initSnapshotHandlerChoreCleanerTask().
  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;

  // Restore sentinels map, with table name as key.
  // The map is always accessed and modified under the object lock using synchronized.
  // restoreSnapshot()/cloneSnapshot() will insert a handler in the map.
  // isRestoreDone() will remove the requested handler if the operation is finished.
  private Map<TableName, SnapshotSentinel> restoreHandlers =
      new HashMap<TableName, SnapshotSentinel>();

  // Root directory under which the completed and working snapshot directories are resolved.
  private Path rootDir;
  // Executor to which snapshotTable() submits the snapshot handlers.
  private ExecutorService executorService;

  /**
   * Read-write lock between taking a snapshot and the snapshot HFile cleaner. The cleaner should
   * skip checking the HFiles if any snapshot is in progress, otherwise it may clean an HFile which
   * belongs to the snapshot being created. So the cleaner should grab the write lock first when it
   * starts to work, while takeSnapshot() holds the read lock for the duration of its call.
   * The lock is created in fair mode. (See HBASE-21387)
   */
  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
180 
181   public SnapshotManager() {}
182 
183   /**
184    * Fully specify all necessary components of a snapshot manager. Exposed for testing.
185    * @param master services for the master where the manager is running
186    * @param coordinator procedure coordinator instance.  exposed for testing.
187    * @param pool HBase ExecutorServcie instance, exposed for testing.
188    */
189   @VisibleForTesting
190   SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
191       ExecutorService pool, int sentinelCleanInterval)
192       throws IOException, UnsupportedOperationException {
193     this.master = master;
194 
195     this.rootDir = master.getMasterFileSystem().getRootDir();
196     Configuration conf = master.getConfiguration();
197     checkSnapshotSupport(conf, master.getMasterFileSystem());
198 
199     this.coordinator = coordinator;
200     this.executorService = pool;
201     resetTempDir();
202     initSnapshotHandlerChoreCleanerTask(sentinelCleanInterval);
203   }
204 
205   private void initSnapshotHandlerChoreCleanerTask(long sentinelCleanInterval) {
206     snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(new Runnable() {
207       @Override
208       public void run() {
209         cleanupSentinels();
210       }
211     }, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
212   }
213 
214   /**
215    * Gets the list of all completed snapshots.
216    * @return list of SnapshotDescriptions
217    * @throws IOException File system exception
218    */
219   public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
220     return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir));
221   }
222 
223   /**
224    * Gets the list of all completed snapshots.
225    * @param snapshotDir snapshot directory
226    * @return list of SnapshotDescriptions
227    * @throws IOException File system exception
228    */
229   private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir) throws IOException {
230     List<SnapshotDescription> snapshotDescs = new ArrayList<SnapshotDescription>();
231     // first create the snapshot root path and check to see if it exists
232     FileSystem fs = master.getMasterFileSystem().getFileSystem();
233     if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
234 
235     // if there are no snapshots, return an empty list
236     if (!fs.exists(snapshotDir)) {
237       return snapshotDescs;
238     }
239 
240     // ignore all the snapshots in progress
241     FileStatus[] snapshots = fs.listStatus(snapshotDir,
242       new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
243     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
244     // loop through all the completed snapshots
245     for (FileStatus snapshot : snapshots) {
246       Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
247       // if the snapshot is bad
248       if (!fs.exists(info)) {
249         LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
250         continue;
251       }
252       FSDataInputStream in = null;
253       try {
254         in = fs.open(info);
255         SnapshotDescription desc = SnapshotDescription.parseFrom(in);
256         if (cpHost != null) {
257           try {
258             cpHost.preListSnapshot(desc);
259           } catch (AccessDeniedException e) {
260             LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
261                 + "Either you should be owner of this snapshot or admin user.");
262             // Skip this and try for next snapshot
263             continue;
264           }
265         }
266         snapshotDescs.add(desc);
267 
268         // call coproc post hook
269         if (cpHost != null) {
270           cpHost.postListSnapshot(desc);
271         }
272       } catch (IOException e) {
273         LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
274       } finally {
275         if (in != null) {
276           in.close();
277         }
278       }
279     }
280     return snapshotDescs;
281   }
282 
283   /**
284    * Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
285    * snapshot attempts.
286    *
287    * @throws IOException if we can't reach the filesystem
288    */
289   private void resetTempDir() throws IOException {
290     // cleanup any existing snapshots.
291     Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir);
292     if (master.getMasterFileSystem().getFileSystem().exists(tmpdir)) {
293       if (!master.getMasterFileSystem().getFileSystem().delete(tmpdir, true)) {
294         LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
295       }
296     }
297   }
298 
299   /**
300    * Delete the specified snapshot
301    * @param snapshot
302    * @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
303    * @throws IOException For filesystem IOExceptions
304    */
305   public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
306     // check to see if it is completed
307     if (!isSnapshotCompleted(snapshot)) {
308       throw new SnapshotDoesNotExistException(snapshot);
309     }
310 
311     String snapshotName = snapshot.getName();
312     // first create the snapshot description and check to see if it exists
313     FileSystem fs = master.getMasterFileSystem().getFileSystem();
314     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
315     // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
316     // just the "name" and it does not contains the "real" snapshot information
317     snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
318 
319     // call coproc pre hook
320     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
321     if (cpHost != null) {
322       cpHost.preDeleteSnapshot(snapshot);
323     }
324 
325     LOG.debug("Deleting snapshot: " + snapshotName);
326     // delete the existing snapshot
327     if (!fs.delete(snapshotDir, true)) {
328       throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
329     }
330 
331     // call coproc post hook
332     if (cpHost != null) {
333       cpHost.postDeleteSnapshot(snapshot);
334     }
335 
336   }
337 
338   /**
339    * Check if the specified snapshot is done
340    *
341    * @param expected
342    * @return true if snapshot is ready to be restored, false if it is still being taken.
343    * @throws IOException IOException if error from HDFS or RPC
344    * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
345    */
346   public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
347     // check the request to make sure it has a snapshot
348     if (expected == null) {
349       throw new UnknownSnapshotException(
350          "No snapshot name passed in request, can't figure out which snapshot you want to check.");
351     }
352 
353     String ssString = ClientSnapshotDescriptionUtils.toString(expected);
354 
355     // check to see if the sentinel exists,
356     // and if the task is complete removes it from the in-progress snapshots map.
357     SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
358 
359     // stop tracking "abandoned" handlers
360     cleanupSentinels();
361 
362     if (handler == null) {
363       // If there's no handler in the in-progress map, it means one of the following:
364       //   - someone has already requested the snapshot state
365       //   - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
366       //   - the snapshot was never requested
367       // In those cases returns to the user the "done state" if the snapshots exists on disk,
368       // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
369       if (!isSnapshotCompleted(expected)) {
370         throw new UnknownSnapshotException("Snapshot " + ssString
371             + " is not currently running or one of the known completed snapshots.");
372       }
373       // was done, return true;
374       return true;
375     }
376 
377     // pass on any failure we find in the sentinel
378     try {
379       handler.rethrowExceptionIfFailed();
380     } catch (ForeignException e) {
381       // Give some procedure info on an exception.
382       String status;
383       Procedure p = coordinator.getProcedure(expected.getName());
384       if (p != null) {
385         status = p.getStatus();
386       } else {
387         status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
388       }
389       throw new HBaseSnapshotException("Snapshot " + ssString +  " had an error.  " + status, e,
390           expected);
391     }
392 
393     // check to see if we are done
394     if (handler.isFinished()) {
395       LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
396       return true;
397     } else if (LOG.isDebugEnabled()) {
398       LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
399     }
400     return false;
401   }
402 
403   /**
404    * Check to see if there is a snapshot in progress with the same name or on the same table.
405    * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
406    * don't allow snapshot with the same name.
407    * @param snapshot description of the snapshot being checked.
408    * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
409    *         table.
410    */
411   synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
412     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
413     if (isTakingSnapshot(snapshotTable)) {
414       return true;
415     }
416     Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
417     while (it.hasNext()) {
418       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
419       SnapshotSentinel sentinel = entry.getValue();
420       if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
421         return true;
422       }
423     }
424     return false;
425   }
426 
427   /**
428    * Check to see if the specified table has a snapshot in progress.  Currently we have a
429    * limitation only allowing a single snapshot per table at a time.
430    * @param tableName name of the table being snapshotted.
431    * @return <tt>true</tt> if there is a snapshot in progress on the specified table.
432    */
433   synchronized boolean isTakingSnapshot(final TableName tableName) {
434     SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
435     return handler != null && !handler.isFinished();
436   }
437 
438   /**
439    * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
440    * aren't already running a snapshot or restore on the requested table.
441    * @param snapshot description of the snapshot we want to start
442    * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
443    */
444   private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
445       throws HBaseSnapshotException {
446     FileSystem fs = master.getMasterFileSystem().getFileSystem();
447     Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
448     TableName snapshotTable =
449         TableName.valueOf(snapshot.getTable());
450 
451     // make sure we aren't already running a snapshot
452     if (isTakingSnapshot(snapshot)) {
453       SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
454       throw new SnapshotCreationException("Rejected taking "
455           + ClientSnapshotDescriptionUtils.toString(snapshot)
456           + " because we are already running another snapshot "
457           + (handler != null ? ("on the same table " +
458               ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
459               : "with the same name"), snapshot);
460     }
461 
462     // make sure we aren't running a restore on the same table
463     if (isRestoringTable(snapshotTable)) {
464       SnapshotSentinel handler = restoreHandlers.get(snapshotTable);
465       throw new SnapshotCreationException("Rejected taking "
466           + ClientSnapshotDescriptionUtils.toString(snapshot)
467           + " because we are already have a restore in progress on the same snapshot "
468           + ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()), snapshot);
469     }
470 
471     try {
472       // delete the working directory, since we aren't running the snapshot. Likely leftovers
473       // from a failed attempt.
474       fs.delete(workingDir, true);
475 
476       // recreate the working directory for the snapshot
477       if (!fs.mkdirs(workingDir)) {
478         throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
479             + ") for snapshot" , snapshot);
480       }
481     } catch (HBaseSnapshotException e) {
482       throw e;
483     } catch (IOException e) {
484       throw new SnapshotCreationException(
485           "Exception while checking to see if snapshot could be started.", e, snapshot);
486     }
487   }
488 
489   /**
490    * Take a snapshot of a disabled table.
491    * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
492    * @throws HBaseSnapshotException if the snapshot could not be started
493    */
494   private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
495       throws HBaseSnapshotException {
496     // setup the snapshot
497     prepareToTakeSnapshot(snapshot);
498 
499     // set the snapshot to be a disabled snapshot, since the client doesn't know about that
500     snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
501 
502     // Take the snapshot of the disabled table
503     DisabledTableSnapshotHandler handler =
504         new DisabledTableSnapshotHandler(snapshot, master, this);
505     snapshotTable(snapshot, handler);
506   }
507 
508   /**
509    * Take a snapshot of an enabled table.
510    * @param snapshot description of the snapshot to take.
511    * @throws HBaseSnapshotException if the snapshot could not be started
512    */
513   private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
514       throws HBaseSnapshotException {
515     // setup the snapshot
516     prepareToTakeSnapshot(snapshot);
517 
518     // Take the snapshot of the enabled table
519     EnabledTableSnapshotHandler handler =
520         new EnabledTableSnapshotHandler(snapshot, master, this);
521     snapshotTable(snapshot, handler);
522   }
523 
524   /**
525    * Take a snapshot using the specified handler.
526    * On failure the snapshot temporary working directory is removed.
527    * NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
528    *       snapshot request if the table is busy with another snapshot/restore operation.
529    * @param snapshot the snapshot description
530    * @param handler the snapshot handler
531    */
532   private synchronized void snapshotTable(SnapshotDescription snapshot,
533       final TakeSnapshotHandler handler) throws HBaseSnapshotException {
534     try {
535       handler.prepare();
536       this.executorService.submit(handler);
537       this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
538     } catch (Exception e) {
539       // cleanup the working directory by trying to delete it from the fs.
540       Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
541       try {
542         if (!this.master.getMasterFileSystem().getFileSystem().delete(workingDir, true)) {
543           LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
544               ClientSnapshotDescriptionUtils.toString(snapshot));
545         }
546       } catch (IOException e1) {
547         LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
548             ClientSnapshotDescriptionUtils.toString(snapshot));
549       }
550       // fail the snapshot
551       throw new SnapshotCreationException("Could not build snapshot handler", e, snapshot);
552     }
553   }
554 
555   /**
556    * Take a snapshot based on the enabled/disabled state of the table.
557    * @param snapshot
558    * @throws HBaseSnapshotException when a snapshot specific exception occurs.
559    * @throws IOException when some sort of generic IO exception occurs.
560    */
561   public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
562     this.takingSnapshotLock.readLock().lock();
563     try {
564       takeSnapshotInternal(snapshot);
565     } finally {
566       this.takingSnapshotLock.readLock().unlock();
567     }
568   }
569 
570   private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
571     // check to see if we already completed the snapshot
572     if (isSnapshotCompleted(snapshot)) {
573       throw new SnapshotExistsException("Snapshot '" + snapshot.getName()
574           + "' already stored on the filesystem.", snapshot);
575     }
576 
577     LOG.debug("No existing snapshot, attempting snapshot...");
578 
579     // stop tracking "abandoned" handlers
580     cleanupSentinels();
581 
582     // check to see if the table exists
583     HTableDescriptor desc = null;
584     try {
585       desc = master.getTableDescriptors().get(
586           TableName.valueOf(snapshot.getTable()));
587     } catch (FileNotFoundException e) {
588       String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
589       LOG.error(msg);
590       throw new SnapshotCreationException(msg, e, snapshot);
591     } catch (IOException e) {
592       throw new SnapshotCreationException("Error while geting table description for table "
593           + snapshot.getTable(), e, snapshot);
594     }
595     if (desc == null) {
596       throw new SnapshotCreationException("Table '" + snapshot.getTable()
597           + "' doesn't exist, can't take snapshot.", snapshot);
598     }
599     SnapshotDescription.Builder builder = snapshot.toBuilder();
600     // if not specified, set the snapshot format
601     if (!snapshot.hasVersion()) {
602       builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
603     }
604     User user = RpcServer.getRequestUser();
605     if (User.isHBaseSecurityEnabled(master.getConfiguration()) && user != null) {
606       builder.setOwner(user.getShortName());
607     }
608     snapshot = builder.build();
609 
610     // call pre coproc hook
611     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
612     if (cpHost != null) {
613       cpHost.preSnapshot(snapshot, desc);
614     }
615 
616     // if the table is enabled, then have the RS run actually the snapshot work
617     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
618     AssignmentManager assignmentMgr = master.getAssignmentManager();
619     if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
620         ZooKeeperProtos.Table.State.ENABLED)) {
621       LOG.debug("Table enabled, starting distributed snapshot.");
622       snapshotEnabledTable(snapshot);
623       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
624     }
625     // For disabled table, snapshot is created by the master
626     else if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
627         ZooKeeperProtos.Table.State.DISABLED)) {
628       LOG.debug("Table is disabled, running snapshot entirely on master.");
629       snapshotDisabledTable(snapshot);
630       LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
631     } else {
632       LOG.error("Can't snapshot table '" + snapshot.getTable()
633           + "', isn't open or closed, we don't know what to do!");
634       TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
635           + " isn't fully open.");
636       throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, snapshot);
637     }
638 
639     // call post coproc hook
640     if (cpHost != null) {
641       cpHost.postSnapshot(snapshot, desc);
642     }
643   }
644 
645   public ReadWriteLock getTakingSnapshotLock() {
646     return this.takingSnapshotLock;
647   }
648 
649   /**
650    * The snapshot operation processing as following: <br>
651    * 1. Create a Snapshot Handler, and do some initialization; <br>
652    * 2. Put the handler into snapshotHandlers <br>
653    * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
654    * and snapshotHandlers;
655    * @return true to indicate that there're some running snapshots.
656    */
657   public synchronized boolean isTakingAnySnapshot() {
658     return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0;
659   }
660 
661   /**
662    * Set the handler for the current snapshot
663    * <p>
664    * Exposed for TESTING
665    * @param tableName
666    * @param handler handler the master should use
667    *
668    * TODO get rid of this if possible, repackaging, modify tests.
669    */
670   public synchronized void setSnapshotHandlerForTesting(
671       final TableName tableName,
672       final SnapshotSentinel handler) {
673     if (handler != null) {
674       this.snapshotHandlers.put(tableName, handler);
675     } else {
676       this.snapshotHandlers.remove(tableName);
677     }
678   }
679 
680   /**
681    * @return distributed commit coordinator for all running snapshots
682    */
683   ProcedureCoordinator getCoordinator() {
684     return coordinator;
685   }
686 
687   /**
688    * Check to see if the snapshot is one of the currently completed snapshots
689    * Returns true if the snapshot exists in the "completed snapshots folder".
690    *
691    * @param snapshot expected snapshot to check
692    * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
693    *         not stored
694    * @throws IOException if the filesystem throws an unexpected exception,
695    * @throws IllegalArgumentException if snapshot name is invalid.
696    */
697   private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
698     try {
699       final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
700       FileSystem fs = master.getMasterFileSystem().getFileSystem();
701       // check to see if the snapshot already exists
702       return fs.exists(snapshotDir);
703     } catch (IllegalArgumentException iae) {
704       throw new UnknownSnapshotException("Unexpected exception thrown", iae);
705     }
706   }
707 
708   /**
709    * Clone the specified snapshot into a new table.
710    * The operation will fail if the destination table has a snapshot or restore in progress.
711    *
712    * @param snapshot Snapshot Descriptor
713    * @param hTableDescriptor Table Descriptor of the table to create
714    */
715   synchronized void cloneSnapshot(final SnapshotDescription snapshot,
716       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
717     TableName tableName = hTableDescriptor.getTableName();
718 
719     // make sure we aren't running a snapshot on the same table
720     if (isTakingSnapshot(tableName)) {
721       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
722     }
723 
724     // make sure we aren't running a restore on the same table
725     if (isRestoringTable(tableName)) {
726       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
727     }
728 
729     try {
730       CloneSnapshotHandler handler =
731         new CloneSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
732       this.executorService.submit(handler);
733       this.restoreHandlers.put(tableName, handler);
734     } catch (Exception e) {
735       String msg = "Couldn't clone the snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
736         " on table=" + tableName;
737       LOG.error(msg, e);
738       throw new RestoreSnapshotException(msg, e);
739     }
740   }
741 
742   /**
743    * Restore the specified snapshot
744    * @param reqSnapshot
745    * @throws IOException
746    */
747   public void restoreSnapshot(SnapshotDescription reqSnapshot) throws IOException {
748     FileSystem fs = master.getMasterFileSystem().getFileSystem();
749     Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
750     MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
751 
752     // check if the snapshot exists
753     if (!fs.exists(snapshotDir)) {
754       LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
755       throw new SnapshotDoesNotExistException(reqSnapshot);
756     }
757 
758     // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
759     // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
760     // information.
761     SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
762     SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
763         snapshotDir, snapshot);
764     HTableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
765     TableName tableName = TableName.valueOf(reqSnapshot.getTable());
766 
767     // stop tracking "abandoned" handlers
768     cleanupSentinels();
769 
770     // Verify snapshot validity
771     SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
772 
773     // Execute the restore/clone operation
774     if (MetaTableAccessor.tableExists(master.getConnection(), tableName)) {
775       if (master.getAssignmentManager().getTableStateManager().isTableState(
776           TableName.valueOf(snapshot.getTable()), ZooKeeperProtos.Table.State.ENABLED)) {
777         throw new UnsupportedOperationException("Table '" +
778             TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
779             "perform a restore operation" +
780             ".");
781       }
782 
783       // call coproc pre hook
784       if (cpHost != null) {
785         cpHost.preRestoreSnapshot(snapshot, snapshotTableDesc);
786       }
787 
788       int tableRegionCount = -1;
789       try {
790         // Table already exist. Check and update the region quota for this table namespace.
791         // The region quota may not be updated correctly if there are concurrent restore snapshot
792         // requests for the same table
793 
794         tableRegionCount = getRegionCountOfTable(tableName);
795         int snapshotRegionCount = manifest.getRegionManifestsMap().size();
796 
797         // Update region quota when snapshotRegionCount is larger. If we updated the region count
798         // to a smaller value before retoreSnapshot and the retoreSnapshot fails, we may fail to
799         // reset the region count to its original value if the region quota is consumed by other
800         // tables in the namespace
801         if (tableRegionCount > 0 && tableRegionCount < snapshotRegionCount) {
802           checkAndUpdateNamespaceRegionQuota(snapshotRegionCount, tableName);
803         }
804         restoreSnapshot(snapshot, snapshotTableDesc);
805         // Update the region quota if snapshotRegionCount is smaller. This step should not fail
806         // because we have reserved enough region quota before hand
807         if (tableRegionCount > 0 && tableRegionCount > snapshotRegionCount) {
808           checkAndUpdateNamespaceRegionQuota(snapshotRegionCount, tableName);
809         }
810       } catch (QuotaExceededException e) {
811         LOG.error("Region quota exceeded while restoring the snapshot " + snapshot.getName()
812           + " as table " + tableName.getNameAsString(), e);
813         // If QEE is thrown before restoreSnapshot, quota information is not updated, so we
814         // should throw the exception directly. If QEE is thrown after restoreSnapshot, there
815         // must be unexpected reasons, we also throw the exception directly
816         throw e;
817       } catch (IOException e) {
818         if (tableRegionCount > 0) {
819           // reset the region count for table
820           checkAndUpdateNamespaceRegionQuota(tableRegionCount, tableName);
821         }
822         LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
823             + " as table " + tableName.getNameAsString(), e);
824         throw e;
825       }
826       LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
827 
828       if (cpHost != null) {
829         cpHost.postRestoreSnapshot(snapshot, snapshotTableDesc);
830       }
831     } else {
832       HTableDescriptor htd = new HTableDescriptor(tableName, snapshotTableDesc);
833       if (cpHost != null) {
834         cpHost.preCloneSnapshot(snapshot, htd);
835       }
836       try {
837         checkAndUpdateNamespaceQuota(manifest, tableName);
838         cloneSnapshot(snapshot, htd);
839       } catch (IOException e) {
840         this.master.getMasterQuotaManager().removeTableFromNamespaceQuota(tableName);
841         LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
842             + " as table " + tableName.getNameAsString(), e);
843         throw e;
844       }
845       LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
846 
847       if (cpHost != null) {
848         cpHost.postCloneSnapshot(snapshot, htd);
849       }
850     }
851   }
852 
853   private void checkAndUpdateNamespaceQuota(SnapshotManifest manifest, TableName tableName)
854       throws IOException {
855     if (this.master.getMasterQuotaManager().isQuotaInitialized()) {
856       this.master.getMasterQuotaManager().checkNamespaceTableAndRegionQuota(tableName,
857         manifest.getRegionManifestsMap().size());
858     }
859   }
860 
861   private void checkAndUpdateNamespaceRegionQuota(int updatedRegionCount, TableName tableName)
862       throws IOException {
863     if (this.master.getMasterQuotaManager().isQuotaInitialized()) {
864       this.master.getMasterQuotaManager().checkAndUpdateNamespaceRegionQuota(tableName,
865         updatedRegionCount);
866     }
867   }
868 
869   /**
870    * @return cached region count, or -1 if quota manager is disabled or table status not found
871   */
872   private int getRegionCountOfTable(TableName tableName) throws IOException {
873     if (this.master.getMasterQuotaManager().isQuotaInitialized()) {
874       return this.master.getMasterQuotaManager().getRegionCountOfTable(tableName);
875     }
876     return -1;
877   }
878 
879   /**
880    * Restore the specified snapshot.
881    * The restore will fail if the destination table has a snapshot or restore in progress.
882    *
883    * @param snapshot Snapshot Descriptor
884    * @param hTableDescriptor Table Descriptor
885    */
886   private synchronized void restoreSnapshot(final SnapshotDescription snapshot,
887       final HTableDescriptor hTableDescriptor) throws HBaseSnapshotException {
888     TableName tableName = hTableDescriptor.getTableName();
889 
890     // make sure we aren't running a snapshot on the same table
891     if (isTakingSnapshot(tableName)) {
892       throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
893     }
894 
895     // make sure we aren't running a restore on the same table
896     if (isRestoringTable(tableName)) {
897       throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
898     }
899 
900     try {
901       RestoreSnapshotHandler handler =
902         new RestoreSnapshotHandler(master, snapshot, hTableDescriptor).prepare();
903       this.executorService.submit(handler);
904       restoreHandlers.put(tableName, handler);
905     } catch (Exception e) {
906       String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
907           snapshot)  +
908           " on table=" + tableName;
909       LOG.error(msg, e);
910       throw new RestoreSnapshotException(msg, e);
911     }
912   }
913 
914   /**
915    * Verify if the restore of the specified table is in progress.
916    *
917    * @param tableName table under restore
918    * @return <tt>true</tt> if there is a restore in progress of the specified table.
919    */
920   private synchronized boolean isRestoringTable(final TableName tableName) {
921     SnapshotSentinel sentinel = this.restoreHandlers.get(tableName);
922     return(sentinel != null && !sentinel.isFinished());
923   }
924 
925   /**
926    * Returns the status of a restore operation.
927    * If the in-progress restore is failed throws the exception that caused the failure.
928    *
929    * @param snapshot
930    * @return false if in progress, true if restore is completed or not requested.
931    * @throws IOException if there was a failure during the restore
932    */
933   public boolean isRestoreDone(final SnapshotDescription snapshot) throws IOException {
934     // check to see if the sentinel exists,
935     // and if the task is complete removes it from the in-progress restore map.
936     SnapshotSentinel sentinel = removeSentinelIfFinished(this.restoreHandlers, snapshot);
937 
938     // stop tracking "abandoned" handlers
939     cleanupSentinels();
940 
941     if (sentinel == null) {
942       // there is no sentinel so restore is not in progress.
943       return true;
944     }
945 
946     LOG.debug("Verify snapshot=" + snapshot.getName() + " against="
947         + sentinel.getSnapshot().getName() + " table=" +
948         TableName.valueOf(snapshot.getTable()));
949 
950     // If the restore is failed, rethrow the exception
951     sentinel.rethrowExceptionIfFailed();
952 
953     // check to see if we are done
954     if (sentinel.isFinished()) {
955       LOG.debug("Restore snapshot=" + ClientSnapshotDescriptionUtils.toString(snapshot) +
956           " has completed. Notifying the client.");
957       return true;
958     }
959 
960     if (LOG.isDebugEnabled()) {
961       LOG.debug("Sentinel is not yet finished with restoring snapshot=" +
962           ClientSnapshotDescriptionUtils.toString(snapshot));
963     }
964     return false;
965   }
966 
967   /**
968    * Return the handler if it is currently live and has the same snapshot target name.
969    * The handler is removed from the sentinels map if completed.
970    * @param sentinels live handlers
971    * @param snapshot snapshot description
972    * @return null if doesn't match, else a live handler.
973    */
974   private synchronized SnapshotSentinel removeSentinelIfFinished(
975       final Map<TableName, SnapshotSentinel> sentinels,
976       final SnapshotDescription snapshot) {
977     if (!snapshot.hasTable()) {
978       return null;
979     }
980 
981     TableName snapshotTable = TableName.valueOf(snapshot.getTable());
982     SnapshotSentinel h = sentinels.get(snapshotTable);
983     if (h == null) {
984       return null;
985     }
986 
987     if (!h.getSnapshot().getName().equals(snapshot.getName())) {
988       // specified snapshot is to the one currently running
989       return null;
990     }
991 
992     // Remove from the "in-progress" list once completed
993     if (h.isFinished()) {
994       sentinels.remove(snapshotTable);
995     }
996 
997     return h;
998   }
999 
1000   /**
1001    * Removes "abandoned" snapshot/restore requests.
1002    * As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
1003    * and the in-progress maps are cleaned up when the status of a completed task is requested.
1004    * To avoid having sentinels staying around for long time if something client side is failed,
1005    * each operation tries to clean up the in-progress maps sentinels finished from a long time.
1006    */
1007   private void cleanupSentinels() {
1008     cleanupSentinels(this.snapshotHandlers);
1009     cleanupSentinels(this.restoreHandlers);
1010   }
1011 
1012   /**
1013    * Remove the sentinels that are marked as finished and the completion time
1014    * has exceeded the removal timeout.
1015    * @param sentinels map of sentinels to clean
1016    */
1017   private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
1018     long currentTime = EnvironmentEdgeManager.currentTime();
1019     long sentinelsCleanupTimeoutMillis =
1020         master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
1021           SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
1022     Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
1023     while (it.hasNext()) {
1024       Map.Entry<TableName, SnapshotSentinel> entry = it.next();
1025       SnapshotSentinel sentinel = entry.getValue();
1026       if (sentinel.isFinished()
1027           && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) {
1028         it.remove();
1029       }
1030     }
1031   }
1032 
1033   //
1034   // Implementing Stoppable interface
1035   //
1036 
1037   @Override
1038   public void stop(String why) {
1039     // short circuit
1040     if (this.stopped) return;
1041     // make sure we get stop
1042     this.stopped = true;
1043     // pass the stop onto take snapshot handlers
1044     for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
1045       snapshotHandler.cancel(why);
1046     }
1047     if (snapshotHandlerChoreCleanerTask != null) {
1048       snapshotHandlerChoreCleanerTask.cancel(true);
1049     }
1050     // pass the stop onto all the restore handlers
1051     for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
1052       restoreHandler.cancel(why);
1053     }
1054     try {
1055       if (coordinator != null) {
1056         coordinator.close();
1057       }
1058     } catch (IOException e) {
1059       LOG.error("stop ProcedureCoordinator error", e);
1060     }
1061   }
1062 
1063   @Override
1064   public boolean isStopped() {
1065     return this.stopped;
1066   }
1067 
1068   /**
1069    * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
1070    * Called at the beginning of snapshot() and restoreSnapshot() methods.
1071    * @throws UnsupportedOperationException if snapshot are not supported
1072    */
1073   public void checkSnapshotSupport() throws UnsupportedOperationException {
1074     if (!this.isSnapshotSupported) {
1075       throw new UnsupportedOperationException(
1076         "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
1077           HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
1078     }
1079   }
1080 
1081   /**
1082    * Called at startup, to verify if snapshot operation is supported, and to avoid
1083    * starting the master if there're snapshots present but the cleaners needed are missing.
1084    * Otherwise we can end up with snapshot data loss.
1085    * @param conf The {@link Configuration} object to use
1086    * @param mfs The MasterFileSystem to use
1087    * @throws IOException in case of file-system operation failure
1088    * @throws UnsupportedOperationException in case cleaners are missing and
1089    *         there're snapshot in the system
1090    */
1091   private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
1092       throws IOException, UnsupportedOperationException {
1093     // Verify if snapshot is disabled by the user
1094     String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
1095     boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
1096     boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
1097 
1098     // Extract cleaners from conf
1099     Set<String> hfileCleaners = new HashSet<String>();
1100     String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
1101     if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
1102 
1103     Set<String> logCleaners = new HashSet<String>();
1104     cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
1105     if (cleaners != null) Collections.addAll(logCleaners, cleaners);
1106 
1107     // check if an older version of snapshot directory was present
1108     Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
1109     FileSystem fs = mfs.getFileSystem();
1110     List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir));
1111     if (ss != null && !ss.isEmpty()) {
1112       LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
1113       LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
1114     }
1115 
1116     // If the user has enabled the snapshot, we force the cleaners to be present
1117     // otherwise we still need to check if cleaners are enabled or not and verify
1118     // that there're no snapshot in the .snapshot folder.
1119     if (snapshotEnabled) {
1120       // Inject snapshot cleaners, if snapshot.enable is true
1121       hfileCleaners.add(SnapshotHFileCleaner.class.getName());
1122       hfileCleaners.add(HFileLinkCleaner.class.getName());
1123       logCleaners.add(SnapshotLogCleaner.class.getName());
1124 
1125       // Set cleaners conf
1126       conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
1127         hfileCleaners.toArray(new String[hfileCleaners.size()]));
1128       conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
1129         logCleaners.toArray(new String[logCleaners.size()]));
1130     } else {
1131       // Verify if cleaners are present
1132       snapshotEnabled = logCleaners.contains(SnapshotLogCleaner.class.getName()) &&
1133         hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
1134         hfileCleaners.contains(HFileLinkCleaner.class.getName());
1135 
1136       // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
1137       if (snapshotEnabled) {
1138         LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
1139           "but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
1140           (userDisabled ? "is set to 'false'." : "is not set."));
1141       }
1142     }
1143 
1144     // Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
1145     this.isSnapshotSupported = snapshotEnabled && !userDisabled;
1146 
1147     // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
1148     // otherwise we end up with snapshot data loss.
1149     if (!snapshotEnabled) {
1150       LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
1151       Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
1152       if (fs.exists(snapshotDir)) {
1153         FileStatus[] snapshots = FSUtils.listStatus(fs, snapshotDir,
1154           new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
1155         if (snapshots != null) {
1156           LOG.error("Snapshots are present, but cleaners are not enabled.");
1157           checkSnapshotSupport();
1158         }
1159       }
1160     }
1161   }
1162 
1163   @Override
1164   public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
1165       IOException, UnsupportedOperationException {
1166     this.master = master;
1167 
1168     this.rootDir = master.getMasterFileSystem().getRootDir();
1169     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
1170 
1171     // get the configuration for the coordinator
1172     Configuration conf = master.getConfiguration();
1173     long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
1174     long timeoutMillis = Math.max(conf.getLong(SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_KEY,
1175                     SnapshotDescriptionUtils.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT),
1176             conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
1177                     SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
1178     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
1179 
1180     // setup the default procedure coordinator
1181     String name = master.getServerName().toString();
1182     ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
1183     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
1184         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
1185 
1186     this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
1187     this.executorService = master.getExecutorService();
1188     resetTempDir();
1189     initSnapshotHandlerChoreCleanerTask(10);
1190   }
1191 
1192   @Override
1193   public String getProcedureSignature() {
1194     return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
1195   }
1196 
1197   @Override
1198   public void execProcedure(ProcedureDescription desc) throws IOException {
1199     takeSnapshot(toSnapshotDescription(desc));
1200   }
1201 
1202   @Override
1203   public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
1204     return isSnapshotDone(toSnapshotDescription(desc));
1205   }
1206 
1207   private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
1208       throws IOException {
1209     SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
1210     if (!desc.hasInstance()) {
1211       throw new IOException("Snapshot name is not defined: " + desc.toString());
1212     }
1213     String snapshotName = desc.getInstance();
1214     List<NameStringPair> props = desc.getConfigurationList();
1215     String table = null;
1216     for (NameStringPair prop : props) {
1217       if ("table".equalsIgnoreCase(prop.getName())) {
1218         table = prop.getValue();
1219       }
1220     }
1221     if (table == null) {
1222       throw new IOException("Snapshot table is not defined: " + desc.toString());
1223     }
1224     TableName tableName = TableName.valueOf(table);
1225     builder.setTable(tableName.getNameAsString());
1226     builder.setName(snapshotName);
1227     builder.setType(SnapshotDescription.Type.FLUSH);
1228     return builder.build();
1229   }
1230 }