001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.snapshot; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.HashSet; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.Executors; 032import java.util.concurrent.ScheduledExecutorService; 033import java.util.concurrent.ScheduledFuture; 034import java.util.concurrent.ThreadPoolExecutor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.locks.ReadWriteLock; 037import java.util.concurrent.locks.ReentrantReadWriteLock; 038import java.util.stream.Collectors; 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.fs.CommonPathCapabilities; 041import org.apache.hadoop.fs.FSDataInputStream; 042import org.apache.hadoop.fs.FileStatus; 043import org.apache.hadoop.fs.FileSystem; 044import org.apache.hadoop.fs.Path; 045import org.apache.hadoop.fs.permission.AclEntry; 046import 
org.apache.hadoop.fs.permission.AclStatus; 047import org.apache.hadoop.hbase.HBaseInterfaceAudience; 048import org.apache.hadoop.hbase.HConstants; 049import org.apache.hadoop.hbase.ServerName; 050import org.apache.hadoop.hbase.Stoppable; 051import org.apache.hadoop.hbase.TableName; 052import org.apache.hadoop.hbase.client.TableDescriptor; 053import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 054import org.apache.hadoop.hbase.client.TableState; 055import org.apache.hadoop.hbase.errorhandling.ForeignException; 056import org.apache.hadoop.hbase.executor.ExecutorService; 057import org.apache.hadoop.hbase.ipc.RpcServer; 058import org.apache.hadoop.hbase.master.MasterCoprocessorHost; 059import org.apache.hadoop.hbase.master.MasterFileSystem; 060import org.apache.hadoop.hbase.master.MasterServices; 061import org.apache.hadoop.hbase.master.MetricsMaster; 062import org.apache.hadoop.hbase.master.SnapshotSentinel; 063import org.apache.hadoop.hbase.master.WorkerAssigner; 064import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; 065import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner; 066import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure; 067import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 068import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; 069import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure; 070import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure; 071import org.apache.hadoop.hbase.master.procedure.SnapshotVerifyProcedure; 072import org.apache.hadoop.hbase.procedure.MasterProcedureManager; 073import org.apache.hadoop.hbase.procedure.Procedure; 074import org.apache.hadoop.hbase.procedure.ProcedureCoordinator; 075import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs; 076import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator; 077import org.apache.hadoop.hbase.procedure2.ProcedureEvent; 078import 
org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 079import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 080import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerValidationUtils; 081import org.apache.hadoop.hbase.security.AccessDeniedException; 082import org.apache.hadoop.hbase.security.User; 083import org.apache.hadoop.hbase.security.access.AccessChecker; 084import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner; 085import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper; 086import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 087import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; 088import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; 089import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 090import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; 091import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException; 092import org.apache.hadoop.hbase.snapshot.SnapshotExistsException; 093import org.apache.hadoop.hbase.snapshot.SnapshotManifest; 094import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil; 095import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException; 096import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException; 097import org.apache.hadoop.hbase.util.CommonFSUtils; 098import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 099import org.apache.hadoop.hbase.util.NonceKey; 100import org.apache.hadoop.hbase.util.TableDescriptorChecker; 101import org.apache.yetus.audience.InterfaceAudience; 102import org.apache.yetus.audience.InterfaceStability; 103import org.apache.zookeeper.KeeperException; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 108 109import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 110import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; 111import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; 112import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 113import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type; 114 115/** 116 * This class manages the procedure of taking and restoring snapshots. There is only one 117 * SnapshotManager for the master. 118 * <p> 119 * The class provides methods for monitoring in-progress snapshot actions. 120 * <p> 121 * Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a 122 * simplification in the current implementation. 123 */ 124@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 125@InterfaceStability.Unstable 126public class SnapshotManager extends MasterProcedureManager implements Stoppable { 127 private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class); 128 129 /** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */ 130 private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500; 131 132 /** 133 * Wait time before removing a finished sentinel from the in-progress map NOTE: This is used as a 134 * safety auto cleanup. The snapshot and restore handlers map entries are removed when a user asks 135 * if a snapshot or restore is completed. This operation is part of the HBaseAdmin 136 * snapshot/restore API flow. In case something fails on the client side and the snapshot/restore 137 * state is not reclaimed after a default timeout, the entry is removed from the in-progress map. 138 * At this point, if the user asks for the snapshot/restore status, the result will be snapshot 139 * done if exists or failed if it doesn't exists. 
   */
  public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
    "hbase.snapshot.sentinels.cleanup.timeoutMillis";
  /** Default sentinel auto-cleanup timeout: one minute. (Note: "MILLS" typo kept for compat.) */
  public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;

  /** Enable or disable snapshot support */
  public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";

  /**
   * Conf key for # of ms elapsed between checks for snapshot errors while waiting for completion.
   */
  private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";

  /** Name of the operation to use in the controller */
  public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";

  /** Conf key for # of threads used by the SnapshotManager thread pool */
  public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";

  /** Default number of concurrent snapshot operations run on the master */
  public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;

  /** Conf key for preserving original max file size configs */
  public static final String SNAPSHOT_MAX_FILE_SIZE_PRESERVE =
    "hbase.snapshot.max.filesize.preserve";

  /** Enable or disable snapshot procedure */
  public static final String SNAPSHOT_PROCEDURE_ENABLED = "hbase.snapshot.procedure.enabled";

  public static final boolean SNAPSHOT_PROCEDURE_ENABLED_DEFAULT = true;

  // Set by stop(); checked via isStopped().
  private boolean stopped;
  private MasterServices master; // Needed by TableEventHandlers
  private ProcedureCoordinator coordinator;

  // Is snapshot feature enabled?
  private boolean isSnapshotSupported = false;

  // Snapshot handlers map, with table name as key.
  // The map is always accessed and modified under the object lock using synchronized.
  // snapshotTable() will insert an Handler in the table.
  // isSnapshotDone() will remove the handler requested if the operation is finished.
  private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
  // Single daemon thread that periodically evicts finished/abandoned sentinels
  // (see cleanupSentinels()); scheduled in the testing constructor.
  private final ScheduledExecutorService scheduleThreadPool =
    Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
      .setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
  private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;

  // Restore map, with table name as key, procedure ID as value.
  // The map is always accessed and modified under the object lock using synchronized.
  // restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
  //
  // TODO: just as the Apache HBase 1.x implementation, this map would not survive master
  // restart/failover. This is just a stopgap implementation until implementation of taking
  // snapshot using Procedure-V2.
  private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>();

  // SnapshotDescription -> SnapshotProcId, for procedure-v2 coordinated snapshots.
  private final ConcurrentHashMap<SnapshotDescription, Long> snapshotToProcIdMap =
    new ConcurrentHashMap<>();

  private WorkerAssigner verifyWorkerAssigner;

  // HBase root dir on the master filesystem; set during initialization.
  private Path rootDir;
  private ExecutorService executorService;

  /**
   * Read write lock between taking snapshot and snapshot HFile cleaner. The cleaner should skip to
   * check the HFiles if any snapshot is in progress, otherwise it may clean a HFile which would
   * belongs to the newly creating snapshot. So we should grab the write lock first when cleaner
   * start to work. (See HBASE-21387)
   */
  private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);

  public SnapshotManager() {
  }

  /**
   * Fully specify all necessary components of a snapshot manager. Exposed for testing.
   * @param master services for the master where the manager is running
   * @param coordinator procedure coordinator instance. exposed for testing.
   * @param pool HBase ExecutorService instance, exposed for testing.
   * @param sentinelCleanInterval interval, in seconds, between runs of the sentinel cleanup chore.
   */
  @InterfaceAudience.Private
  SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
    ExecutorService pool, int sentinelCleanInterval)
    throws IOException, UnsupportedOperationException {
    this.master = master;

    // Verify snapshot support before wiring anything else up; throws if the
    // filesystem/config cannot support snapshots.
    this.rootDir = master.getMasterFileSystem().getRootDir();
    Configuration conf = master.getConfiguration();
    checkSnapshotSupport(conf, master.getMasterFileSystem());

    this.coordinator = coordinator;
    this.executorService = pool;
    // Clear leftover working directories from failed zk-coordinated attempts.
    resetTempDir();
    snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
      this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
  }

  /**
   * Gets the list of all completed snapshots.
   * @return list of SnapshotDescriptions
   * @throws IOException File system exception
   */
  public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
    return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true);
  }

  /**
   * Gets the list of all completed snapshots.
251 * @param snapshotDir snapshot directory 252 * @param withCpCall Whether to call CP hooks 253 * @return list of SnapshotDescriptions 254 * @throws IOException File system exception 255 */ 256 private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall) 257 throws IOException { 258 List<SnapshotDescription> snapshotDescs = new ArrayList<>(); 259 // first create the snapshot root path and check to see if it exists 260 FileSystem fs = master.getMasterFileSystem().getFileSystem(); 261 if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir); 262 263 // if there are no snapshots, return an empty list 264 if (!fs.exists(snapshotDir)) { 265 return snapshotDescs; 266 } 267 268 // ignore all the snapshots in progress 269 FileStatus[] snapshots = fs.listStatus(snapshotDir, 270 new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs)); 271 MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); 272 withCpCall = withCpCall && cpHost != null; 273 // loop through all the completed snapshots 274 for (FileStatus snapshot : snapshots) { 275 Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE); 276 // if the snapshot is bad 277 if (!fs.exists(info)) { 278 LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist"); 279 continue; 280 } 281 FSDataInputStream in = null; 282 try { 283 in = fs.open(info); 284 SnapshotDescription desc = SnapshotDescription.parseFrom(in); 285 org.apache.hadoop.hbase.client.SnapshotDescription descPOJO = 286 (withCpCall) ? ProtobufUtil.createSnapshotDesc(desc) : null; 287 if (withCpCall) { 288 try { 289 cpHost.preListSnapshot(descPOJO); 290 } catch (AccessDeniedException e) { 291 LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. 
" 292 + "Either you should be owner of this snapshot or admin user."); 293 // Skip this and try for next snapshot 294 continue; 295 } 296 } 297 snapshotDescs.add(desc); 298 299 // call coproc post hook 300 if (withCpCall) { 301 cpHost.postListSnapshot(descPOJO); 302 } 303 } catch (IOException e) { 304 LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e); 305 } finally { 306 if (in != null) { 307 in.close(); 308 } 309 } 310 } 311 return snapshotDescs; 312 } 313 314 /** 315 * Cleans up any zk-coordinated snapshots in the snapshot/.tmp directory that were left from 316 * failed snapshot attempts. For unfinished procedure2-coordinated snapshots, keep the working 317 * directory. 318 * @throws IOException if we can't reach the filesystem 319 */ 320 private void resetTempDir() throws IOException { 321 Set<String> workingProcedureCoordinatedSnapshotNames = 322 snapshotToProcIdMap.keySet().stream().map(s -> s.getName()).collect(Collectors.toSet()); 323 324 Path tmpdir = 325 SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir, master.getConfiguration()); 326 FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration()); 327 FileStatus[] workingSnapshotDirs = CommonFSUtils.listStatus(tmpFs, tmpdir); 328 if (workingSnapshotDirs == null) { 329 return; 330 } 331 for (FileStatus workingSnapshotDir : workingSnapshotDirs) { 332 String workingSnapshotName = workingSnapshotDir.getPath().getName(); 333 if (!workingProcedureCoordinatedSnapshotNames.contains(workingSnapshotName)) { 334 try { 335 if (tmpFs.delete(workingSnapshotDir.getPath(), true)) { 336 LOG.info("delete unfinished zk-coordinated snapshot working directory {}", 337 workingSnapshotDir.getPath()); 338 } else { 339 LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}", 340 workingSnapshotDir.getPath()); 341 } 342 } catch (IOException e) { 343 LOG.warn("Couldn't delete unfinished zk-coordinated snapshot working directory {}", 344 workingSnapshotDir.getPath(), e); 345 } 346 } 
else { 347 LOG.debug("find working directory of unfinished procedure {}", workingSnapshotName); 348 } 349 } 350 } 351 352 /** 353 * Delete the specified snapshot 354 * @throws SnapshotDoesNotExistException If the specified snapshot does not exist. 355 * @throws IOException For filesystem IOExceptions 356 */ 357 public void deleteSnapshot(SnapshotDescription snapshot) throws IOException { 358 // check to see if it is completed 359 if (!isSnapshotCompleted(snapshot)) { 360 throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot)); 361 } 362 363 String snapshotName = snapshot.getName(); 364 // first create the snapshot description and check to see if it exists 365 FileSystem fs = master.getMasterFileSystem().getFileSystem(); 366 Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); 367 // Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with 368 // just the "name" and it does not contains the "real" snapshot information 369 snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); 370 371 // call coproc pre hook 372 MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost(); 373 org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null; 374 if (cpHost != null) { 375 snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot); 376 cpHost.preDeleteSnapshot(snapshotPOJO); 377 } 378 379 LOG.debug("Deleting snapshot: " + snapshotName); 380 // delete the existing snapshot 381 if (!fs.delete(snapshotDir, true)) { 382 throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir); 383 } 384 385 // call coproc post hook 386 if (cpHost != null) { 387 cpHost.postDeleteSnapshot(snapshotPOJO); 388 } 389 390 } 391 392 /** 393 * Check if the specified snapshot is done 394 * @return true if snapshot is ready to be restored, false if it is still being taken. 
   * @throws IOException IOException if error from HDFS or RPC
   * @throws UnknownSnapshotException if snapshot is invalid or does not exist.
   */
  public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
    // check the request to make sure it has a snapshot
    if (expected == null) {
      throw new UnknownSnapshotException(
        "No snapshot name passed in request, can't figure out which snapshot you want to check.");
    }

    // Procedure-v2 coordinated snapshot: completion is tracked by the procedure executor.
    Long procId = snapshotToProcIdMap.get(expected);
    if (procId != null) {
      if (master.getMasterProcedureExecutor().isRunning()) {
        return master.getMasterProcedureExecutor().isFinished(procId);
      } else {
        // Executor not running (e.g. during master shutdown/startup): report not-done
        // rather than guessing.
        return false;
      }
    }

    String ssString = ClientSnapshotDescriptionUtils.toString(expected);

    // check to see if the sentinel exists,
    // and if the task is complete removes it from the in-progress snapshots map.
    SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);

    // stop tracking "abandoned" handlers
    cleanupSentinels();

    if (handler == null) {
      // If there's no handler in the in-progress map, it means one of the following:
      // - someone has already requested the snapshot state
      // - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
      // - the snapshot was never requested
      // In those cases returns to the user the "done state" if the snapshots exists on disk,
      // otherwise raise an exception saying that the snapshot is not running and doesn't exist.
      if (!isSnapshotCompleted(expected)) {
        throw new UnknownSnapshotException("Snapshot " + ssString
          + " is not currently running or one of the known completed snapshots.");
      }
      // was done, return true;
      return true;
    }

    // pass on any failure we find in the sentinel
    try {
      handler.rethrowExceptionIfFailed();
    } catch (ForeignException e) {
      // Give some procedure info on an exception.
      String status;
      Procedure p = coordinator.getProcedure(expected.getName());
      if (p != null) {
        status = p.getStatus();
      } else {
        status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
      }
      throw new HBaseSnapshotException("Snapshot " + ssString + " had an error. " + status, e,
        ProtobufUtil.createSnapshotDesc(expected));
    }

    // check to see if we are done
    if (handler.isFinished()) {
      LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
      return true;
    } else if (LOG.isDebugEnabled()) {
      LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
    }
    return false;
  }

  /**
   * Check to see if there is a snapshot in progress with the same name or on the same table.
   * Currently we have a limitation only allowing a single snapshot per table at a time. Also we
   * don't allow snapshot with the same name.
   * @param snapshot description of the snapshot being checked.
   * @param checkTable check if the table is already taking a snapshot.
   * @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
   *         table.
472 */ 473 synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot, boolean checkTable) { 474 if (checkTable) { 475 TableName snapshotTable = TableName.valueOf(snapshot.getTable()); 476 if (isTakingSnapshot(snapshotTable)) { 477 return true; 478 } 479 } 480 Iterator<Map.Entry<TableName, SnapshotSentinel>> it = snapshotHandlers.entrySet().iterator(); 481 while (it.hasNext()) { 482 Map.Entry<TableName, SnapshotSentinel> entry = it.next(); 483 SnapshotSentinel sentinel = entry.getValue(); 484 if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) { 485 return true; 486 } 487 } 488 Iterator<Map.Entry<SnapshotDescription, Long>> spIt = snapshotToProcIdMap.entrySet().iterator(); 489 while (spIt.hasNext()) { 490 Map.Entry<SnapshotDescription, Long> entry = spIt.next(); 491 if ( 492 snapshot.getName().equals(entry.getKey().getName()) 493 && !master.getMasterProcedureExecutor().isFinished(entry.getValue()) 494 ) { 495 return true; 496 } 497 } 498 return false; 499 } 500 501 /** 502 * Check to see if the specified table has a snapshot in progress. Currently we have a limitation 503 * only allowing a single snapshot per table at a time. 504 * @param tableName name of the table being snapshotted. 505 * @return <tt>true</tt> if there is a snapshot in progress on the specified table. 506 */ 507 public boolean isTakingSnapshot(final TableName tableName) { 508 return isTakingSnapshot(tableName, false); 509 } 510 511 public boolean isTableTakingAnySnapshot(final TableName tableName) { 512 return isTakingSnapshot(tableName, true); 513 } 514 515 /** 516 * Check to see if the specified table has a snapshot in progress. Since we introduce the 517 * SnapshotProcedure, it is a little bit different from before. 
For zk-coordinated snapshot, we 518 * can just consider tables in snapshotHandlers only, but for 519 * {@link org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure} and 520 * {@link org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure}, we need to 521 * consider tables in snapshotToProcIdMap also, for the snapshot procedure, we don't need to check 522 * if table in snapshot. 523 * @param tableName name of the table being snapshotted. 524 * @param checkProcedure true if we should check tables in snapshotToProcIdMap 525 * @return <tt>true</tt> if there is a snapshot in progress on the specified table. 526 */ 527 private synchronized boolean isTakingSnapshot(TableName tableName, boolean checkProcedure) { 528 SnapshotSentinel handler = this.snapshotHandlers.get(tableName); 529 if (handler != null && !handler.isFinished()) { 530 return true; 531 } 532 if (checkProcedure) { 533 for (Map.Entry<SnapshotDescription, Long> entry : snapshotToProcIdMap.entrySet()) { 534 if ( 535 TableName.valueOf(entry.getKey().getTable()).equals(tableName) 536 && !master.getMasterProcedureExecutor().isFinished(entry.getValue()) 537 ) { 538 return true; 539 } 540 } 541 } 542 return false; 543 } 544 545 /** 546 * Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we 547 * aren't already running a snapshot or restore on the requested table. 548 * @param snapshot description of the snapshot we want to start 549 * @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot 550 */ 551 public synchronized void prepareWorkingDirectory(SnapshotDescription snapshot) 552 throws HBaseSnapshotException { 553 Path workingDir = 554 SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, master.getConfiguration()); 555 556 try { 557 FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration()); 558 // delete the working directory, since we aren't running the snapshot. 
Likely leftovers 559 // from a failed attempt. 560 workingDirFS.delete(workingDir, true); 561 562 // recreate the working directory for the snapshot 563 if (!workingDirFS.mkdirs(workingDir)) { 564 throw new SnapshotCreationException( 565 "Couldn't create working directory (" + workingDir + ") for snapshot", 566 ProtobufUtil.createSnapshotDesc(snapshot)); 567 } 568 updateWorkingDirAclsIfRequired(workingDir, workingDirFS); 569 } catch (HBaseSnapshotException e) { 570 throw e; 571 } catch (IOException e) { 572 throw new SnapshotCreationException( 573 "Exception while checking to see if snapshot could be started.", e, 574 ProtobufUtil.createSnapshotDesc(snapshot)); 575 } 576 } 577 578 /** 579 * If the parent dir of the snapshot working dir (e.g. /hbase/.hbase-snapshot) has non-empty ACLs, 580 * use them for the current working dir (e.g. /hbase/.hbase-snapshot/.tmp/{snapshot-name}) so that 581 * regardless of whether the snapshot commit phase performs atomic rename or non-atomic copy of 582 * the working dir to new snapshot dir, the ACLs are retained. 583 * @param workingDir working dir to build the snapshot. 584 * @param workingDirFS working dir file system. 585 * @throws IOException If ACL read/modify operation fails. 
586 */ 587 private static void updateWorkingDirAclsIfRequired(Path workingDir, FileSystem workingDirFS) 588 throws IOException { 589 if ( 590 !workingDirFS.hasPathCapability(workingDir, CommonPathCapabilities.FS_ACLS) 591 || workingDir.getParent() == null || workingDir.getParent().getParent() == null 592 ) { 593 return; 594 } 595 AclStatus snapshotWorkingParentDirStatus; 596 try { 597 snapshotWorkingParentDirStatus = 598 workingDirFS.getAclStatus(workingDir.getParent().getParent()); 599 } catch (IOException e) { 600 LOG.warn("Unable to retrieve ACL status for path: {}, current working dir path: {}", 601 workingDir.getParent().getParent(), workingDir, e); 602 return; 603 } 604 List<AclEntry> snapshotWorkingParentDirAclStatusEntries = 605 snapshotWorkingParentDirStatus.getEntries(); 606 if ( 607 snapshotWorkingParentDirAclStatusEntries != null 608 && snapshotWorkingParentDirAclStatusEntries.size() > 0 609 ) { 610 workingDirFS.modifyAclEntries(workingDir, snapshotWorkingParentDirAclStatusEntries); 611 } 612 } 613 614 /** 615 * Take a snapshot of a disabled table. 616 * @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}. 617 * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary 618 * directory could not be determined 619 */ 620 private synchronized void snapshotDisabledTable(SnapshotDescription snapshot) throws IOException { 621 // setup the snapshot 622 prepareWorkingDirectory(snapshot); 623 624 // set the snapshot to be a disabled snapshot, since the client doesn't know about that 625 snapshot = snapshot.toBuilder().setType(Type.DISABLED).build(); 626 627 // Take the snapshot of the disabled table 628 DisabledTableSnapshotHandler handler = new DisabledTableSnapshotHandler(snapshot, master, this); 629 snapshotTable(snapshot, handler); 630 } 631 632 /** 633 * Take a snapshot of an enabled table. 634 * @param snapshot description of the snapshot to take. 
635 * @throws IOException if the snapshot could not be started or filesystem for snapshot temporary 636 * directory could not be determined 637 */ 638 private synchronized void snapshotEnabledTable(SnapshotDescription snapshot) throws IOException { 639 // setup the snapshot 640 prepareWorkingDirectory(snapshot); 641 642 // Take the snapshot of the enabled table 643 EnabledTableSnapshotHandler handler = new EnabledTableSnapshotHandler(snapshot, master, this); 644 snapshotTable(snapshot, handler); 645 } 646 647 /** 648 * Take a snapshot using the specified handler. On failure the snapshot temporary working 649 * directory is removed. NOTE: prepareToTakeSnapshot() called before this one takes care of the 650 * rejecting the snapshot request if the table is busy with another snapshot/restore operation. 651 * @param snapshot the snapshot description 652 * @param handler the snapshot handler 653 */ 654 private synchronized void snapshotTable(SnapshotDescription snapshot, 655 final TakeSnapshotHandler handler) throws IOException { 656 try { 657 handler.prepare(); 658 this.executorService.submit(handler); 659 this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler); 660 } catch (Exception e) { 661 // cleanup the working directory by trying to delete it from the fs. 
662 Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, 663 master.getConfiguration()); 664 FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration()); 665 try { 666 if (!workingDirFs.delete(workingDir, true)) { 667 LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" 668 + ClientSnapshotDescriptionUtils.toString(snapshot)); 669 } 670 } catch (IOException e1) { 671 LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" 672 + ClientSnapshotDescriptionUtils.toString(snapshot)); 673 } 674 // fail the snapshot 675 throw new SnapshotCreationException("Could not build snapshot handler", e, 676 ProtobufUtil.createSnapshotDesc(snapshot)); 677 } 678 } 679 680 public ReadWriteLock getTakingSnapshotLock() { 681 return this.takingSnapshotLock; 682 } 683 684 /** 685 * The snapshot operation processing as following: <br> 686 * 1. Create a Snapshot Handler, and do some initialization; <br> 687 * 2. Put the handler into snapshotHandlers <br> 688 * So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock 689 * and snapshotHandlers; 690 * @return true to indicate that there're some running snapshots. 691 */ 692 public synchronized boolean isTakingAnySnapshot() { 693 return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0 694 || this.snapshotToProcIdMap.size() > 0; 695 } 696 697 /** 698 * Take a snapshot based on the enabled/disabled state of the table. 699 * @throws HBaseSnapshotException when a snapshot specific exception occurs. 700 * @throws IOException when some sort of generic IO exception occurs. 
   */
  public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
    // Hold the read lock for the whole zk-coordinated snapshot setup so the HFile
    // cleaner (which takes the write lock) cannot run concurrently. See HBASE-21387.
    this.takingSnapshotLock.readLock().lock();
    try {
      takeSnapshotInternal(snapshot);
    } finally {
      this.takingSnapshotLock.readLock().unlock();
    }
  }

  /**
   * Take a snapshot using the procedure-v2 framework.
   * @param snapshot description of the snapshot to take
   * @param nonceGroup nonce group of the client request
   * @param nonce nonce of the client request, for idempotent retries
   * @return the id of the submitted SnapshotProcedure
   * @throws IOException when snapshot submission fails
   */
  public long takeSnapshot(SnapshotDescription snapshot, long nonceGroup, long nonce)
    throws IOException {
    // Same cleaner-exclusion read lock as the zk-coordinated path above.
    this.takingSnapshotLock.readLock().lock();
    try {
      return submitSnapshotProcedure(snapshot, nonceGroup, nonce);
    } finally {
      this.takingSnapshotLock.readLock().unlock();
    }
  }

  // Submits a SnapshotProcedure through the nonce-aware procedure utility; the pre/post
  // coprocessor hooks run inside the nonce runnable so retried submissions don't re-fire them.
  private synchronized long submitSnapshotProcedure(SnapshotDescription snapshot, long nonceGroup,
    long nonce) throws IOException {
    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(master, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          // checkTable=false: name collisions only; table-level exclusion is not needed
          // for procedure-coordinated snapshots.
          TableDescriptor tableDescriptor = sanityCheckBeforeSnapshot(snapshot, false);
          MasterCoprocessorHost cpHost = getMaster().getMasterCoprocessorHost();
          User user = RpcServer.getRequestUser().orElse(null);
          org.apache.hadoop.hbase.client.SnapshotDescription snapshotDesc =
            ProtobufUtil.createSnapshotDesc(snapshot);

          if (cpHost != null) {
            cpHost.preSnapshot(snapshotDesc, tableDescriptor, user);
          }

          long procId = submitProcedure(new SnapshotProcedure(
            getMaster().getMasterProcedureExecutor().getEnvironment(), snapshot));

          // Track the procedure so isSnapshotDone()/isTakingSnapshot() can see it.
          getMaster().getSnapshotManager().registerSnapshotProcedure(snapshot, procId);

          if (cpHost != null) {
            cpHost.postSnapshot(snapshotDesc, tableDescriptor, user);
          }
        }

        @Override
        protected String getDescription() {
          return "SnapshotProcedure";
        }
      });
  }

  // Zk-coordinated snapshot path: dispatches to the enabled- or disabled-table handler
  // based on the current table state.
  private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
    TableDescriptor desc = sanityCheckBeforeSnapshot(snapshot, true);

    // call pre coproc hook
    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
    if (cpHost != null) {
      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
      cpHost.preSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
    }

    // if the table is enabled, then have the RS run actually the snapshot work
    TableName snapshotTable = TableName.valueOf(snapshot.getTable());
    if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.ENABLED)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Table enabled, starting distributed snapshots for {}",
          ClientSnapshotDescriptionUtils.toString(snapshot));
      }
      snapshotEnabledTable(snapshot);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
      }
    }
    // For disabled table, snapshot is created by the master
    else if (master.getTableStateManager().isTableState(snapshotTable, TableState.State.DISABLED)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Table is disabled, running snapshot entirely on master for {}",
          ClientSnapshotDescriptionUtils.toString(snapshot));
      }
      snapshotDisabledTable(snapshot);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
      }
    } else {
      // Table is mid-transition (neither fully enabled nor fully disabled): refuse.
      LOG.error("Can't snapshot table '" + snapshot.getTable()
        + "', isn't open or closed, we don't know what to do!");
      TablePartiallyOpenException tpoe =
        new TablePartiallyOpenException(snapshot.getTable() + " isn't fully open.");
      throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
        ProtobufUtil.createSnapshotDesc(snapshot));
    }

    // call post coproc hook
    if (cpHost != null) {
      cpHost.postSnapshot(snapshotPOJO, desc, RpcServer.getRequestUser().orElse(null));
    }
  }

  /**
   * Check if the snapshot can be taken.
Currently we have some limitations, for zk-coordinated 804 * snapshot, we don't allow snapshot with same name or taking multiple snapshots of a table at the 805 * same time, for procedure-coordinated snapshot, we don't allow snapshot with same name. 806 * @param snapshot description of the snapshot being checked. 807 * @param checkTable check if the table is already taking a snapshot. For zk-coordinated snapshot, 808 * we need to check if another zk-coordinated snapshot is in progress, for the 809 * snapshot procedure, this is unnecessary. 810 * @return the table descriptor of the table 811 */ 812 private synchronized TableDescriptor sanityCheckBeforeSnapshot(SnapshotDescription snapshot, 813 boolean checkTable) throws IOException { 814 // check to see if we already completed the snapshot 815 if (isSnapshotCompleted(snapshot)) { 816 throw new SnapshotExistsException( 817 "Snapshot '" + snapshot.getName() + "' already stored on the filesystem.", 818 ProtobufUtil.createSnapshotDesc(snapshot)); 819 } 820 LOG.debug("No existing snapshot, attempting snapshot..."); 821 822 // stop tracking "abandoned" handlers 823 cleanupSentinels(); 824 825 TableName snapshotTable = TableName.valueOf(snapshot.getTable()); 826 // make sure we aren't already running a snapshot 827 if (isTakingSnapshot(snapshot, checkTable)) { 828 throw new SnapshotCreationException( 829 "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot) 830 + " because we are already running another snapshot" 831 + " on the same table or with the same name"); 832 } 833 834 // make sure we aren't running a restore on the same table 835 if (isRestoringTable(snapshotTable)) { 836 throw new SnapshotCreationException( 837 "Rejected taking " + ClientSnapshotDescriptionUtils.toString(snapshot) 838 + " because we are already have a restore in progress on the same snapshot."); 839 } 840 841 // check to see if the table exists 842 TableDescriptor desc = null; 843 try { 844 desc = 
master.getTableDescriptors().get(TableName.valueOf(snapshot.getTable())); 845 } catch (FileNotFoundException e) { 846 String msg = "Table:" + snapshot.getTable() + " info doesn't exist!"; 847 LOG.error(msg); 848 throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot)); 849 } catch (IOException e) { 850 throw new SnapshotCreationException( 851 "Error while geting table description for table " + snapshot.getTable(), e, 852 ProtobufUtil.createSnapshotDesc(snapshot)); 853 } 854 if (desc == null) { 855 throw new SnapshotCreationException( 856 "Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.", 857 ProtobufUtil.createSnapshotDesc(snapshot)); 858 } 859 return desc; 860 } 861 862 /** 863 * Set the handler for the current snapshot 864 * <p> 865 * Exposed for TESTING 866 * @param handler handler the master should use TODO get rid of this if possible, repackaging, 867 * modify tests. 868 */ 869 public synchronized void setSnapshotHandlerForTesting(final TableName tableName, 870 final SnapshotSentinel handler) { 871 if (handler != null) { 872 this.snapshotHandlers.put(tableName, handler); 873 } else { 874 this.snapshotHandlers.remove(tableName); 875 } 876 } 877 878 /** Returns distributed commit coordinator for all running snapshots */ 879 ProcedureCoordinator getCoordinator() { 880 return coordinator; 881 } 882 883 /** 884 * Check to see if the snapshot is one of the currently completed snapshots Returns true if the 885 * snapshot exists in the "completed snapshots folder". 886 * @param snapshot expected snapshot to check 887 * @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is 888 * not stored 889 * @throws IOException if the filesystem throws an unexpected exception, 890 * @throws IllegalArgumentException if snapshot name is invalid. 
   */
  private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
    try {
      final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
      FileSystem fs = master.getMasterFileSystem().getFileSystem();
      // check to see if the snapshot already exists
      return fs.exists(snapshotDir);
    } catch (IllegalArgumentException iae) {
      // an invalid snapshot name surfaces here; rethrow as an HBase snapshot exception
      throw new UnknownSnapshotException("Unexpected exception thrown", iae);
    }
  }

  /**
   * Clone the specified snapshot. The clone will fail if the destination table has a snapshot or
   * restore in progress.
   * @param reqSnapshot Snapshot Descriptor from request
   * @param tableName table to clone
   * @param snapshot Snapshot Descriptor
   * @param snapshotTableDesc Table Descriptor
   * @param nonceKey unique identifier to prevent duplicated RPC
   * @param restoreAcl true to restore acl of snapshot
   * @param customSFT store file tracker to use for the clone; forwarded to the procedure
   * @return procId the ID of the clone snapshot procedure
   */
  private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
    final NonceKey nonceKey, final boolean restoreAcl, final String customSFT) throws IOException {
    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
    // re-key the snapshot's table descriptor to the clone's table name
    TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc);
    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
    if (cpHost != null) {
      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
      cpHost.preCloneSnapshot(snapshotPOJO, htd);
    }
    long procId;
    try {
      procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl, customSFT);
    } catch (IOException e) {
      LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName() + " as table "
        + tableName.getNameAsString(), e);
      throw e;
    }
    LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);

    if (cpHost != null) {
      cpHost.postCloneSnapshot(snapshotPOJO, htd);
    }
    return procId;
  }

  /**
   * Clone the specified snapshot into a new table. The operation will fail if the destination table
   * has a snapshot or restore in progress.
   * @param snapshot Snapshot Descriptor
   * @param tableDescriptor Table Descriptor of the table to create
   * @param nonceKey unique identifier to prevent duplicated RPC
   * @param restoreAcl true to restore acl of snapshot
   * @param customSFT store file tracker to use for the clone; forwarded to the procedure
   * @return procId the ID of the clone snapshot procedure
   */
  synchronized long cloneSnapshot(final SnapshotDescription snapshot,
    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl,
    final String customSFT) throws HBaseSnapshotException {
    TableName tableName = tableDescriptor.getTableName();

    // make sure we aren't running a snapshot on the same table
    if (isTableTakingAnySnapshot(tableName)) {
      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
    }

    // make sure we aren't running a restore on the same table
    if (isRestoringTable(tableName)) {
      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
    }

    try {
      long procId = master.getMasterProcedureExecutor().submitProcedure(
        new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
          tableDescriptor, snapshot, restoreAcl, customSFT),
        nonceKey);
      // track the running clone so later snapshot/restore requests on this table are rejected
      this.restoreTableToProcIdMap.put(tableName, procId);
      return procId;
    } catch (Exception e) {
      String msg = "Couldn't clone the snapshot="
        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
      LOG.error(msg, e);
      throw new RestoreSnapshotException(msg, e);
    }
  }

  /**
   * Restore or Clone the specified snapshot
   * @param nonceKey unique identifier to prevent duplicated RPC
   */
  public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey,
    final boolean restoreAcl, String customSFT) throws IOException {
    FileSystem fs = master.getMasterFileSystem().getFileSystem();
    Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);

    // check if the snapshot exists
    if (!fs.exists(snapshotDir)) {
      LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
      throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(reqSnapshot));
    }

    // Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
    // just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
    // information.
    SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
    SnapshotManifest manifest =
      SnapshotManifest.open(master.getConfiguration(), fs, snapshotDir, snapshot);
    TableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
    TableName tableName = TableName.valueOf(reqSnapshot.getTable());

    // sanity check the new table descriptor
    TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc);

    // stop tracking "abandoned" handlers
    cleanupSentinels();

    // Verify snapshot validity
    SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);

    // Execute the restore/clone operation:
    // restore when the target table exists, clone (create) when it does not
    long procId;
    if (master.getTableDescriptors().exists(tableName)) {
      procId =
        restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl);
    } else {
      procId = cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey,
        restoreAcl, customSFT);
    }
    return procId;
  }

  /**
   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
   * or restore in progress.
   * @param reqSnapshot Snapshot Descriptor from request
   * @param tableName table to restore
   * @param snapshot Snapshot Descriptor
   * @param snapshotTableDesc Table Descriptor
   * @param nonceKey unique identifier to prevent duplicated RPC
   * @param restoreAcl true to restore acl of snapshot
   * @return procId the ID of the restore snapshot procedure
   */
  private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
    final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
    final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
    MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();

    // have to check first if restoring the snapshot would break current SFT setup
    StoreFileTrackerValidationUtils.validatePreRestoreSnapshot(
      master.getTableDescriptors().get(tableName), snapshotTableDesc, master.getConfiguration());

    // restore is only permitted on a disabled table
    if (
      master.getTableStateManager().isTableState(TableName.valueOf(snapshot.getTable()),
        TableState.State.ENABLED)
    ) {
      throw new UnsupportedOperationException("Table '" + TableName.valueOf(snapshot.getTable())
        + "' must be disabled in order to " + "perform a restore operation.");
    }

    // call Coprocessor pre hook
    org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
    if (cpHost != null) {
      snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
      cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
    }

    long procId;
    try {
      procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl);
    } catch (IOException e) {
      LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
        + " as table " + tableName.getNameAsString(), e);
      throw e;
    }
    LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);

    if (cpHost != null) {
      cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
    }

    return procId;
  }

  /**
   * Restore the specified snapshot. The restore will fail if the destination table has a snapshot
   * or restore in progress.
   * @param snapshot Snapshot Descriptor
   * @param tableDescriptor Table Descriptor
   * @param nonceKey unique identifier to prevent duplicated RPC
   * @param restoreAcl true to restore acl of snapshot
   * @return procId the ID of the restore snapshot procedure
   */
  private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
    final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
    throws HBaseSnapshotException {
    final TableName tableName = tableDescriptor.getTableName();

    // make sure we aren't running a snapshot on the same table
    if (isTableTakingAnySnapshot(tableName)) {
      throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
    }

    // make sure we aren't running a restore on the same table
    if (isRestoringTable(tableName)) {
      throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
    }

    try {
      TableDescriptor oldDescriptor = master.getTableDescriptors().get(tableName);
      long procId = master.getMasterProcedureExecutor().submitProcedure(
        new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
          oldDescriptor, tableDescriptor, snapshot, restoreAcl),
        nonceKey);
      // track the running restore so concurrent snapshot/restore on this table are rejected
      this.restoreTableToProcIdMap.put(tableName, procId);
      return procId;
    } catch (Exception e) {
      String msg = "Couldn't restore the snapshot="
        + ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
      LOG.error(msg, e);
      throw new RestoreSnapshotException(msg, e);
    }
  }

  /**
   * Verify if the restore of the specified table is in progress.
1116 * @param tableName table under restore 1117 * @return <tt>true</tt> if there is a restore in progress of the specified table. 1118 */ 1119 private synchronized boolean isRestoringTable(final TableName tableName) { 1120 Long procId = this.restoreTableToProcIdMap.get(tableName); 1121 if (procId == null) { 1122 return false; 1123 } 1124 ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); 1125 if (procExec.isRunning() && !procExec.isFinished(procId)) { 1126 return true; 1127 } else { 1128 this.restoreTableToProcIdMap.remove(tableName); 1129 return false; 1130 } 1131 } 1132 1133 /** 1134 * Return the handler if it is currently live and has the same snapshot target name. The handler 1135 * is removed from the sentinels map if completed. 1136 * @param sentinels live handlers 1137 * @param snapshot snapshot description 1138 * @return null if doesn't match, else a live handler. 1139 */ 1140 private synchronized SnapshotSentinel removeSentinelIfFinished( 1141 final Map<TableName, SnapshotSentinel> sentinels, final SnapshotDescription snapshot) { 1142 if (!snapshot.hasTable()) { 1143 return null; 1144 } 1145 1146 TableName snapshotTable = TableName.valueOf(snapshot.getTable()); 1147 SnapshotSentinel h = sentinels.get(snapshotTable); 1148 if (h == null) { 1149 return null; 1150 } 1151 1152 if (!h.getSnapshot().getName().equals(snapshot.getName())) { 1153 // specified snapshot is to the one currently running 1154 return null; 1155 } 1156 1157 // Remove from the "in-progress" list once completed 1158 if (h.isFinished()) { 1159 sentinels.remove(snapshotTable); 1160 } 1161 1162 return h; 1163 } 1164 1165 /** 1166 * Removes "abandoned" snapshot/restore requests. As part of the HBaseAdmin snapshot/restore API 1167 * the operation status is checked until completed, and the in-progress maps are cleaned up when 1168 * the status of a completed task is requested. 
To avoid having sentinels staying around for long
   * time if something client side is failed, each operation tries to clean up the in-progress maps
   * sentinels finished from a long time.
   */
  private void cleanupSentinels() {
    // zk-coordinated snapshot handlers
    cleanupSentinels(this.snapshotHandlers);
    // procedure-v2 based restore/clone and snapshot bookkeeping
    cleanupCompletedRestoreInMap();
    cleanupCompletedSnapshotInMap();
  }

  /**
   * Remove the sentinels that are marked as finished and the completion time has exceeded the
   * removal timeout.
   * @param sentinels map of sentinels to clean
   */
  private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
    long currentTime = EnvironmentEdgeManager.currentTime();
    long sentinelsCleanupTimeoutMillis =
      master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
        SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
    Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<TableName, SnapshotSentinel> entry = it.next();
      SnapshotSentinel sentinel = entry.getValue();
      // keep finished handlers around for the timeout window so clients polling for
      // completion can still observe the terminal state
      if (
        sentinel.isFinished()
          && (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis
      ) {
        it.remove();
      }
    }
  }

  /**
   * Remove the procedures that are marked as finished
   */
  private synchronized void cleanupCompletedRestoreInMap() {
    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
    Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<TableName, Long> entry = it.next();
      Long procId = entry.getValue();
      if (procExec.isRunning() && procExec.isFinished(procId)) {
        it.remove();
      }
    }
  }

  /**
   * Remove the procedures that are marked as finished
   */
  private synchronized void cleanupCompletedSnapshotInMap() {
    ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
    Iterator<Map.Entry<SnapshotDescription, Long>> it = snapshotToProcIdMap.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<SnapshotDescription, Long> entry = it.next();
      Long procId = entry.getValue();
      if (procExec.isRunning() && procExec.isFinished(procId)) {
        it.remove();
      }
    }
  }

  //
  // Implementing Stoppable interface
  //

  @Override
  public void stop(String why) {
    // short circuit
    if (this.stopped) return;
    // make sure we get stop
    this.stopped = true;
    // pass the stop onto take snapshot handlers
    for (SnapshotSentinel snapshotHandler : this.snapshotHandlers.values()) {
      snapshotHandler.cancel(why);
    }
    // stop the periodic sentinel-cleanup chore scheduled in initialize()
    if (snapshotHandlerChoreCleanerTask != null) {
      snapshotHandlerChoreCleanerTask.cancel(true);
    }
    try {
      if (coordinator != null) {
        coordinator.close();
      }
    } catch (IOException e) {
      LOG.error("stop ProcedureCoordinator error", e);
    }
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }

  /**
   * Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
   * Called at the beginning of snapshot() and restoreSnapshot() methods.
1265 * @throws UnsupportedOperationException if snapshot are not supported 1266 */ 1267 public void checkSnapshotSupport() throws UnsupportedOperationException { 1268 if (!this.isSnapshotSupported) { 1269 throw new UnsupportedOperationException( 1270 "To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" 1271 + HBASE_SNAPSHOT_ENABLED + "' property with value 'true'."); 1272 } 1273 } 1274 1275 /** 1276 * Called at startup, to verify if snapshot operation is supported, and to avoid starting the 1277 * master if there're snapshots present but the cleaners needed are missing. Otherwise we can end 1278 * up with snapshot data loss. 1279 * @param conf The {@link Configuration} object to use 1280 * @param mfs The MasterFileSystem to use 1281 * @throws IOException in case of file-system operation failure 1282 * @throws UnsupportedOperationException in case cleaners are missing and there're snapshot in the 1283 * system 1284 */ 1285 private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs) 1286 throws IOException, UnsupportedOperationException { 1287 // Verify if snapshot is disabled by the user 1288 String enabled = conf.get(HBASE_SNAPSHOT_ENABLED); 1289 boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false); 1290 boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled); 1291 1292 // Extract cleaners from conf 1293 Set<String> hfileCleaners = new HashSet<>(); 1294 String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS); 1295 if (cleaners != null) Collections.addAll(hfileCleaners, cleaners); 1296 1297 Set<String> logCleaners = new HashSet<>(); 1298 cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS); 1299 if (cleaners != null) Collections.addAll(logCleaners, cleaners); 1300 1301 // check if an older version of snapshot directory was present 1302 Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME); 
1303 FileSystem fs = mfs.getFileSystem(); 1304 List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false); 1305 if (ss != null && !ss.isEmpty()) { 1306 LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir); 1307 LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME); 1308 } 1309 1310 // If the user has enabled the snapshot, we force the cleaners to be present 1311 // otherwise we still need to check if cleaners are enabled or not and verify 1312 // that there're no snapshot in the .snapshot folder. 1313 if (snapshotEnabled) { 1314 // Inject snapshot cleaners, if snapshot.enable is true 1315 hfileCleaners.add(SnapshotHFileCleaner.class.getName()); 1316 hfileCleaners.add(HFileLinkCleaner.class.getName()); 1317 // If sync acl to HDFS feature is enabled, then inject the cleaner 1318 if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) { 1319 hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName()); 1320 } 1321 1322 // Set cleaners conf 1323 conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, 1324 hfileCleaners.toArray(new String[hfileCleaners.size()])); 1325 conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS, 1326 logCleaners.toArray(new String[logCleaners.size()])); 1327 } else { 1328 // There may be restore tables if snapshot is enabled and then disabled, so add 1329 // HFileLinkCleaner, see HBASE-26670 for more details. 1330 hfileCleaners.add(HFileLinkCleaner.class.getName()); 1331 conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, 1332 hfileCleaners.toArray(new String[hfileCleaners.size()])); 1333 // Verify if SnapshotHFileCleaner are present 1334 snapshotEnabled = hfileCleaners.contains(SnapshotHFileCleaner.class.getName()); 1335 1336 // Warn if the cleaners are enabled but the snapshot.enabled property is false/not set. 
1337 if (snapshotEnabled) { 1338 LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " + "but the '" 1339 + HBASE_SNAPSHOT_ENABLED + "' property " 1340 + (userDisabled ? "is set to 'false'." : "is not set.")); 1341 } 1342 } 1343 1344 // Mark snapshot feature as enabled if cleaners are present and user has not disabled it. 1345 this.isSnapshotSupported = snapshotEnabled && !userDisabled; 1346 1347 // If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder 1348 // otherwise we end up with snapshot data loss. 1349 if (!snapshotEnabled) { 1350 LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners."); 1351 Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir()); 1352 if (fs.exists(snapshotDir)) { 1353 FileStatus[] snapshots = CommonFSUtils.listStatus(fs, snapshotDir, 1354 new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs)); 1355 if (snapshots != null) { 1356 LOG.error("Snapshots are present, but cleaners are not enabled."); 1357 checkSnapshotSupport(); 1358 } 1359 } 1360 } 1361 } 1362 1363 @Override 1364 public void initialize(MasterServices master, MetricsMaster metricsMaster) 1365 throws KeeperException, IOException, UnsupportedOperationException { 1366 this.master = master; 1367 1368 this.rootDir = master.getMasterFileSystem().getRootDir(); 1369 checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem()); 1370 1371 // get the configuration for the coordinator 1372 Configuration conf = master.getConfiguration(); 1373 long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT); 1374 long timeoutMillis = Math.max( 1375 conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS, 1376 SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME), 1377 conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS, 1378 SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME)); 1379 int opThreads = 
conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT); 1380 1381 // setup the default procedure coordinator 1382 String name = master.getServerName().toString(); 1383 ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); 1384 ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(master.getZooKeeper(), 1385 SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name); 1386 1387 this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); 1388 this.executorService = master.getExecutorService(); 1389 this.verifyWorkerAssigner = 1390 new WorkerAssigner(master, conf.getInt("hbase.snapshot.verify.task.max", 3), 1391 new ProcedureEvent<>("snapshot-verify-worker-assigning")); 1392 restoreUnfinishedSnapshotProcedure(); 1393 restoreWorkers(); 1394 resetTempDir(); 1395 snapshotHandlerChoreCleanerTask = 1396 scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS); 1397 } 1398 1399 private void restoreUnfinishedSnapshotProcedure() { 1400 master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream() 1401 .filter(p -> p instanceof SnapshotProcedure).filter(p -> !p.isFinished()) 1402 .map(p -> (SnapshotProcedure) p).forEach(p -> { 1403 registerSnapshotProcedure(p.getSnapshot(), p.getProcId()); 1404 LOG.info("restore unfinished snapshot procedure {}", p); 1405 }); 1406 } 1407 1408 @Override 1409 public String getProcedureSignature() { 1410 return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION; 1411 } 1412 1413 @Override 1414 public void execProcedure(ProcedureDescription desc) throws IOException { 1415 takeSnapshot(toSnapshotDescription(desc)); 1416 } 1417 1418 @Override 1419 public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user) 1420 throws IOException { 1421 // Done by AccessController as part of preSnapshot coprocessor hook (legacy code path). 1422 // In future, when we AC is removed for good, that check should be moved here. 
  }

  @Override
  public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
    return isSnapshotDone(toSnapshotDescription(desc));
  }

  /**
   * Build a SnapshotDescription from a generic ProcedureDescription: the procedure
   * "instance" is the snapshot name and a "table" configuration entry names the table.
   * The resulting snapshot type is always FLUSH.
   * @throws IOException if the snapshot name or the table is not specified
   */
  private SnapshotDescription toSnapshotDescription(ProcedureDescription desc) throws IOException {
    SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
    if (!desc.hasInstance()) {
      throw new IOException("Snapshot name is not defined: " + desc.toString());
    }
    String snapshotName = desc.getInstance();
    List<NameStringPair> props = desc.getConfigurationList();
    String table = null;
    for (NameStringPair prop : props) {
      if ("table".equalsIgnoreCase(prop.getName())) {
        table = prop.getValue();
      }
    }
    if (table == null) {
      throw new IOException("Snapshot table is not defined: " + desc.toString());
    }
    // round-trip through TableName to normalize the table string
    TableName tableName = TableName.valueOf(table);
    builder.setTable(tableName.getNameAsString());
    builder.setName(snapshotName);
    builder.setType(SnapshotDescription.Type.FLUSH);
    return builder.build();
  }

  public void registerSnapshotProcedure(SnapshotDescription snapshot, long procId) {
    snapshotToProcIdMap.put(snapshot, procId);
    LOG.debug("register snapshot={}, snapshot procedure id = {}",
      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
  }

  public void unregisterSnapshotProcedure(SnapshotDescription snapshot, long procId) {
    // two-arg remove: only unregister if the snapshot is still mapped to this exact proc id
    snapshotToProcIdMap.remove(snapshot, procId);
    LOG.debug("unregister snapshot={}, snapshot procedure id = {}",
      ClientSnapshotDescriptionUtils.toString(snapshot), procId);
  }

  public boolean snapshotProcedureEnabled() {
    return master.getConfiguration().getBoolean(SNAPSHOT_PROCEDURE_ENABLED,
      SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
  }

  /**
   * Acquire a region server to run a SnapshotVerifyProcedure.
   * @throws ProcedureSuspendedException when no worker can be handed out right now
   */
  public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
    throws ProcedureSuspendedException {
    ServerName worker = verifyWorkerAssigner.acquire(procedure);
    LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker);
    return worker;
  }

  public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) {
    LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
    verifyWorkerAssigner.release(worker);
  }

  /**
   * After a master restart, re-record the workers already claimed by unfinished
   * SnapshotVerifyProcedures so the same server is not handed out twice.
   */
  private void restoreWorkers() {
    master.getMasterProcedureExecutor().getActiveProceduresNoCopy().stream()
      .filter(p -> p instanceof SnapshotVerifyProcedure).map(p -> (SnapshotVerifyProcedure) p)
      .filter(p -> !p.isFinished()).filter(p -> p.getServerName() != null).forEach(p -> {
        verifyWorkerAssigner.addUsedWorker(p.getServerName());
        LOG.debug("{} restores used worker {}", p, p.getServerName());
      });
  }

  public Integer getAvailableWorker(ServerName serverName) {
    return verifyWorkerAssigner.getAvailableWorker(serverName);
  }
}