001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.snapshot; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Map; 029import java.util.Optional; 030import java.util.Set; 031import java.util.concurrent.ConcurrentHashMap; 032import java.util.concurrent.Executors; 033import java.util.concurrent.ScheduledExecutorService; 034import java.util.concurrent.ScheduledFuture; 035import java.util.concurrent.ThreadPoolExecutor; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.locks.ReadWriteLock; 038import java.util.concurrent.locks.ReentrantReadWriteLock; 039import java.util.stream.Collectors; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.CommonPathCapabilities; 042import org.apache.hadoop.fs.FSDataInputStream; 043import org.apache.hadoop.fs.FileStatus; 044import org.apache.hadoop.fs.FileSystem; 045import org.apache.hadoop.fs.Path; 046import org.apache.hadoop.fs.permission.AclEntry; 047import org.apache.hadoop.fs.permission.AclStatus; 048import org.apache.hadoop.hbase.HBaseInterfaceAudience; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.ServerName; 051import org.apache.hadoop.hbase.Stoppable; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.client.TableDescriptor; 054import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 055import org.apache.hadoop.hbase.client.TableState; 056import org.apache.hadoop.hbase.errorhandling.ForeignException; 057import org.apache.hadoop.hbase.executor.ExecutorService; 058import org.apache.hadoop.hbase.ipc.RpcServer; 059import org.apache.hadoop.hbase.master.MasterCoprocessorHost; 060import org.apache.hadoop.hbase.master.MasterFileSystem; 061import org.apache.hadoop.hbase.master.MasterServices; 062import org.apache.hadoop.hbase.master.MetricsMaster; 063import org.apache.hadoop.hbase.master.SnapshotSentinel; 064import org.apache.hadoop.hbase.master.WorkerAssigner; 065import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; 066import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner; 067import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure; 068import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 069import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; 070import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; 071import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure; 072import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure; 073import org.apache.hadoop.hbase.master.procedure.SnapshotVerifyProcedure; 074import org.apache.hadoop.hbase.procedure.MasterProcedureManager; 075import org.apache.hadoop.hbase.procedure.Procedure; 076import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; 077import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; 078import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator; 079import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 080import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 081import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 082import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils; 083import org.apache.hadoop.hbase.security.AccessDeniedException; 084import org.apache.hadoop.hbase.security.User; 085import org.apache.hadoop.hbase.security.access.AccessChecker; 086import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner; 087import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper; 088import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 089import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; 090import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 091import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 092import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; 093import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException; 094import org.apache.hadoop.hbase.snapshot.SnapshotExistsException; 095import org.apache.hadoop.hbase.snapshot.SnapshotManifest; 096import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; 097import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException; 098import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException; 099import org.apache.hadoop.hbase.util.CommonFSUtils; 100import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 101import org.apache.hadoop.hbase.util.NonceKey; 102import org.apache.hadoop.hbase.util.TableDescriptorChecker; 103import org.apache.yetus.audience.InterfaceAudience; 104import org.apache.yetus.audience.InterfaceStability; 105import org.apache.zookeeper.KeeperException; 106import org.slf4j.Logger; 107import org.slf4j.LoggerFactory; 108 109import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 110 111import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 114import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 115import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type; 116 117/** 118 * This class manages the procedure of taking and restoring snapshots. There is only one 119 * SnapshotManager for the master. 120 * <p> 121 * The class provides methods for monitoring in-progress snapshot actions. 122 * <p> 123 * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a 124 * simplification in the current implementation. 125 */ 126@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 127@InterfaceStability.Unstable 128public class SnapshotManager extends MasterProcedureManager implements Stoppable { 129 private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class); 130 131 /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */ 132 private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500; 133 134 /** 135 * Wait time before removing a finished sentinel from the in-progress map NOTE: This is used as a 136 * safety auto cleanup. The snapshot and restore handlers map entries are removed when a user asks 137 * if a snapshot or restore is completed. This operation is part of the HBaseAdmin 138 * snapshot/restore API flow. In case something fails on the client side and the snapshot/restore 139 * state is not reclaimed after a default timeout, the entry is removed from the in-progress map. 140 * At this point, if the user asks for the snapshot/restore status, the result will be snapshot 141 * done if exists or failed if it doesn't exists. 142 */ 143 public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS = 144 "hbase.snapshot.sentinels.cleanup.timeoutMillis"; 145 public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L; 146 147 /** Enable or disable snapshot support */ 148 public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled"; 149 150 /** 151 * Conf key for # of ms elapsed between checks for snapshot errors while waiting for completion. 152 */ 153 private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis"; 154 155 /** Name of the operation to use in the controller */ 156 public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot"; 157 158 /** Conf key for # of threads used by the SnapshotManager thread pool */ 159 public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads"; 160 161 /** number of current operations running on the master */ 162 public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1; 163 164 /** Conf key for preserving original max file size configs */ 165 public static final String SNAPSHOT_MAX_FILE_SIZE_PRESERVE = 166 "hbase.snapshot.max.filesize.preserve"; 167 168 /** Enable or disable snapshot procedure */ 169 public static final String SNAPSHOT_PROCEDURE_ENABLED = "hbase.snapshot.procedure.enabled"; 170 171 public static final boolean SNAPSHOT_PROCEDURE_ENABLED_DEFAULT = true; 172 173 private boolean stopped; 174 private MasterServices master; // Needed by TableEventHandlers 175 private ProcedureCoordinator coordinator; 176 177 // Is snapshot feature enabled? 178 private boolean isSnapshotSupported = false; 179 180 // Snapshot handlers map, with table name as key. 181 // The map is always accessed and modified under the object lock using synchronized. 182 // snapshotTable() will insert an Handler in the table. 183 // isSnapshotDone() will remove the handler requested if the operation is finished. 184 private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>(); 185 private final ScheduledExecutorService scheduleThreadPool = 186 Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() 187 .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build()); 188 private ScheduledFuture<?> snapshotHandlerChoreCleanerTask; 189 190 // Restore map, with table name as key, procedure ID as value. 191 // The map is always accessed and modified under the object lock using synchronized. 192 // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map. 193 // 194 // TODO: just as the Apache HBase 1.x implementation, this map would not survive master 195 // restart/failover. This is just a stopgap implementation until implementation of taking 196 // snapshot using Procedure-V2. 197 private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>(); 198 199 // SnapshotDescription -> SnapshotProcId 200 private final ConcurrentHashMap<SnapshotDescription, Long> snapshotToProcIdMap = 201 new ConcurrentHashMap<>(); 202 203 private WorkerAssigner verifyWorkerAssigner; 204 205 private Path rootDir; 206 private ExecutorService executorService; 207 208 /** 209 * Read write lock between taking snapshot and snapshot HFile cleaner. The cleaner should skip to 210 * check the HFiles if any snapshot is in progress, otherwise it may clean a HFile which would 211 * belongs to the newly creating snapshot. So we should grab the write lock first when cleaner 212 * start to work. (See HBASE-21387) 213 */ 214 private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true); 215 216 public SnapshotManager() { 217 } 218 219 /** 220 * Fully specify all necessary components of a snapshot manager. Exposed for testing. 221 * @param master services for the master where the manager is running 222 * @param coordinator procedure coordinator instance. exposed for testing. 223 * @param pool HBase ExecutorServcie instance, exposed for testing. 224 */ 225 @InterfaceAudience.Private 226 SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator, 227 ExecutorService pool, int sentinelCleanInterval) 228 throws IOException, UnsupportedOperationException { 229 this.master = master; 230 231 this.rootDir = master.getMasterFileSystem().getRootDir(); 232 Configuration conf = master.getConfiguration(); 233 checkSnapshotSupport(conf, master.getMasterFileSystem()); 234 235 this.coordinator = coordinator; 236 this.executorService = pool; 237 resetTempDir(); 238 snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate( 239 this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS); 240 } 241 242 /** 243 * Gets the list of all completed snapshots. 244 * @return list of SnapshotDescriptions 245 * @throws IOException File system exception 246 */ 247 public List<SnapshotDescription> getCompletedSnapshots() throws IOException { 248 return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true); 249 } 250 251 /** 252 * Gets the list of all completed snapshots. 253 * @param snapshotDir snapshot directory 254 * @param withCpCall Whether to call CP hooks 255 * @return list of SnapshotDescriptions 256 * @throws IOException File system exception 257 */ 258 private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall) 259 throws IOException { 260 List<SnapshotDescription> snapshotDescs = new ArrayList<>(); 261 // first create the snapshot root path and check to see if it exists 262 FileSystem fs = master.getMasterFileSystem().getFileSystem(); 263 if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir); 264 265 // if there are no snapshots, return an empty list 266 if (!fs.exists(snapshotDir)) { 267 return snapshotDescs; 268 } 269 270 // ignore all the snapshots in progress 271 FileStatus[] snapshots = fs.listStatus(snapshotDir, 272 new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs)); 273 MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); 274 withCpCall = withCpCall && cpHost != null; 275 // loop through all the completed snapshots 276 for (FileStatus snapshot : snapshots) { 277 Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE); 278 // if the snapshot is bad 279 if (!fs.exists(info)) { 280 LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist"); 281 continue; 282 } 283 FSDataInputStream in = null; 284 try { 285 in = fs.open(info); 286 SnapshotDescription desc = SnapshotDescription.parseFrom(in); 287 org.apache.hadoop.hbase.client.SnapshotDescription descPOJO = 288 (withCpCall) ? ProtobufUtil.createSnapshotDesc(desc) : null; 289 if (withCpCall) { 290 try { 291 cpHost.preListSnapshot(descPOJO); 292 } catch (AccessDeniedException e) { 293 LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. " 294 + "Either you should be owner of this snapshot or admin user."); 295 // Skip this and try for next snapshot 296 continue; 297 } 298 } 299 snapshotDescs.add(desc); 300 301 // call coproc post hook 302 if (withCpCall) { 303 cpHost.postListSnapshot(descPOJO); 304 } 305 } catch (IOException e) { 306 LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e); 307 } finally { 308 if (in != null) { 309 in.close(); 310 } 311 } 312 } 313 return snapshotDescs; 314 } 315 316 /** 317 * Cleans up any zk-coordinated snapshots in the snapshot/.tmp directory that were left from 318 * failed snapshot attempts. For unfinished procedure2-coordinated snapshots, keep the working 319 * directory. 320 * @throws IOException if we can't reach the filesystem 321 */ 322 private void resetTempDir() throws IOException { 323 Set<String> workingProcedureCoordinatedSnapshotNames = 324 snapshotToProcIdMap.keySet().stream().map(s -> s.getName()).collect(Collectors.toSet()); 325 326 Path tmpdir = 327 SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, master.getConfiguration()); 328 FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration()); 329 FileStatus[] workingSnapshotDirs = CommonFSUtils.listStatus(tmpFs, tmpdir); 330 if (workingSnapshotDirs == null) { 331 return; 332 } 333 for (FileStatus workingSnapshotDir : workingSnapshotDirs) { 334 String workingSnapshotName = workingSnapshotDir.getPath().getName(); 335 if (!workingProcedureCoordinatedSnapshotNames.contains(workingSnapshotName)) { 336 try { 337 if (tmpFs.delete(workingSnapshotDir.getPath(), true)) { 338 LOG.info("delete unfinished zk-coordinated snapshot working directory {}", 339 workingSnapshotDir.getPath()); 340 } else { 341 LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}", 342 workingSnapshotDir.getPath()); 343 } 344 } catch (IOException e) { 345 LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}", 346 workingSnapshotDir.getPath(), e); 347 } 348 } else { 349 LOG.debug("find working directory of unfinished procedure {}", workingSnapshotName); 350 } 351 } 352 } 353 354 /** 355 * Delete the specified snapshot 356 * @throws SnapshotDoesNotExistException If the specified snapshot does not exist. 357 * @throws IOException For filesystem IOExceptions 358 */ 359 public void deleteSnapshot(SnapshotDescription snapshot) throws IOException { 360 // check to see if it is completed 361 if (!isSnapshotCompleted(snapshot)) { 362 throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot)); 363 } 364 365 String snapshotName = snapshot.getName(); 366 // first create the snapshot description and check to see if it exists 367 FileSystem fs = master.getMasterFileSystem().getFileSystem(); 368 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); 369 // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with 370 // just the "name" and it does not contains the "real" snapshot information 371 snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 372 373 // call coproc pre hook 374 MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); 375 org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null; 376 if (cpHost != null) { 377 snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot); 378 cpHost.preDeleteSnapshot(snapshotPOJO); 379 } 380 381 LOG.debug("Deleting snapshot: " + snapshotName); 382 // delete the existing snapshot 383 if (!fs.delete(snapshotDir, true)) { 384 throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir); 385 } 386 387 // call coproc post hook 388 if (cpHost != null) { 389 cpHost.postDeleteSnapshot(snapshotPOJO); 390 } 391 392 } 393 394 /** 395 * Check if the specified snapshot is done 396 * @return true if snapshot is ready to be restored, false if it is still being taken. 397 * @throws IOException IOException if error from HDFS or RPC 398 * @throws UnknownSnapshotException if snapshot is invalid or does not exist. 399 */ 400 public boolean isSnapshotDone(SnapshotDescription expected) throws IOException { 401 // check the request to make sure it has a snapshot 402 if (expected == null) { 403 throw new UnknownSnapshotException( 404 "No snapshot name passed in request, can't figure out which snapshot you want to check."); 405 } 406 407 Long procId = snapshotToProcIdMap.get(expected); 408 if (procId != null) { 409 if (master.getMasterProcedureExecutor().isRunning()) { 410 return master.getMasterProcedureExecutor().isFinished(procId); 411 } else { 412 return false; 413 } 414 } 415 416 String ssString = ClientSnapshotDescriptionUtils.toString(expected); 417 418 // check to see if the sentinel exists, 419 // and if the task is complete removes it from the in-progress snapshots map. 420 SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected); 421 422 // stop tracking "abandoned" handlers 423 cleanupSentinels(); 424 425 if (handler == null) { 426 // If there's no handler in the in-progress map, it means one of the following: 427 // - someone has already requested the snapshot state 428 // - the requested snapshot was completed long time ago (cleanupSentinels() timeout) 429 // - the snapshot was never requested 430 // In those cases returns to the user the "done state" if the snapshots exists on disk, 431 // otherwise raise an exception saying that the snapshot is not running and doesn't exist. 432 if (!isSnapshotCompleted(expected)) { 433 throw new UnknownSnapshotException("Snapshot " + ssString 434 + " is not currently running or one of the known completed snapshots."); 435 } 436 // was done, return true; 437 return true; 438 } 439 440 // pass on any failure we find in the sentinel 441 try { 442 handler.rethrowExceptionIfFailed(); 443 } catch (ForeignException e) { 444 // Give some procedure info on an exception. 445 String status; 446 Procedure p = coordinator.getProcedure(expected.getName()); 447 if (p != null) { 448 status = p.getStatus(); 449 } else { 450 status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames(); 451 } 452 throw new HBaseSnapshotException("Snapshot " + ssString + " had an error. " + status, e, 453 ProtobufUtil.createSnapshotDesc(expected)); 454 } 455 456 // check to see if we are done 457 if (handler.isFinished()) { 458 LOG.debug("Snapshot '" + ssString + "' has completed, notifying client."); 459 return true; 460 } else if (LOG.isDebugEnabled()) { 461 LOG.debug("Snapshoting '" + ssString + "' is still in progress!"); 462 } 463 return false; 464 } 465 466 /** 467 * Check to see if there is a snapshot in progress with the same name or on the same table. 468 * Currently we have a limitation only allowing a single snapshot per table at a time. Also we 469 * don't allow snapshot with the same name. 470 * @param snapshot description of the snapshot being checked. 471 * @param checkTable check if the table is already taking a snapshot. 472 * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same 473 * table. 474 */ 475 synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot, boolean checkTable) { 476 if (checkTable) { 477 TableName snapshotTable = TableName.valueOf(snapshot.getTable()); 478 if (isTakingSnapshot(snapshotTable)) { 479 return true; 480 } 481 } 482 Iterator<Map.Entry<TableName, SnapshotSentinel>> it = snapshotHandlers.entrySet().iterator(); 483 while (it.hasNext()) { 484 Map.Entry<TableName, SnapshotSentinel> entry = it.next(); 485 SnapshotSentinel sentinel = entry.getValue(); 486 if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) { 487 return true; 488 } 489 } 490 Iterator<Map.Entry<SnapshotDescription, Long>> spIt = snapshotToProcIdMap.entrySet().iterator(); 491 while (spIt.hasNext()) { 492 Map.Entry<SnapshotDescription, Long> entry = spIt.next(); 493 if ( 494 snapshot.getName().equals(entry.getKey().getName()) 495 && !master.getMasterProcedureExecutor().isFinished(entry.getValue()) 496 ) { 497 return true; 498 } 499 } 500 return false; 501 } 502 503 /** 504 * Check to see if the specified table has a snapshot in progress. Currently we have a limitation 505 * only allowing a single snapshot per table at a time. 506 * @param tableName name of the table being snapshotted. 507 * @return <tt>true</tt> if there is a snapshot in progress on the specified table. 508 */ 509 public boolean isTakingSnapshot(final TableName tableName) { 510 return isTakingSnapshot(tableName, false); 511 } 512 513 public boolean isTableTakingAnySnapshot(final TableName tableName) { 514 return isTakingSnapshot(tableName, true); 515 } 516 517 /** 518 * Check to see if the specified table has a snapshot in progress. Since we introduce the 519 * SnapshotProcedure, it is a little bit different from before. For zk-coordinated snapshot, we 520 * can just consider tables in snapshotHandlers only, but for 521 * {@link org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure} and 522 * {@link org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure}, we need to 523 * consider tables in snapshotToProcIdMap also, for the snapshot procedure, we don't need to check 524 * if table in snapshot. 525 * @param tableName name of the table being snapshotted. 526 * @param checkProcedure true if we should check tables in snapshotToProcIdMap 527 * @return <tt>true</tt> if there is a snapshot in progress on the specified table. 528 */ 529 private synchronized boolean isTakingSnapshot(TableName tableName, boolean checkProcedure) { 530 SnapshotSentinel handler = this.snapshotHandlers.get(tableName); 531 if (handler != null && !handler.isFinished()) { 532 return true; 533 } 534 if (checkProcedure) { 535 for (Map.Entry<SnapshotDescription, Long> entry : snapshotToProcIdMap.entrySet()) { 536 if ( 537 TableName.valueOf(entry.getKey().getTable()).equals(tableName) 538 && !master.getMasterProcedureExecutor().isFinished(entry.getValue()) 539 ) { 540 return true; 541 } 542 } 543 } 544 return false; 545 } 546 547 /** 548 * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we 549 * aren't already running a snapshot or restore on the requested table. 550 * @param snapshot description of the snapshot we want to start 551 * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot 552 */ 553 public synchronized void prepareWorkingDirectory(SnapshotDescription snapshot) 554 throws HBaseSnapshotException { 555 Path workingDir = 556 SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, master.getConfiguration()); 557 558 try { 559 FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration()); 560 // delete the working directory, since we aren't running the snapshot. Likely leftovers 561 // from a failed attempt. 562 workingDirFS.delete(workingDir, true); 563 564 // recreate the working directory for the snapshot 565 if (!workingDirFS.mkdirs(workingDir)) { 566 throw new SnapshotCreationException( 567 "Couldn't create working directory (" + workingDir + ") for snapshot", 568 ProtobufUtil.createSnapshotDesc(snapshot)); 569 } 570 updateWorkingDirAclsIfRequired(workingDir, workingDirFS); 571 } catch (HBaseSnapshotException e) { 572 throw e; 573 } catch (IOException e) { 574 throw new SnapshotCreationException( 575 "Exception while checking to see if snapshot could be started.", e, 576 ProtobufUtil.createSnapshotDesc(snapshot)); 577 } 578 } 579 580 /** 581 * If the parent dir of the snapshot working dir (e.g. /hbase/.hbase-snapshot) has non-empty ACLs, 582 * use them for the current working dir (e.g. /hbase/.hbase-snapshot/.tmp/{snapshot-name}) so that 583 * regardless of whether the snapshot commit phase performs atomic rename or non-atomic copy of 584 * the working dir to new snapshot dir, the ACLs are retained. 585 * @param workingDir working dir to build the snapshot. 586 * @param workingDirFS working dir file system. 587 * @throws IOException If ACL read/modify operation fails. 588 */ 589 private static void updateWorkingDirAclsIfRequired(Path workingDir, FileSystem workingDirFS) 590 throws IOException { 591 if ( 592 !workingDirFS.hasPathCapability(workingDir, CommonPathCapabilities.FS_ACLS) 593 || workingDir.getParent() == null || workingDir.getParent().getParent() == null 594 ) { 595 return; 596 } 597 AclStatus snapshotWorkingParentDirStatus; 598 try { 599 snapshotWorkingParentDirStatus = 600 workingDirFS.getAclStatus(workingDir.getParent().getParent()); 601 } catch (IOException e) { 602 LOG.warn("Unable to retrieve ACL status for path: {}, current working dir path: {}", 603 workingDir.getParent().getParent(), workingDir, e); 604 return; 605 } 606 List<AclEntry> snapshotWorkingParentDirAclStatusEntries = 607 snapshotWorkingParentDirStatus.getEntries(); 608 if ( 609 snapshotWorkingParentDirAclStatusEntries != null 610 && snapshotWorkingParentDirAclStatusEntries.size() > 0 611 ) { 612 workingDirFS.modifyAclEntries(workingDir, snapshotWorkingParentDirAclStatusEntries); 613 } 614 } 615 616 /** 617 * Take a snapshot of a disabled table. 618 * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}. 619 * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary 620 * directory could not be determined 621 */ 622 private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) throws IOException { 623 // setup the snapshot 624 prepareWorkingDirectory(snapshot); 625 626 // set the snapshot to be a disabled snapshot, since the client doesn't know about that 627 snapshot = snapshot.toBuilder().setType(Type.DISABLED).build(); 628 629 // Take the snapshot of the disabled table 630 DisabledTableSnapshotHandler handler = new DisabledTableSnapshotHandler(snapshot, master, this); 631 snapshotTable(snapshot, handler); 632 } 633 634 /** 635 * Take a snapshot of an enabled table. 636 * @param snapshot description of the snapshot to take. 637 * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary 638 * directory could not be determined 639 */ 640 private synchronized void snapshotEnabledTable(SnapshotDescription snapshot) throws IOException { 641 // setup the snapshot 642 prepareWorkingDirectory(snapshot); 643 644 // Take the snapshot of the enabled table 645 EnabledTableSnapshotHandler handler = new EnabledTableSnapshotHandler(snapshot, master, this); 646 snapshotTable(snapshot, handler); 647 } 648 649 /** 650 * Take a snapshot using the specified handler. On failure the snapshot temporary working 651 * directory is removed. NOTE: prepareToTakeSnapshot() called before this one takes care of the 652 * rejecting the snapshot request if the table is busy with another snapshot/restore operation. 653 * @param snapshot the snapshot description 654 * @param handler the snapshot handler 655 */ 656 private synchronized void snapshotTable(SnapshotDescription snapshot, 657 final TakeSnapshotHandler handler) throws IOException { 658 try { 659 handler.prepare(); 660 this.executorService.submit(handler); 661 this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler); 662 } catch (Exception e) { 663 // cleanup the working directory by trying to delete it from the fs. 664 Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, 665 master.getConfiguration()); 666 FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration()); 667 try { 668 if (!workingDirFs.delete(workingDir, true)) { 669 LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" 670 + ClientSnapshotDescriptionUtils.toString(snapshot)); 671 } 672 } catch (IOException e1) { 673 LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" 674 + ClientSnapshotDescriptionUtils.toString(snapshot)); 675 } 676 // fail the snapshot 677 throw new SnapshotCreationException("Could not build snapshot handler", e, 678 ProtobufUtil.createSnapshotDesc(snapshot)); 679 } 680 } 681 682 public ReadWriteLock getTakingSnapshotLock() { 683 return this.takingSnapshotLock; 684 } 685 686 /** 687 * The snapshot operation processing as following: <br> 688 * 1. Create a Snapshot Handler, and do some initialization; <br> 689 * 2. Put the handler into snapshotHandlers <br> 690 * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock 691 * and snapshotHandlers; 692 * @return true to indicate that there're some running snapshots. 693 */ 694 public synchronized boolean isTakingAnySnapshot() { 695 return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0 696 || this.snapshotToProcIdMap.size() > 0; 697 } 698 699 /** 700 * Take a snapshot based on the enabled/disabled state of the table. 701 * @throws HBaseSnapshotException when a snapshot specific exception occurs. 702 * @throws IOException when some sort of generic IO exception occurs. 703 */ 704 public void takeSnapshot(SnapshotDescription snapshot) throws IOException { 705 this.takingSnapshotLock.readLock().lock(); 706 try { 707 takeSnapshotInternal(snapshot); 708 } finally { 709 this.takingSnapshotLock.readLock().unlock(); 710 } 711 } 712 713 public long takeSnapshot(SnapshotDescription snapshot, long nonceGroup, long nonce) 714 throws IOException { 715 this.takingSnapshotLock.readLock().lock(); 716 try { 717 return submitSnapshotProcedure(snapshot, nonceGroup, nonce); 718 } finally { 719 this.takingSnapshotLock.readLock().unlock(); 720 } 721 } 722 723 private synchronized long submitSnapshotProcedure(SnapshotDescription snapshot, long nonceGroup, 724 long nonce) throws IOException { 725 return MasterProcedureUtil 726 .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(master, nonceGroup, nonce) { 727 @Override 728 protected void run() throws IOException { 729 sanityCheckBeforeSnapshot(snapshot, false); 730 731 long procId = submitProcedure(new SnapshotProcedure( 732 getMaster().getMasterProcedureExecutor().getEnvironment(), snapshot)); 733 734 getMaster().getSnapshotManager().registerSnapshotProcedure(snapshot, procId); 735 } 736 737 @Override 738 protected String getDescription() { 739 return "SnapshotProcedure"; 740 } 741 }); 742 } 743 744 private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException { 745 TableDescriptor desc = sanityCheckBeforeSnapshot(snapshot, true); 746 747 // call pre coproc hook 748 MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); 749 org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null; 750 if (cpHost != null) { 751 snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot); 752 cpHost.preSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null)); 753 } 754 755 // if the table is enabled, then have the RS run actually the snapshot work 756 TableName snapshotTable = TableName.valueOf(snapshot.getTable()); 757 if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.ENABLED)) { 758 if (LOG.isDebugEnabled()) { 759 LOG.debug("Table enabled, starting distributed snapshots for {}", 760 ClientSnapshotDescriptionUtils.toString(snapshot)); 761 } 762 snapshotEnabledTable(snapshot); 763 if (LOG.isDebugEnabled()) { 764 LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot)); 765 } 766 } 767 // For disabled table, snapshot is created by the master 768 else if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.DISABLED)) { 769 if (LOG.isDebugEnabled()) { 770 LOG.debug("Table is disabled, running snapshot entirely on master for {}", 771 ClientSnapshotDescriptionUtils.toString(snapshot)); 772 } 773 snapshotDisabledTable(snapshot); 774 if (LOG.isDebugEnabled()) { 775 LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot)); 776 } 777 } else { 778 LOG.error("Can't snapshot table '" + snapshot.getTable() 779 + "', isn't open or closed, we don't know what to do!"); 780 TablePartiallyOpenException tpoe = 781 new TablePartiallyOpenException(snapshot.getTable() + " isn't fully open."); 782 throw new SnapshotCreationException("Table is not entirely open or closed", tpoe, 783 ProtobufUtil.createSnapshotDesc(snapshot)); 784 } 785 786 // call post coproc hook 787 if (cpHost != null) { 788 cpHost.postSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null)); 789 } 790 } 791 792 /** 793 * Check if the snapshot can be taken. Currently we have some limitations, for zk-coordinated 794 * snapshot, we don't allow snapshot with same name or taking multiple snapshots of a table at the 795 * same time, for procedure-coordinated snapshot, we don't allow snapshot with same name. 796 * @param snapshot description of the snapshot being checked. 797 * @param checkTable check if the table is already taking a snapshot. For zk-coordinated snapshot, 798 * we need to check if another zk-coordinated snapshot is in progress, for the 799 * snapshot procedure, this is unnecessary. 800 * @return the table descriptor of the table 801 */ 802 private synchronized TableDescriptor sanityCheckBeforeSnapshot(SnapshotDescription snapshot, 803 boolean checkTable) throws IOException { 804 // check to see if we already completed the snapshot 805 if (isSnapshotCompleted(snapshot)) { 806 throw new SnapshotExistsException( 807 "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.", 808 ProtobufUtil.createSnapshotDesc(snapshot)); 809 } 810 LOG.debug("No existing snapshot, attempting snapshot..."); 811 812 // stop tracking "abandoned" handlers 813 cleanupSentinels(); 814 815 TableName snapshotTable = TableName.valueOf(snapshot.getTable()); 816 // make sure we aren't already running a snapshot 817 if (isTakingSnapshot(snapshot, checkTable)) { 818 throw new SnapshotCreationException( 819 "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot) 820 + " because we are already running another snapshot" 821 + " on the same table or with the same name"); 822 } 823 824 // make sure we aren't running a restore on the same table 825 if (isRestoringTable(snapshotTable)) { 826 throw new SnapshotCreationException( 827 "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot) 828 + " because we are already have a restore in progress on the same snapshot."); 829 } 830 831 // check to see if the table exists 832 TableDescriptor desc = null; 833 try { 834 desc = master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable())); 835 } catch (FileNotFoundException e) { 836 String msg = "Table:" + snapshot.getTable() + " info doesn't exist!"; 837 LOG.error(msg); 838 throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot)); 839 } catch (IOException e) { 840 throw new SnapshotCreationException( 841 "Error while geting table description for table " + snapshot.getTable(), e, 842 ProtobufUtil.createSnapshotDesc(snapshot)); 843 } 844 if (desc == null) { 845 throw new SnapshotCreationException( 846 "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.", 847 ProtobufUtil.createSnapshotDesc(snapshot)); 848 } 849 return desc; 850 } 851 852 /** 853 * Set the handler for the current snapshot 854 * <p> 855 * Exposed for TESTING 856 * @param handler handler the master should use TODO get rid of this if possible, repackaging, 857 * modify tests. 858 */ 859 public synchronized void setSnapshotHandlerForTesting(final TableName tableName, 860 final SnapshotSentinel handler) { 861 if (handler != null) { 862 this.snapshotHandlers.put(tableName, handler); 863 } else { 864 this.snapshotHandlers.remove(tableName); 865 } 866 } 867 868 /** Returns distributed commit coordinator for all running snapshots */ 869 ProcedureCoordinator getCoordinator() { 870 return coordinator; 871 } 872 873 /** 874 * Check to see if the snapshot is one of the currently completed snapshots Returns true if the 875 * snapshot exists in the "completed snapshots folder". 876 * @param snapshot expected snapshot to check 877 * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is 878 * not stored 879 * @throws IOException if the filesystem throws an unexpected exception, 880 * @throws IllegalArgumentException if snapshot name is invalid. 881 */ 882 private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException { 883 try { 884 final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir); 885 FileSystem fs = master.getMasterFileSystem().getFileSystem(); 886 // check to see if the snapshot already exists 887 return fs.exists(snapshotDir); 888 } catch (IllegalArgumentException iae) { 889 throw new UnknownSnapshotException("Unexpected exception thrown", iae); 890 } 891 } 892 893 /** 894 * Clone the specified snapshot. The clone will fail if the destination table has a snapshot or 895 * restore in progress. 896 * @param reqSnapshot Snapshot Descriptor from request 897 * @param tableName table to clone 898 * @param snapshot Snapshot Descriptor 899 * @param snapshotTableDesc Table Descriptor 900 * @param nonceKey unique identifier to prevent duplicated RPC 901 * @return procId the ID of the clone snapshot procedure 902 */ 903 private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName, 904 final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc, 905 final NonceKey nonceKey, final boolean restoreAcl, final String customSFT) throws IOException { 906 MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); 907 TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc); 908 org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null; 909 if (cpHost != null) { 910 snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot); 911 cpHost.preCloneSnapshot(snapshotPOJO, htd); 912 } 913 long procId; 914 try { 915 procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl, customSFT); 916 } catch (IOException e) { 917 LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName() + " as table " 918 + tableName.getNameAsString(), e); 919 throw e; 920 } 921 LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName); 922 923 if (cpHost != null) { 924 cpHost.postCloneSnapshot(snapshotPOJO, htd); 925 } 926 return procId; 927 } 928 929 /** 930 * Clone the specified snapshot into a new table. The operation will fail if the destination table 931 * has a snapshot or restore in progress. 932 * @param snapshot Snapshot Descriptor 933 * @param tableDescriptor Table Descriptor of the table to create 934 * @param nonceKey unique identifier to prevent duplicated RPC 935 * @return procId the ID of the clone snapshot procedure 936 */ 937 synchronized long cloneSnapshot(final SnapshotDescription snapshot, 938 final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl, 939 final String customSFT) throws HBaseSnapshotException { 940 TableName tableName = tableDescriptor.getTableName(); 941 942 // make sure we aren't running a snapshot on the same table 943 if (isTableTakingAnySnapshot(tableName)) { 944 throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName); 945 } 946 947 // make sure we aren't running a restore on the same table 948 if (isRestoringTable(tableName)) { 949 throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName); 950 } 951 952 try { 953 long procId = master.getMasterProcedureExecutor().submitProcedure( 954 new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(), 955 tableDescriptor, snapshot, restoreAcl, customSFT), 956 nonceKey); 957 this.restoreTableToProcIdMap.put(tableName, procId); 958 return procId; 959 } catch (Exception e) { 960 String msg = "Couldn't clone the snapshot=" 961 + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName; 962 LOG.error(msg, e); 963 throw new RestoreSnapshotException(msg, e); 964 } 965 } 966 967 /** 968 * Restore or Clone the specified snapshot 969 * @param nonceKey unique identifier to prevent duplicated RPC 970 */ 971 public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey, 972 final boolean restoreAcl, String customSFT) throws IOException { 973 FileSystem fs = master.getMasterFileSystem().getFileSystem(); 974 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir); 975 976 // check if the snapshot exists 977 if (!fs.exists(snapshotDir)) { 978 LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist."); 979 throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(reqSnapshot)); 980 } 981 982 // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with 983 // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot 984 // information. 985 SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 986 SnapshotManifest manifest = 987 SnapshotManifest.open(master.getConfiguration(), fs, snapshotDir, snapshot); 988 TableDescriptor snapshotTableDesc = manifest.getTableDescriptor(); 989 TableName tableName = TableName.valueOf(reqSnapshot.getTable()); 990 991 // sanity check the new table descriptor 992 TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc); 993 994 // stop tracking "abandoned" handlers 995 cleanupSentinels(); 996 997 // Verify snapshot validity 998 SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest); 999 1000 // Execute the restore/clone operation 1001 long procId; 1002 if (master.getTableDescriptors().exists(tableName)) { 1003 procId = 1004 restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl); 1005 } else { 1006 procId = cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, 1007 restoreAcl, customSFT); 1008 } 1009 return procId; 1010 } 1011 1012 /** 1013 * Restore the specified snapshot. The restore will fail if the destination table has a snapshot 1014 * or restore in progress. 1015 * @param reqSnapshot Snapshot Descriptor from request 1016 * @param tableName table to restore 1017 * @param snapshot Snapshot Descriptor 1018 * @param snapshotTableDesc Table Descriptor 1019 * @param nonceKey unique identifier to prevent duplicated RPC 1020 * @param restoreAcl true to restore acl of snapshot 1021 * @return procId the ID of the restore snapshot procedure 1022 */ 1023 private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName, 1024 final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc, 1025 final NonceKey nonceKey, final boolean restoreAcl) throws IOException { 1026 MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); 1027 1028 // have to check first if restoring the snapshot would break current SFT setup 1029 StoreFileTrackerValidationUtils.validatePreRestoreSnapshot( 1030 master.getTableDescriptors().get(tableName), snapshotTableDesc, master.getConfiguration()); 1031 1032 if ( 1033 master.getTableStateManager().isTableState(TableName.valueOf(snapshot.getTable()), 1034 TableState.State.ENABLED) 1035 ) { 1036 throw new UnsupportedOperationException("Table '" + TableName.valueOf(snapshot.getTable()) 1037 + "' must be disabled in order to " + "perform a restore operation."); 1038 } 1039 1040 // call Coprocessor pre hook 1041 org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null; 1042 if (cpHost != null) { 1043 snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot); 1044 cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc); 1045 } 1046 1047 long procId; 1048 try { 1049 procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl); 1050 } catch (IOException e) { 1051 LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName() 1052 + " as table " + tableName.getNameAsString(), e); 1053 throw e; 1054 } 1055 LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName); 1056 1057 if (cpHost != null) { 1058 cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc); 1059 } 1060 1061 return procId; 1062 } 1063 1064 /** 1065 * Restore the specified snapshot. The restore will fail if the destination table has a snapshot 1066 * or restore in progress. 1067 * @param snapshot Snapshot Descriptor 1068 * @param tableDescriptor Table Descriptor 1069 * @param nonceKey unique identifier to prevent duplicated RPC 1070 * @param restoreAcl true to restore acl of snapshot 1071 * @return procId the ID of the restore snapshot procedure 1072 */ 1073 private synchronized long restoreSnapshot(final SnapshotDescription snapshot, 1074 final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl) 1075 throws HBaseSnapshotException { 1076 final TableName tableName = tableDescriptor.getTableName(); 1077 1078 // make sure we aren't running a snapshot on the same table 1079 if (isTableTakingAnySnapshot(tableName)) { 1080 throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName); 1081 } 1082 1083 // make sure we aren't running a restore on the same table 1084 if (isRestoringTable(tableName)) { 1085 throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName); 1086 } 1087 1088 try { 1089 TableDescriptor oldDescriptor = master.getTableDescriptors().get(tableName); 1090 long procId = master.getMasterProcedureExecutor().submitProcedure( 1091 new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(), 1092 oldDescriptor, tableDescriptor, snapshot, restoreAcl), 1093 nonceKey); 1094 this.restoreTableToProcIdMap.put(tableName, procId); 1095 return procId; 1096 } catch (Exception e) { 1097 String msg = "Couldn't restore the snapshot=" 1098 + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName; 1099 LOG.error(msg, e); 1100 throw new RestoreSnapshotException(msg, e); 1101 } 1102 } 1103 1104 /** 1105 * Verify if the restore of the specified table is in progress. 1106 * @param tableName table under restore 1107 * @return <tt>true</tt> if there is a restore in progress of the specified table. 1108 */ 1109 private synchronized boolean isRestoringTable(final TableName tableName) { 1110 Long procId = this.restoreTableToProcIdMap.get(tableName); 1111 if (procId == null) { 1112 return false; 1113 } 1114 ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); 1115 if (procExec.isRunning() && !procExec.isFinished(procId)) { 1116 return true; 1117 } else { 1118 this.restoreTableToProcIdMap.remove(tableName); 1119 return false; 1120 } 1121 } 1122 1123 /** 1124 * Return the handler if it is currently live and has the same snapshot target name. The handler 1125 * is removed from the sentinels map if completed. 1126 * @param sentinels live handlers 1127 * @param snapshot snapshot description 1128 * @return null if doesn't match, else a live handler. 1129 */ 1130 private synchronized SnapshotSentinel removeSentinelIfFinished( 1131 final Map<TableName, SnapshotSentinel> sentinels, final SnapshotDescription snapshot) { 1132 if (!snapshot.hasTable()) { 1133 return null; 1134 } 1135 1136 TableName snapshotTable = TableName.valueOf(snapshot.getTable()); 1137 SnapshotSentinel h = sentinels.get(snapshotTable); 1138 if (h == null) { 1139 return null; 1140 } 1141 1142 if (!h.getSnapshot().getName().equals(snapshot.getName())) { 1143 // specified snapshot is to the one currently running 1144 return null; 1145 } 1146 1147 // Remove from the "in-progress" list once completed 1148 if (h.isFinished()) { 1149 sentinels.remove(snapshotTable); 1150 } 1151 1152 return h; 1153 } 1154 1155 /** 1156 * Removes "abandoned" snapshot/restore requests. As part of the HBaseAdmin snapshot/restore API 1157 * the operation status is checked until completed, and the in-progress maps are cleaned up when 1158 * the status of a completed task is requested. To avoid having sentinels staying around for long 1159 * time if something client side is failed, each operation tries to clean up the in-progress maps 1160 * sentinels finished from a long time. 1161 */ 1162 private void cleanupSentinels() { 1163 cleanupSentinels(this.snapshotHandlers); 1164 cleanupCompletedRestoreInMap(); 1165 cleanupCompletedSnapshotInMap(); 1166 } 1167 1168 /** 1169 * Remove the sentinels that are marked as finished and the completion time has exceeded the 1170 * removal timeout. 1171 * @param sentinels map of sentinels to clean 1172 */ 1173 private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) { 1174 long currentTime = EnvironmentEdgeManager.currentTime(); 1175 long sentinelsCleanupTimeoutMillis = 1176 master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS, 1177 SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT); 1178 Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator(); 1179 while (it.hasNext()) { 1180 Map.Entry<TableName, SnapshotSentinel> entry = it.next(); 1181 SnapshotSentinel sentinel = entry.getValue(); 1182 if ( 1183 sentinel.isFinished() 1184 && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis 1185 ) { 1186 it.remove(); 1187 } 1188 } 1189 } 1190 1191 /** 1192 * Remove the procedures that are marked as finished 1193 */ 1194 private synchronized void cleanupCompletedRestoreInMap() { 1195 ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); 1196 Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator(); 1197 while (it.hasNext()) { 1198 Map.Entry<TableName, Long> entry = it.next(); 1199 Long procId = entry.getValue(); 1200 if (procExec.isRunning() && procExec.isFinished(procId)) { 1201 it.remove(); 1202 } 1203 } 1204 } 1205 1206 /** 1207 * Remove the procedures that are marked as finished 1208 */ 1209 private synchronized void cleanupCompletedSnapshotInMap() { 1210 ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); 1211 Iterator<Map.Entry<SnapshotDescription, Long>> it = snapshotToProcIdMap.entrySet().iterator(); 1212 while (it.hasNext()) { 1213 Map.Entry<SnapshotDescription, Long> entry = it.next(); 1214 Long procId = entry.getValue(); 1215 if (procExec.isRunning() && procExec.isFinished(procId)) { 1216 it.remove(); 1217 } 1218 } 1219 } 1220 1221 // 1222 // Implementing Stoppable interface 1223 // 1224 1225 @Override 1226 public void stop(String why) { 1227 // short circuit 1228 if (this.stopped) return; 1229 // make sure we get stop 1230 this.stopped = true; 1231 // pass the stop onto take snapshot handlers 1232 for (SnapshotSentinel snapshotHandler : this.snapshotHandlers.values()) { 1233 snapshotHandler.cancel(why); 1234 } 1235 if (snapshotHandlerChoreCleanerTask != null) { 1236 snapshotHandlerChoreCleanerTask.cancel(true); 1237 } 1238 try { 1239 if (coordinator != null) { 1240 coordinator.close(); 1241 } 1242 } catch (IOException e) { 1243 LOG.error("stop ProcedureCoordinator error", e); 1244 } 1245 } 1246 1247 @Override 1248 public boolean isStopped() { 1249 return this.stopped; 1250 } 1251 1252 /** 1253 * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported. 1254 * Called at the beginning of snapshot() and restoreSnapshot() methods. 1255 * @throws UnsupportedOperationException if snapshot are not supported 1256 */ 1257 public void checkSnapshotSupport() throws UnsupportedOperationException { 1258 if (!this.isSnapshotSupported) { 1259 throw new UnsupportedOperationException( 1260 "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" 1261 + HBASE_SNAPSHOT_ENABLED + "' property with value 'true'."); 1262 } 1263 } 1264 1265 /** 1266 * Called at startup, to verify if snapshot operation is supported, and to avoid starting the 1267 * master if there're snapshots present but the cleaners needed are missing. Otherwise we can end 1268 * up with snapshot data loss. 1269 * @param conf The {@link Configuration} object to use 1270 * @param mfs The MasterFileSystem to use 1271 * @throws IOException in case of file-system operation failure 1272 * @throws UnsupportedOperationException in case cleaners are missing and there're snapshot in the 1273 * system 1274 */ 1275 private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs) 1276 throws IOException, UnsupportedOperationException { 1277 // Verify if snapshot is disabled by the user 1278 String enabled = conf.get(HBASE_SNAPSHOT_ENABLED); 1279 boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false); 1280 boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled); 1281 1282 // Extract cleaners from conf 1283 Set<String> hfileCleaners = new HashSet<>(); 1284 String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS); 1285 if (cleaners != null) Collections.addAll(hfileCleaners, cleaners); 1286 1287 Set<String> logCleaners = new HashSet<>(); 1288 cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS); 1289 if (cleaners != null) Collections.addAll(logCleaners, cleaners); 1290 1291 // check if an older version of snapshot directory was present 1292 Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME); 1293 FileSystem fs = mfs.getFileSystem(); 1294 List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false); 1295 if (ss != null && !ss.isEmpty()) { 1296 LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir); 1297 LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME); 1298 } 1299 1300 // If the user has enabled the snapshot, we force the cleaners to be present 1301 // otherwise we still need to check if cleaners are enabled or not and verify 1302 // that there're no snapshot in the .snapshot folder. 1303 if (snapshotEnabled) { 1304 // Inject snapshot cleaners, if snapshot.enable is true 1305 hfileCleaners.add(SnapshotHFileCleaner.class.getName()); 1306 hfileCleaners.add(HFileLinkCleaner.class.getName()); 1307 // If sync acl to HDFS feature is enabled, then inject the cleaner 1308 if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) { 1309 hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName()); 1310 } 1311 1312 // Set cleaners conf 1313 conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, 1314 hfileCleaners.toArray(new String[hfileCleaners.size()])); 1315 conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, 1316 logCleaners.toArray(new String[logCleaners.size()])); 1317 } else { 1318 // There may be restore tables if snapshot is enabled and then disabled, so add 1319 // HFileLinkCleaner, see HBASE-26670 for more details. 1320 hfileCleaners.add(HFileLinkCleaner.class.getName()); 1321 conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, 1322 hfileCleaners.toArray(new String[hfileCleaners.size()])); 1323 // Verify if SnapshotHFileCleaner are present 1324 snapshotEnabled = hfileCleaners.contains(SnapshotHFileCleaner.class.getName()); 1325 1326 // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set. 1327 if (snapshotEnabled) { 1328 LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " + "but the '" 1329 + HBASE_SNAPSHOT_ENABLED + "' property " 1330 + (userDisabled ? "is set to 'false'." : "is not set.")); 1331 } 1332 } 1333 1334 // Mark snapshot feature as enabled if cleaners are present and user has not disabled it. 1335 this.isSnapshotSupported = snapshotEnabled && !userDisabled; 1336 1337 // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder 1338 // otherwise we end up with snapshot data loss. 1339 if (!snapshotEnabled) { 1340 LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners."); 1341 Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir()); 1342 if (fs.exists(snapshotDir)) { 1343 FileStatus[] snapshots = CommonFSUtils.listStatus(fs, snapshotDir, 1344 new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs)); 1345 if (snapshots != null) { 1346 LOG.error("Snapshots are present, but cleaners are not enabled."); 1347 checkSnapshotSupport(); 1348 } 1349 } 1350 } 1351 } 1352 1353 @Override 1354 public void initialize(MasterServices master, MetricsMaster metricsMaster) 1355 throws KeeperException, IOException, UnsupportedOperationException { 1356 this.master = master; 1357 1358 this.rootDir = master.getMasterFileSystem().getRootDir(); 1359 checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem()); 1360 1361 // get the configuration for the coordinator 1362 Configuration conf = master.getConfiguration(); 1363 long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT); 1364 long timeoutMillis = Math.max( 1365 conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS, 1366 SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME), 1367 conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS, 1368 SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME)); 1369 int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT); 1370 1371 // setup the default procedure coordinator 1372 String name = master.getServerName().toString(); 1373 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); 1374 ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(master.getZooKeeper(), 1375 SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name); 1376 1377 this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); 1378 this.executorService = master.getExecutorService(); 1379 this.verifyWorkerAssigner = 1380 new WorkerAssigner(master, conf.getInt("hbase.snapshot.verify.task.max", 3), 1381 new ProcedureEvent<>("snapshot-verify-worker-assigning")); 1382 restoreUnfinishedSnapshotProcedure(); 1383 restoreWorkers(); 1384 resetTempDir(); 1385 snapshotHandlerChoreCleanerTask = 1386 scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS); 1387 } 1388 1389 private void restoreUnfinishedSnapshotProcedure() { 1390 master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream() 1391 .filter(p -> p instanceof SnapshotProcedure).filter(p -> !p.isFinished()) 1392 .map(p -> (SnapshotProcedure) p).forEach(p -> { 1393 registerSnapshotProcedure(p.getSnapshot(), p.getProcId()); 1394 LOG.info("restore unfinished snapshot procedure {}", p); 1395 }); 1396 } 1397 1398 @Override 1399 public String getProcedureSignature() { 1400 return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION; 1401 } 1402 1403 @Override 1404 public void execProcedure(ProcedureDescription desc) throws IOException { 1405 takeSnapshot(toSnapshotDescription(desc)); 1406 } 1407 1408 @Override 1409 public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user) 1410 throws IOException { 1411 // Done by AccessController as part of preSnapshot coprocessor hook (legacy code path). 1412 // In future, when we AC is removed for good, that check should be moved here. 1413 } 1414 1415 @Override 1416 public boolean isProcedureDone(ProcedureDescription desc) throws IOException { 1417 return isSnapshotDone(toSnapshotDescription(desc)); 1418 } 1419 1420 private SnapshotDescription toSnapshotDescription(ProcedureDescription desc) throws IOException { 1421 SnapshotDescription.Builder builder = SnapshotDescription.newBuilder(); 1422 if (!desc.hasInstance()) { 1423 throw new IOException("Snapshot name is not defined: " + desc.toString()); 1424 } 1425 String snapshotName = desc.getInstance(); 1426 List<NameStringPair> props = desc.getConfigurationList(); 1427 String table = null; 1428 for (NameStringPair prop : props) { 1429 if ("table".equalsIgnoreCase(prop.getName())) { 1430 table = prop.getValue(); 1431 } 1432 } 1433 if (table == null) { 1434 throw new IOException("Snapshot table is not defined: " + desc.toString()); 1435 } 1436 TableName tableName = TableName.valueOf(table); 1437 builder.setTable(tableName.getNameAsString()); 1438 builder.setName(snapshotName); 1439 builder.setType(SnapshotDescription.Type.FLUSH); 1440 return builder.build(); 1441 } 1442 1443 public void registerSnapshotProcedure(SnapshotDescription snapshot, long procId) { 1444 snapshotToProcIdMap.put(snapshot, procId); 1445 LOG.debug("register snapshot={}, snapshot procedure id = {}", 1446 ClientSnapshotDescriptionUtils.toString(snapshot), procId); 1447 } 1448 1449 public void unregisterSnapshotProcedure(SnapshotDescription snapshot, long procId) { 1450 snapshotToProcIdMap.remove(snapshot, procId); 1451 LOG.debug("unregister snapshot={}, snapshot procedure id = {}", 1452 ClientSnapshotDescriptionUtils.toString(snapshot), procId); 1453 } 1454 1455 public boolean snapshotProcedureEnabled() { 1456 return master.getConfiguration().getBoolean(SNAPSHOT_PROCEDURE_ENABLED, 1457 SNAPSHOT_PROCEDURE_ENABLED_DEFAULT); 1458 } 1459 1460 public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure) 1461 throws ProcedureSuspendedException { 1462 Optional<ServerName> worker = verifyWorkerAssigner.acquire(); 1463 if (worker.isPresent()) { 1464 LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get()); 1465 return worker.get(); 1466 } 1467 verifyWorkerAssigner.suspend(procedure); 1468 throw new ProcedureSuspendedException(); 1469 } 1470 1471 public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker, 1472 MasterProcedureScheduler scheduler) { 1473 LOG.debug("{} Release verify snapshot worker={}", procedure, worker); 1474 verifyWorkerAssigner.release(worker); 1475 verifyWorkerAssigner.wake(scheduler); 1476 } 1477 1478 private void restoreWorkers() { 1479 master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream() 1480 .filter(p -> p instanceof SnapshotVerifyProcedure).map(p -> (SnapshotVerifyProcedure) p) 1481 .filter(p -> !p.isFinished()).filter(p -> p.getServerName() != null).forEach(p -> { 1482 verifyWorkerAssigner.addUsedWorker(p.getServerName()); 1483 LOG.debug("{} restores used worker {}", p, p.getServerName()); 1484 }); 1485 } 1486 1487 public Integer getAvailableWorker(ServerName serverName) { 1488 return verifyWorkerAssigner.getAvailableWorker(serverName); 1489 } 1490}