/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MetricsSnapshot;
import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;

/**
 * A procedure used to take snapshot on tables.
 * <p>
 * The procedure walks through the {@link SnapshotState} values in this order:
 * {@code SNAPSHOT_PREPARE}, {@code SNAPSHOT_PRE_OPERATION}, {@code SNAPSHOT_WRITE_SNAPSHOT_INFO},
 * then either {@code SNAPSHOT_SNAPSHOT_ONLINE_REGIONS} followed by
 * {@code SNAPSHOT_SNAPSHOT_SPLIT_REGIONS} (enabled table) or
 * {@code SNAPSHOT_SNAPSHOT_CLOSED_REGIONS} (disabled table), and finally
 * {@code SNAPSHOT_SNAPSHOT_MOB_REGION}, {@code SNAPSHOT_CONSOLIDATE_SNAPSHOT},
 * {@code SNAPSHOT_VERIFIER_SNAPSHOT}, {@code SNAPSHOT_COMPLETE_SNAPSHOT} and
 * {@code SNAPSHOT_POST_OPERATION}.
 * <p>
 * Only the {@link SnapshotDescription} is persisted as procedure state; every other field is
 * derived from it by {@link #prepareSnapshotEnv(MasterProcedureEnv)}.
 */
@InterfaceAudience.Private
public class SnapshotProcedure extends AbstractStateMachineTableProcedure<SnapshotState> {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class);
  private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();

  // Everything below except 'snapshot' is transient: it is (re)built by prepareSnapshotEnv()
  // and therefore null until that method has completed successfully.
  private Configuration conf;
  private SnapshotDescription snapshot;
  private Path rootDir;
  private Path snapshotDir;
  private Path workingDir;
  private FileSystem workingDirFS;
  private FileSystem rootFs;
  private TableName snapshotTable;
  private MonitoredTask status;
  private SnapshotManifest snapshotManifest;
  private TableDescriptor htd;

  // Lazily created in prepareSnapshot() while waiting for split/merge procedures to finish.
  private RetryCounter retryCounter;

  /** No-arg constructor; state is filled in by {@link #deserializeStateData}. */
  public SnapshotProcedure() {
  }

  /**
   * Creates a procedure that takes the given snapshot.
   * @param env      the master procedure environment, passed to the super constructor
   * @param snapshot description of the snapshot to take
   */
  public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) {
    super(env);
    this.snapshot = snapshot;
  }

  /** Returns the table named in the snapshot description. */
  @Override
  public TableName getTableName() {
    return TableName.valueOf(snapshot.getTable());
  }

  /** Returns {@link TableOperationType#SNAPSHOT}. */
  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.SNAPSHOT;
  }

  /**
   * Acquires a shared (not exclusive) lock on the snapshot table.
   * @return {@link LockState#LOCK_EVENT_WAIT} if the scheduler reports that the procedure has to
   *         wait for the lock, {@link LockState#LOCK_ACQUIRED} otherwise
   */
  @Override
  protected LockState acquireLock(MasterProcedureEnv env) {
    // AbstractStateMachineTableProcedure acquires exclusive table lock by default,
    // but we may need to downgrade it to shared lock for some reasons:
    // a. exclusive lock has a negative effect on assigning region. See HBASE-21480 for details.
    // b. we want to support taking multiple different snapshots on same table on the same time.
    if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) {
      return LockState.LOCK_EVENT_WAIT;
    }
    return LockState.LOCK_ACQUIRED;
  }

  /** Releases the shared table lock taken in {@link #acquireLock(MasterProcedureEnv)}. */
  @Override
  protected void releaseLock(MasterProcedureEnv env) {
    env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
  }

  /** Always returns {@code true}: the lock is kept for the whole life of the procedure. */
  @Override
  protected boolean holdLock(MasterProcedureEnv env) {
    // In order to avoid enabling/disabling/modifying/deleting table during snapshot,
    // we don't release lock during suspend
    return true;
  }

  /**
   * Executes one step of the snapshot state machine.
   * <p>
   * A {@link ProcedureSuspendedException} is propagated unchanged. Any other exception marks the
   * procedure as failed (source {@code "master-snapshot"}), aborts the monitored task if one has
   * already been created, and ends the state machine.
   * @param env   the master procedure environment
   * @param state the state to execute
   * @return {@link Flow#HAS_MORE_STATE} while further states remain, {@link Flow#NO_MORE_STATE}
   *         after {@code SNAPSHOT_POST_OPERATION} or after a failure
   */
  @Override
  protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    LOG.info("{} execute state={}", this, state);

    try {
      switch (state) {
        case SNAPSHOT_PREPARE:
          prepareSnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_PRE_OPERATION:
          preSnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_WRITE_SNAPSHOT_INFO:
          SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS);
          TableState tableState =
            env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
          // No next state is set when the table is neither enabled nor disabled.
          if (tableState.isEnabled()) {
            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
          } else if (tableState.isDisabled()) {
            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS);
          }
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
          addChildProcedure(createRemoteSnapshotProcedures(env));
          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS:
          snapshotSplitRegions(env);
          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS:
          snapshotClosedRegions(env);
          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_MOB_REGION:
          snapshotMobRegion(env);
          setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
          // flush the in-memory state, and write the single manifest
          status.setStatus("Consolidate snapshot: " + snapshot.getName());
          snapshotManifest.consolidate();
          setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_VERIFIER_SNAPSHOT:
          status.setStatus("Verifying snapshot: " + snapshot.getName());
          verifySnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_COMPLETE_SNAPSHOT:
          if (isSnapshotCorrupted()) {
            throw new CorruptedSnapshotException(snapshot.getName());
          }
          completeSnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_POST_OPERATION:
          postSnapshot(env);
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (ProcedureSuspendedException e) {
      throw e;
    } catch (Exception e) {
      setFailure("master-snapshot", e);
      LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e);
      // status is only created at the end of prepareSnapshotEnv(); a failure before that point
      // (for example a missing table descriptor during SNAPSHOT_PREPARE) leaves it null, and
      // dereferencing it here would replace the real failure with a NullPointerException.
      if (status != null) {
        status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable);
      }
      return Flow.NO_MORE_STATE;
    }
  }

  /**
   * Rolls back a single state. Only {@code SNAPSHOT_PRE_OPERATION} has anything to undo: the
   * snapshot working directory is deleted recursively. Deletion failures are logged, not thrown.
   * @param env   the master procedure environment
   * @param state the state being rolled back
   */
  @Override
  protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
    throws IOException, InterruptedException {
    if (state != SnapshotState.SNAPSHOT_PRE_OPERATION) {
      return;
    }
    // The working directory handles are null when prepareSnapshotEnv() never completed, e.g.
    // when afterReplay() failed part-way and marked the procedure as failed. Skip the cleanup
    // instead of failing the rollback with a NullPointerException.
    if (workingDirFS == null || workingDir == null) {
      LOG.warn("Snapshot working directory of {} is unknown, skip deleting it during rollback",
        this);
      return;
    }
    try {
      if (!workingDirFS.delete(workingDir, true)) {
        LOG.error("Couldn't delete snapshot working directory {}", workingDir);
      }
    } catch (IOException e) {
      LOG.error("Couldn't delete snapshot working directory {}", workingDir, e);
    }
  }

  /** Rollback is reported as supported for every state. */
  @Override
  protected boolean isRollbackSupported(SnapshotState state) {
    return true;
  }

  /** Maps a numeric state id to its {@link SnapshotState}. */
  @Override
  protected SnapshotState getState(final int stateId) {
    return SnapshotState.forNumber(stateId);
  }

  /** Maps a {@link SnapshotState} to its numeric id. */
  @Override
  protected int getStateId(SnapshotState state) {
    return state.getNumber();
  }

  /** Returns {@link SnapshotState#SNAPSHOT_PREPARE}. */
  @Override
  protected SnapshotState getInitialState() {
    return SnapshotState.SNAPSHOT_PREPARE;
  }

  /**
   * Implements {@code SNAPSHOT_PREPARE}. While a split or merge procedure is running on the
   * snapshot table, the procedure suspends itself with a backoff timeout; otherwise it
   * initializes the transient snapshot environment.
   * @throws ProcedureSuspendedException if a split/merge procedure is still running
   * @throws IOException                 if the snapshot environment cannot be prepared
   */
  private void prepareSnapshot(MasterProcedureEnv env)
    throws ProcedureSuspendedException, IOException {
    if (isAnySplitOrMergeProcedureRunning(env)) {
      if (retryCounter == null) {
        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
      }
      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
      LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff);
      setTimeout(Math.toIntExact(backoff));
      setState(ProcedureState.WAITING_TIMEOUT);
      skipPersistence();
      throw new ProcedureSuspendedException();
    }
    prepareSnapshotEnv(env);
  }

  /**
   * Builds every transient field from the persisted {@link SnapshotDescription}: configuration,
   * table name and descriptor, file systems, completed/working snapshot directories, the
   * monitored task and the snapshot manifest (to which the table descriptor is added).
   * <p>
   * Fields are assigned in order, so an exception leaves the later ones (including
   * {@code status} and {@code workingDirFS}) null.
   * @throws IOException if the table descriptor is missing or a file system call fails
   */
  private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
    this.conf = env.getMasterConfiguration();
    this.snapshotTable = TableName.valueOf(snapshot.getTable());
    this.htd = loadTableDescriptorSnapshot(env);
    this.rootFs = env.getMasterFileSystem().getFileSystem();
    this.rootDir = CommonFSUtils.getRootDir(conf);
    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
    this.workingDirFS = workingDir.getFileSystem(conf);
    this.status = TaskMonitor.get()
      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
    this.snapshotManifest =
      SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
    this.snapshotManifest.addTableDescriptor(htd);
  }

  /**
   * Puts the procedure back into {@code RUNNABLE} state and re-queues it at the front of the
   * scheduler, so that {@link #prepareSnapshot(MasterProcedureEnv)} is retried.
   * @return always {@code false}. NOTE(review): per the Procedure contract this presumably tells
   *         the executor that the timeout must not be treated as a failure - confirm.
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

  /**
   * Returns whether the master procedure executor holds an unfinished
   * {@link SplitTableRegionProcedure} or {@link MergeTableRegionsProcedure} for the snapshot
   * table.
   */
  private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
    return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
      .filter(p -> !p.isFinished())
      .filter(
        p -> p instanceof SplitTableRegionProcedure || p instanceof MergeTableRegionsProcedure)
      .anyMatch(
        p -> ((AbstractStateMachineTableProcedure<?>) p).getTableName().equals(getTableName()));
  }

  /**
   * Loads the descriptor of the snapshot table. When the table has no max file size configured
   * ({@code -1}) and the snapshot description carries a positive one, a copy of the descriptor
   * with {@code MAX_FILESIZE} set to the snapshot's value is returned instead.
   * @throws IOException if no descriptor exists for the snapshot table
   */
  private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException {
    TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable);
    if (htd == null) {
      throw new IOException("TableDescriptor missing for " + snapshotTable);
    }
    if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
      return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE,
        Long.toString(this.snapshot.getMaxFileSize())).build();
    }
    return htd;
  }

  /** Implements {@code SNAPSHOT_PRE_OPERATION}: prepares the snapshot working directory. */
  private void preSnapshot(MasterProcedureEnv env) throws IOException {
    env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot);
  }

  /**
   * Implements {@code SNAPSHOT_POST_OPERATION}: unregisters this procedure from the snapshot
   * manager, if the master has one.
   */
  private void postSnapshot(MasterProcedureEnv env) throws IOException {
    SnapshotManager sm = env.getMasterServices().getSnapshotManager();
    if (sm != null) {
      sm.unregisterSnapshotProcedure(snapshot, getProcId());
    }
  }

  /**
   * Implements {@code SNAPSHOT_VERIFIER_SNAPSHOT}. When the table has at least
   * {@code hbase.snapshot.remote.verify.threshold} (default 10000) default-replica regions, the
   * verifier is invoked with {@code false} and one {@link SnapshotVerifyProcedure} per region is
   * added as a child procedure; otherwise the verifier is invoked with {@code true} and no child
   * procedures are created.
   */
  private void verifySnapshot(MasterProcedureEnv env) throws IOException {
    int verifyThreshold =
      env.getMasterConfiguration().getInt("hbase.snapshot.remote.verify.threshold", 10000);
    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
    int numRegions = regions.size();

    MasterSnapshotVerifier verifier =
      new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS);
    if (numRegions >= verifyThreshold) {
      verifier.verifySnapshot(workingDir, false);
      addChildProcedure(regions.stream().map(r -> new SnapshotVerifyProcedure(snapshot, r))
        .toArray(SnapshotVerifyProcedure[]::new));
    } else {
      verifier.verifySnapshot(workingDir, true);
    }
  }

  /**
   * Implements {@code SNAPSHOT_COMPLETE_SNAPSHOT}: moves the snapshot from the working directory
   * to the completed directory, records the snapshot metric, notifies the master coprocessor
   * host (if any) and marks the monitored task complete.
   */
  private void completeSnapshot(MasterProcedureEnv env) throws IOException {
    // complete the snapshot, atomically moving from tmp to .snapshot dir.
    SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir,
      env.getMasterFileSystem().getFileSystem(), workingDirFS, conf);
    // update metric. when master restarts, the metric value is wrong
    // NOTE(review): the task is only marked complete at the end of this method, so confirm that
    // getCompletionTimestamp() already returns a meaningful value at this point.
    metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
    if (env.getMasterCoprocessorHost() != null) {
      env.getMasterCoprocessorHost()
        .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
    }
    status.markComplete("Snapshot " + snapshot.getName() + " completed");
  }

  /**
   * Implements {@code SNAPSHOT_SNAPSHOT_SPLIT_REGIONS}: adds the default-replica regions that
   * report {@link RegionInfo#isSplit()} to the manifest.
   */
  private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException {
    List<RegionInfo> regions =
      getDefaultRegionReplica(env).filter(RegionInfo::isSplit).collect(Collectors.toList());
    snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool");
  }

  /**
   * Implements {@code SNAPSHOT_SNAPSHOT_CLOSED_REGIONS}: adds all default-replica regions of the
   * table to the manifest.
   */
  private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException {
    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
    snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool");
  }

  /**
   * Streams the regions returned by {@code getTableRegions(snapshotTable, false)}, keeping only
   * default replicas.
   */
  private Stream<RegionInfo> getDefaultRegionReplica(MasterProcedureEnv env) {
    return env.getAssignmentManager().getTableRegions(snapshotTable, false).stream()
      .filter(r -> RegionReplicaUtil.isDefaultReplica(r));
  }

  /**
   * Adds each of the given regions to the snapshot manifest using a dedicated executor, which is
   * shut down afterwards even if editing fails.
   * @param env            the master procedure environment
   * @param regions        regions to reference in the manifest
   * @param threadPoolName name passed to the executor factory
   */
  private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, List<RegionInfo> regions,
    String threadPoolName) throws IOException {
    ThreadPoolExecutor exec =
      SnapshotManifest.createExecutor(env.getMasterConfiguration(), threadPoolName);
    try {
      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
        @Override
        public void editRegion(final RegionInfo region) throws IOException {
          snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
          LOG.info("take snapshot region={}, table={}", region, snapshotTable);
        }
      });
    } finally {
      exec.shutdown();
    }
    status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable);
  }

  /**
   * Implements {@code SNAPSHOT_SNAPSHOT_MOB_REGION}: when the table has MOB columns, adds the
   * MOB region to the snapshot manifest; otherwise does nothing.
   */
  private void snapshotMobRegion(MasterProcedureEnv env) throws IOException {
    if (!MobUtils.hasMobColumns(htd)) {
      return;
    }
    ThreadPoolExecutor exec =
      SnapshotManifest.createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool");
    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
    try {
      ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo),
        new ModifyRegionUtils.RegionEditTask() {
          @Override
          public void editRegion(final RegionInfo region) throws IOException {
            snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
          }
        });
    } finally {
      exec.shutdown();
    }
    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
  }

  /** Persists the parent state followed by the snapshot description. */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    serializer
      .serialize(SnapshotProcedureStateData.newBuilder().setSnapshot(this.snapshot).build());
  }

  /** Restores the parent state followed by the snapshot description. */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class);
    this.snapshot = data.getSnapshot();
  }

  /**
   * Creates one {@link SnapshotRegionProcedure} per default-replica region returned by
   * {@code getTableRegions(snapshotTable, true)}.
   */
  private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) {
    return env.getAssignmentManager().getTableRegions(snapshotTable, true).stream()
      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
      .map(r -> new SnapshotRegionProcedure(snapshot, r)).toArray(SnapshotRegionProcedure[]::new);
  }

  /** Appends the class name, procedure id and snapshot description to the builder. */
  @Override
  public void toStringClassDetails(StringBuilder builder) {
    builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", snapshot=")
      .append(ClientSnapshotDescriptionUtils.toString(snapshot));
  }

  /** Returns the snapshot description; same value as {@link #getSnapshot()}. */
  public SnapshotDescription getSnapshotDesc() {
    return snapshot;
  }

  /**
   * Rebuilds the transient snapshot environment after the procedure has been replayed, unless
   * the procedure is still in its initial state. The procedure is marked as failed when the
   * environment cannot be rebuilt or when snapshot procedures are disabled in the configuration.
   */
  @Override
  protected void afterReplay(MasterProcedureEnv env) {
    if (getCurrentState() == getInitialState()) {
      // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv().
      return;
    }
    try {
      prepareSnapshotEnv(env);
      boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED,
        SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
      if (!snapshotProcedureEnabled) {
        throw new IOException("SnapshotProcedure is DISABLED");
      }
    } catch (IOException e) {
      LOG.error("Failed replaying {}, mark procedure as FAILED", this, e);
      setFailure("master-snapshot", e);
    }
  }

  /** Returns the snapshot description; same value as {@link #getSnapshotDesc()}. */
  public SnapshotDescription getSnapshot() {
    return snapshot;
  }

  /**
   * Creates the "corrupted" flag file in the snapshot working directory if it does not exist
   * yet.
   * @throws IOException if the file system check or the file creation fails
   */
  public synchronized void markSnapshotCorrupted() throws IOException {
    Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir);
    if (!workingDirFS.exists(flagFile)) {
      workingDirFS.create(flagFile).close();
      LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName());
    }
  }

  /**
   * Returns whether the "corrupted" flag file exists in the snapshot working directory.
   * @throws IOException if the file system check fails
   */
  public boolean isSnapshotCorrupted() throws IOException {
    return workingDirFS
      .exists(SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir));
  }
}