/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MetricsSnapshot;
import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;

/**
 * A procedure used to take snapshot on tables.
 * <p>
 * The procedure walks through the {@link SnapshotState} states: it prepares the in-memory snapshot
 * environment, runs the coprocessor pre-hook, writes the snapshot info into the working directory,
 * references the table's regions (online, split, closed and MOB), consolidates and verifies the
 * manifest, moves the working directory to the completed snapshot directory and finally runs the
 * coprocessor post-hook.
 * <p>
 * Only the {@link SnapshotDescription} is persisted; every other field is rebuilt by
 * {@link #prepareSnapshotEnv(MasterProcedureEnv)}, either in the first state or in
 * {@link #afterReplay(MasterProcedureEnv)}.
 */
@InterfaceAudience.Private
public class SnapshotProcedure extends AbstractStateMachineTableProcedure<SnapshotState> {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class);
  private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();

  // Everything below except 'snapshot' is transient state that is (re)built by
  // prepareSnapshotEnv(); it is null until that method has run successfully.
  private Configuration conf;
  private SnapshotDescription snapshot;
  private Path rootDir;
  private Path snapshotDir;
  private Path workingDir;
  private FileSystem workingDirFS;
  private FileSystem rootFs;
  private TableName snapshotTable;
  private MonitoredTask status;
  private SnapshotManifest snapshotManifest;
  private TableDescriptor htd;

  // Lazily created the first time we have to back off for a running split/merge procedure.
  private RetryCounter retryCounter;

  /** No-arg constructor; the snapshot description is filled in by state deserialization. */
  public SnapshotProcedure() {
  }

  /**
   * Creates a procedure that takes the given snapshot.
   * @param env      the master procedure environment
   * @param snapshot description of the snapshot to take
   */
  public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) {
    super(env);
    this.snapshot = snapshot;
  }

  /** Returns the table named in the snapshot description. */
  @Override
  public TableName getTableName() {
    return TableName.valueOf(snapshot.getTable());
  }

  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.SNAPSHOT;
  }

  @Override
  protected LockState acquireLock(MasterProcedureEnv env) {
    // AbstractStateMachineTableProcedure acquires exclusive table lock by default,
    // but we may need to downgrade it to shared lock for some reasons:
    // a. exclusive lock has a negative effect on assigning region. See HBASE-21480 for details.
    // b. we want to support taking multiple different snapshots on same table on the same time.
    if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) {
      return LockState.LOCK_EVENT_WAIT;
    }
    return LockState.LOCK_ACQUIRED;
  }

  /** Releases the shared table lock taken in {@link #acquireLock(MasterProcedureEnv)}. */
  @Override
  protected void releaseLock(MasterProcedureEnv env) {
    env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
  }

  @Override
  protected boolean holdLock(MasterProcedureEnv env) {
    // In order to avoid enabling/disabling/modifying/deleting table during snapshot,
    // we don't release lock during suspend
    return true;
  }

  /**
   * Executes one step of the snapshot state machine.
   * <p>
   * A {@link ProcedureSuspendedException} is propagated to the caller. Any other exception marks
   * the procedure as failed, aborts the monitored task (if one has been created) and ends the state
   * machine.
   * @param env   the master procedure environment
   * @param state the state to execute
   * @return {@link Flow#HAS_MORE_STATE} while there is more work, {@link Flow#NO_MORE_STATE} after
   *         the last state or on failure
   */
  @Override
  protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    LOG.info("{} execute state={}", this, state);

    try {
      switch (state) {
        case SNAPSHOT_PREPARE:
          prepareSnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_PRE_OPERATION:
          preSnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_WRITE_SNAPSHOT_INFO:
          SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS);
          TableState tableState =
            env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
          // Enabled tables are snapshotted through remote region procedures, disabled tables
          // directly from the master.
          // NOTE(review): no next state is set when the table is neither enabled nor disabled
          // (e.g. in transition); confirm how the state machine behaves in that case.
          if (tableState.isEnabled()) {
            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
          } else if (tableState.isDisabled()) {
            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS);
          }
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
          addChildProcedure(createRemoteSnapshotProcedures(env));
          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS:
          snapshotSplitRegions(env);
          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS:
          snapshotClosedRegions(env);
          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_MOB_REGION:
          snapshotMobRegion(env);
          setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
          // flush the in-memory state, and write the single manifest
          status.setStatus("Consolidate snapshot: " + snapshot.getName());
          snapshotManifest.consolidate();
          setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_VERIFIER_SNAPSHOT:
          status.setStatus("Verifying snapshot: " + snapshot.getName());
          verifySnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_COMPLETE_SNAPSHOT:
          // The corrupted flag file is created through markSnapshotCorrupted().
          if (isSnapshotCorrupted()) {
            throw new CorruptedSnapshotException(snapshot.getName());
          }
          completeSnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_POST_OPERATION:
          postSnapshot(env);
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (ProcedureSuspendedException e) {
      throw e;
    } catch (Exception e) {
      setFailure("master-snapshot", e);
      LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e);
      // 'status' is only created near the end of prepareSnapshotEnv(); a failure before that
      // point (e.g. a missing table descriptor) must not turn into a NullPointerException here.
      if (status != null) {
        status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable);
      }
      return Flow.NO_MORE_STATE;
    }
  }

  /**
   * Rolls back a single state. Only {@link SnapshotState#SNAPSHOT_PRE_OPERATION} has anything to
   * undo: the snapshot working directory is deleted on a best-effort basis (failures are logged,
   * not propagated).
   */
  @Override
  protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
    throws IOException, InterruptedException {
    if (state != SnapshotState.SNAPSHOT_PRE_OPERATION) {
      return;
    }
    // The working directory handles are unset if prepareSnapshotEnv() failed part-way (for
    // example during afterReplay()); in that case there is nothing we can clean up from here.
    if (workingDirFS == null || workingDir == null) {
      LOG.warn("Snapshot working directory of {} was never initialized, skip deleting it", this);
      return;
    }
    try {
      if (!workingDirFS.delete(workingDir, true)) {
        LOG.error("Couldn't delete snapshot working directory {}", workingDir);
      }
    } catch (IOException e) {
      LOG.error("Couldn't delete snapshot working directory {}", workingDir, e);
    }
  }

  @Override
  protected boolean isRollbackSupported(SnapshotState state) {
    return true;
  }

  @Override
  protected SnapshotState getState(final int stateId) {
    return SnapshotState.forNumber(stateId);
  }

  @Override
  protected int getStateId(SnapshotState state) {
    return state.getNumber();
  }

  @Override
  protected SnapshotState getInitialState() {
    return SnapshotState.SNAPSHOT_PREPARE;
  }

  /**
   * First state: waits (with backoff) while a split or merge procedure is running on the table,
   * then builds the snapshot environment.
   * @throws ProcedureSuspendedException to suspend this procedure until the backoff timeout fires
   * @throws IOException                 if the snapshot environment cannot be prepared
   */
  private void prepareSnapshot(MasterProcedureEnv env)
    throws ProcedureSuspendedException, IOException {
    if (isAnySplitOrMergeProcedureRunning(env)) {
      if (retryCounter == null) {
        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
      }
      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
      LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff);
      setTimeout(Math.toIntExact(backoff));
      setState(ProcedureState.WAITING_TIMEOUT);
      skipPersistence();
      throw new ProcedureSuspendedException();
    }
    prepareSnapshotEnv(env);
  }

  /**
   * Initializes all transient fields (configuration, directories, file systems, monitored task and
   * snapshot manifest) from the persisted snapshot description.
   * @throws IOException if the table descriptor is missing or a file system cannot be obtained
   */
  private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
    this.conf = env.getMasterConfiguration();
    this.snapshotTable = TableName.valueOf(snapshot.getTable());
    this.htd = loadTableDescriptorSnapshot(env);
    this.rootFs = env.getMasterFileSystem().getFileSystem();
    this.rootDir = CommonFSUtils.getRootDir(conf);
    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
    this.workingDirFS = workingDir.getFileSystem(conf);
    this.status = TaskMonitor.get()
      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
    this.snapshotManifest =
      SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
    this.snapshotManifest.addTableDescriptor(htd);
  }

  /**
   * Called when the backoff timeout set in {@link #prepareSnapshot(MasterProcedureEnv)} expires:
   * makes the procedure runnable again and puts it at the front of the scheduler queue.
   * @return always {@code false}
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

  /** Returns whether an unfinished split or merge procedure exists for the snapshot's table. */
  private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
    return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
      .filter(p -> !p.isFinished())
      .filter(
        p -> p instanceof SplitTableRegionProcedure || p instanceof MergeTableRegionsProcedure)
      .anyMatch(
        p -> ((AbstractStateMachineTableProcedure<?>) p).getTableName().equals(getTableName()));
  }

  /**
   * Loads the descriptor of the table being snapshotted. If the table has no max file size set
   * ({@code -1}) and the snapshot description carries a positive one, a copy of the descriptor
   * with that max file size is returned instead.
   * @throws IOException if the table descriptor does not exist
   */
  private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException {
    TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable);
    if (htd == null) {
      throw new IOException("TableDescriptor missing for " + snapshotTable);
    }
    if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
      return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE,
        Long.toString(this.snapshot.getMaxFileSize())).build();
    }
    return htd;
  }

  /** Prepares the snapshot working directory and invokes the coprocessor pre-snapshot hook. */
  private void preSnapshot(MasterProcedureEnv env) throws IOException {
    env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot);

    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.preSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd, getUser());
    }
  }

  /**
   * Unregisters this procedure from the snapshot manager and invokes the coprocessor post-snapshot
   * hook.
   */
  private void postSnapshot(MasterProcedureEnv env) throws IOException {
    SnapshotManager sm = env.getMasterServices().getSnapshotManager();
    if (sm != null) {
      sm.unregisterSnapshotProcedure(snapshot, getProcId());
    }

    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.postSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd, getUser());
    }
  }

  /**
   * Verifies the snapshot in the working directory. When the number of default-replica regions
   * reaches {@code hbase.snapshot.remote.verify.threshold} (default 10000), the master verifier is
   * called with {@code false} and one {@link SnapshotVerifyProcedure} per region is added as a
   * child procedure; otherwise the master verifier is called with {@code true}.
   */
  private void verifySnapshot(MasterProcedureEnv env) throws IOException {
    int verifyThreshold =
      env.getMasterConfiguration().getInt("hbase.snapshot.remote.verify.threshold", 10000);
    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
    int numRegions = regions.size();

    MasterSnapshotVerifier verifier =
      new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS);
    if (numRegions >= verifyThreshold) {
      verifier.verifySnapshot(workingDir, false);
      addChildProcedure(regions.stream().map(r -> new SnapshotVerifyProcedure(snapshot, r))
        .toArray(SnapshotVerifyProcedure[]::new));
    } else {
      verifier.verifySnapshot(workingDir, true);
    }
  }

  /**
   * Moves the snapshot from the working directory to the completed snapshot directory, notifies
   * the coprocessor host, marks the monitored task complete and records the duration metric.
   */
  private void completeSnapshot(MasterProcedureEnv env) throws IOException {
    // complete the snapshot, atomically moving from tmp to .snapshot dir.
    SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir,
      env.getMasterFileSystem().getFileSystem(), workingDirFS, conf);
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
    }
    status.markComplete("Snapshot " + snapshot.getName() + " completed");
    // update metric. when master restarts, the metric value is wrong
    // The metric is recorded only after markComplete(): presumably the task reports no valid
    // completion timestamp before it has been marked complete, which would make the computed
    // duration meaningless. NOTE(review): confirm against the MonitoredTask implementation.
    metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
  }

  /** Adds the split regions (default replicas only) of the table to the snapshot manifest. */
  private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException {
    List<RegionInfo> regions =
      getDefaultRegionReplica(env).filter(RegionInfo::isSplit).collect(Collectors.toList());
    snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool");
  }

  /** Adds all default-replica regions of the (disabled) table to the snapshot manifest. */
  private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException {
    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
    snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool");
  }

  /**
   * Returns a stream over the default replicas of the table's regions, as reported by the
   * assignment manager's {@code getTableRegions(snapshotTable, false)}.
   */
  private Stream<RegionInfo> getDefaultRegionReplica(MasterProcedureEnv env) {
    return env.getAssignmentManager().getTableRegions(snapshotTable, false).stream()
      .filter(r -> RegionReplicaUtil.isDefaultReplica(r));
  }

  /**
   * Adds the given regions to the snapshot manifest using a dedicated thread pool, which is shut
   * down afterwards regardless of the outcome.
   * @param regions        regions to reference in the manifest
   * @param threadPoolName name passed to the executor factory
   */
  private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, List<RegionInfo> regions,
    String threadPoolName) throws IOException {
    ThreadPoolExecutor exec =
      SnapshotManifest.createExecutor(env.getMasterConfiguration(), threadPoolName);
    try {
      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
        @Override
        public void editRegion(final RegionInfo region) throws IOException {
          snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
          LOG.info("take snapshot region={}, table={}", region, snapshotTable);
        }
      });
    } finally {
      exec.shutdown();
    }
    status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable);
  }

  /**
   * Adds the MOB region of the table to the snapshot manifest. Does nothing if the table has no
   * MOB columns.
   */
  private void snapshotMobRegion(MasterProcedureEnv env) throws IOException {
    if (!MobUtils.hasMobColumns(htd)) {
      return;
    }
    ThreadPoolExecutor exec =
      SnapshotManifest.createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool");
    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
    try {
      ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo),
        new ModifyRegionUtils.RegionEditTask() {
          @Override
          public void editRegion(final RegionInfo region) throws IOException {
            snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
          }
        });
    } finally {
      exec.shutdown();
    }
    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
  }

  /** Persists the parent state followed by the snapshot description. */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    serializer
      .serialize(SnapshotProcedureStateData.newBuilder().setSnapshot(this.snapshot).build());
  }

  /** Restores the parent state followed by the snapshot description. */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class);
    this.snapshot = data.getSnapshot();
  }

  /**
   * Creates one {@link SnapshotRegionProcedure} per default-replica region returned by the
   * assignment manager's {@code getTableRegions(snapshotTable, true)}.
   */
  private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) {
    return env.getAssignmentManager().getTableRegions(snapshotTable, true).stream()
      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
      .map(r -> new SnapshotRegionProcedure(snapshot, r)).toArray(SnapshotRegionProcedure[]::new);
  }

  @Override
  public void toStringClassDetails(StringBuilder builder) {
    builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", snapshot=")
      .append(ClientSnapshotDescriptionUtils.toString(snapshot));
  }

  /** Returns the description of the snapshot being taken; same value as {@link #getSnapshot()}. */
  public SnapshotDescription getSnapshotDesc() {
    return snapshot;
  }

  /**
   * Rebuilds the transient snapshot environment after the procedure has been replayed. Nothing is
   * done in the initial state, because that state prepares the environment itself. If the
   * environment cannot be rebuilt, or the snapshot procedure feature is disabled in the
   * configuration, the procedure is marked as failed.
   */
  @Override
  protected void afterReplay(MasterProcedureEnv env) {
    if (getCurrentState() == getInitialState()) {
      // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv().
      return;
    }
    try {
      prepareSnapshotEnv(env);
      boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED,
        SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
      if (!snapshotProcedureEnabled) {
        throw new IOException("SnapshotProcedure is DISABLED");
      }
    } catch (IOException e) {
      LOG.error("Failed replaying {}, mark procedure as FAILED", this, e);
      setFailure("master-snapshot", e);
    }
  }

  /** Returns the description of the snapshot being taken. */
  public SnapshotDescription getSnapshot() {
    return snapshot;
  }

  /**
   * Marks the in-progress snapshot as corrupted by creating an empty flag file in the working
   * directory, unless the file already exists.
   * @throws IOException if the flag file cannot be checked or created
   */
  public synchronized void markSnapshotCorrupted() throws IOException {
    Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir);
    if (!workingDirFS.exists(flagFile)) {
      workingDirFS.create(flagFile).close();
      LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName());
    }
  }

  /**
   * Returns whether the corrupted flag file exists in the snapshot working directory.
   * @throws IOException if the file system cannot be queried
   */
  public boolean isSnapshotCorrupted() throws IOException {
    return workingDirFS
      .exists(SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir));
  }
}