/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MetricsSnapshot;
import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotTTLExpiredException;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;

/**
 * A procedure used to take snapshot on tables.
 * <p>
 * The procedure walks through the {@link SnapshotState} states in {@link #executeFromState}:
 * prepare, pre-operation, write snapshot info, snapshot the regions (online + split regions for
 * an enabled table, closed regions for a disabled one), snapshot the MOB region, consolidate the
 * manifest, verify, complete and post-operation.
 * <p>
 * Only the {@link SnapshotDescription} is persisted with the procedure. Every other field is
 * derived state that is rebuilt by {@link #prepareSnapshotEnv(MasterProcedureEnv)}, either in the
 * initial state or from {@link #afterReplay(MasterProcedureEnv)}.
 */
@InterfaceAudience.Private
public class SnapshotProcedure extends AbstractStateMachineTableProcedure<SnapshotState> {
  private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class);
  private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();

  // Derived, non-persisted state. All of it is (re)built by prepareSnapshotEnv() and is therefore
  // null until that method has completed successfully.
  private Configuration conf;
  private SnapshotDescription snapshot;
  private Path rootDir;
  private Path snapshotDir;
  private Path workingDir;
  private FileSystem workingDirFS;
  private FileSystem rootFs;
  private TableName snapshotTable;
  private MonitoredTask status;
  private SnapshotManifest snapshotManifest;
  private TableDescriptor htd;

  // Lazily created in prepareSnapshot() while waiting for split/merge procedures to finish.
  private RetryCounter retryCounter;

  /** No-arg constructor; state is filled in by {@link #deserializeStateData}. */
  public SnapshotProcedure() {
  }

  /**
   * Creates a procedure that takes the given snapshot.
   * @param env      the master procedure environment
   * @param snapshot description of the snapshot to take
   */
  public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) {
    super(env);
    this.snapshot = snapshot;
  }

  /** Returns the table named in the snapshot description. */
  @Override
  public TableName getTableName() {
    return TableName.valueOf(snapshot.getTable());
  }

  @Override
  public TableOperationType getTableOperationType() {
    return TableOperationType.SNAPSHOT;
  }

  /**
   * Executes a single state of the snapshot state machine and selects the next one.
   * <p>
   * A {@link ProcedureSuspendedException} is propagated unchanged. Any other exception marks the
   * procedure as failed, aborts the monitored task (if one was already created) and ends the
   * state machine.
   */
  @Override
  protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    LOG.info("{} execute state={}", this, state);

    try {
      switch (state) {
        case SNAPSHOT_PREPARE:
          prepareSnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_PRE_OPERATION:
          preSnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_WRITE_SNAPSHOT_INFO:
          SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS);
          TableState tableState =
            env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
          // NOTE(review): when the table is neither enabled nor disabled no next state is set
          // here; presumably this state is then executed again until the table state settles --
          // confirm against StateMachineProcedure.
          if (tableState.isEnabled()) {
            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
          } else if (tableState.isDisabled()) {
            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS);
          }
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
          addChildProcedure(createRemoteSnapshotProcedures(env));
          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS:
          snapshotSplitRegions(env);
          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS:
          snapshotClosedRegions(env);
          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_SNAPSHOT_MOB_REGION:
          snapshotMobRegion(env);
          setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
          // flush the in-memory state, and write the single manifest
          status.setStatus("Consolidate snapshot: " + snapshot.getName());
          snapshotManifest.consolidate();
          setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_VERIFIER_SNAPSHOT:
          status.setStatus("Verifying snapshot: " + snapshot.getName());
          verifySnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_COMPLETE_SNAPSHOT:
          if (isSnapshotCorrupted()) {
            throw new CorruptedSnapshotException(snapshot.getName());
          }
          if (
            SnapshotDescriptionUtils.isExpiredSnapshot(snapshot.getTtl(),
              snapshot.getCreationTime(), EnvironmentEdgeManager.currentTime())
          ) {
            throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshot));
          }
          completeSnapshot(env);
          setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
          return Flow.HAS_MORE_STATE;
        case SNAPSHOT_POST_OPERATION:
          postSnapshot(env);
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (ProcedureSuspendedException e) {
      throw e;
    } catch (Exception e) {
      setFailure("master-snapshot", e);
      LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e);
      // prepareSnapshotEnv() may fail before the monitored task is created (e.g. the table
      // descriptor is missing), in which case there is no task to abort.
      if (status != null) {
        status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable);
      }
      return Flow.NO_MORE_STATE;
    }
  }

  /**
   * Rolls back a single state. Only {@link SnapshotState#SNAPSHOT_PRE_OPERATION} has anything to
   * undo: the snapshot working directory is deleted on a best-effort basis, failures are logged
   * and not propagated.
   */
  @Override
  protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
    throws IOException, InterruptedException {
    if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) {
      // The working directory fields stay null when prepareSnapshotEnv() failed, e.g. when
      // afterReplay() could not rebuild the environment and marked the procedure as failed.
      if (workingDirFS == null || workingDir == null) {
        LOG.warn("Snapshot working directory of {} is unknown, skip cleaning it up", this);
        return;
      }
      try {
        if (!workingDirFS.delete(workingDir, true)) {
          LOG.error("Couldn't delete snapshot working directory {}", workingDir);
        }
      } catch (IOException e) {
        LOG.error("Couldn't delete snapshot working directory {}", workingDir, e);
      }
    }
  }

  @Override
  protected boolean isRollbackSupported(SnapshotState state) {
    return true;
  }

  @Override
  protected SnapshotState getState(final int stateId) {
    return SnapshotState.forNumber(stateId);
  }

  @Override
  protected int getStateId(SnapshotState state) {
    return state.getNumber();
  }

  @Override
  protected SnapshotState getInitialState() {
    return SnapshotState.SNAPSHOT_PREPARE;
  }

  /**
   * Prepares the snapshot. While a split or merge procedure is still running on the table, this
   * procedure is put into {@code WAITING_TIMEOUT} with a backoff and suspended; otherwise the
   * snapshot environment is initialized.
   * @throws ProcedureSuspendedException if the procedure has to wait for a split/merge procedure
   * @throws IOException                 if the snapshot environment cannot be initialized
   */
  private void prepareSnapshot(MasterProcedureEnv env)
    throws ProcedureSuspendedException, IOException {
    if (isAnySplitOrMergeProcedureRunning(env)) {
      if (retryCounter == null) {
        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
      }
      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
      LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff);
      setTimeout(Math.toIntExact(backoff));
      setState(ProcedureState.WAITING_TIMEOUT);
      skipPersistence();
      throw new ProcedureSuspendedException();
    }
    prepareSnapshotEnv(env);
  }

  /**
   * Builds all derived, non-persisted fields (configuration, table descriptor, file systems,
   * directories, monitored task and snapshot manifest) from the environment and the snapshot
   * description.
   * @throws IOException if the table descriptor is missing or a file system cannot be resolved
   */
  private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
    this.conf = env.getMasterConfiguration();
    this.snapshotTable = TableName.valueOf(snapshot.getTable());
    this.htd = loadTableDescriptorSnapshot(env);
    this.rootFs = env.getMasterFileSystem().getFileSystem();
    this.rootDir = CommonFSUtils.getRootDir(conf);
    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
    this.workingDirFS = workingDir.getFileSystem(conf);
    this.status = TaskMonitor.get()
      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
    this.snapshotManifest =
      SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
    this.snapshotManifest.addTableDescriptor(htd);
  }

  /**
   * Invoked when the timeout set by {@link #prepareSnapshot(MasterProcedureEnv)} fires: makes the
   * procedure runnable again and puts it at the front of the scheduler queue.
   * @return always {@code false}; NOTE(review): presumably this tells the executor not to treat
   *         the timeout as a failure -- confirm against the Procedure contract
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

  /**
   * Returns whether an unfinished split or merge procedure exists for the table being
   * snapshotted.
   */
  private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
    return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
      .filter(p -> !p.isFinished())
      .filter(
        p -> p instanceof SplitTableRegionProcedure || p instanceof MergeTableRegionsProcedure)
      .anyMatch(
        p -> ((AbstractStateMachineTableProcedure<?>) p).getTableName().equals(getTableName()));
  }

  /**
   * Loads the descriptor of the table being snapshotted. If the table has no max file size
   * configured ({@code -1}) and the snapshot description carries a positive one, a copy of the
   * descriptor with that max file size is returned instead.
   * @throws IOException if no descriptor exists for the table
   */
  private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException {
    TableDescriptor descriptor =
      env.getMasterServices().getTableDescriptors().get(snapshotTable);
    if (descriptor == null) {
      throw new IOException("TableDescriptor missing for " + snapshotTable);
    }
    if (descriptor.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
      return TableDescriptorBuilder.newBuilder(descriptor)
        .setValue(TableDescriptorBuilder.MAX_FILESIZE,
          Long.toString(this.snapshot.getMaxFileSize()))
        .build();
    }
    return descriptor;
  }

  /** Asks the snapshot manager to prepare the working directory of the snapshot. */
  private void preSnapshot(MasterProcedureEnv env) throws IOException {
    env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot);
  }

  /** Unregisters this procedure from the snapshot manager, if there is one. */
  private void postSnapshot(MasterProcedureEnv env) throws IOException {
    SnapshotManager sm = env.getMasterServices().getSnapshotManager();
    if (sm != null) {
      sm.unregisterSnapshotProcedure(snapshot, getProcId());
    }
  }

  /**
   * Verifies the snapshot in the working directory. When the number of default-replica regions
   * reaches {@code hbase.snapshot.remote.verify.threshold} (default 10000), the master verifier
   * is invoked with its second argument set to {@code false} and one
   * {@link SnapshotVerifyProcedure} per region is scheduled as a child procedure; otherwise the
   * master verifier is invoked with {@code true} and no child procedure is scheduled.
   */
  private void verifySnapshot(MasterProcedureEnv env) throws IOException {
    int verifyThreshold =
      env.getMasterConfiguration().getInt("hbase.snapshot.remote.verify.threshold", 10000);
    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
    int numRegions = regions.size();

    MasterSnapshotVerifier verifier =
      new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS);
    if (numRegions >= verifyThreshold) {
      verifier.verifySnapshot(workingDir, false);
      addChildProcedure(regions.stream().map(r -> new SnapshotVerifyProcedure(snapshot, r))
        .toArray(SnapshotVerifyProcedure[]::new));
    } else {
      verifier.verifySnapshot(workingDir, true);
    }
  }

  /**
   * Completes the snapshot: moves it from the working directory to the completed snapshot
   * directory, updates the snapshot metric, notifies the master coprocessor host (if any) and
   * marks the monitored task complete.
   */
  private void completeSnapshot(MasterProcedureEnv env) throws IOException {
    // complete the snapshot, atomically moving from tmp to .snapshot dir.
    SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir,
      env.getMasterFileSystem().getFileSystem(), workingDirFS, conf);
    // update metric. when master restarts, the metric value is wrong
    // The task is only marked complete at the end of this method, so its completion timestamp is
    // not meaningful yet; measure the elapsed time against the current clock instead.
    metricsSnapshot.addSnapshot(EnvironmentEdgeManager.currentTime() - status.getStartTime());
    if (env.getMasterCoprocessorHost() != null) {
      env.getMasterCoprocessorHost()
        .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
    }
    status.markComplete("Snapshot " + snapshot.getName() + " completed");
  }

  /** Adds the split default-replica regions of the table to the snapshot manifest. */
  private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException {
    List<RegionInfo> regions =
      getDefaultRegionReplica(env).filter(RegionInfo::isSplit).collect(Collectors.toList());
    snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool");
  }

  /** Adds all default-replica regions of the table to the snapshot manifest. */
  private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException {
    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
    snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool");
  }

  /**
   * Streams the default replicas of the table's regions, as returned by
   * {@code getTableRegions(snapshotTable, false)}.
   */
  private Stream<RegionInfo> getDefaultRegionReplica(MasterProcedureEnv env) {
    return env.getAssignmentManager().getTableRegions(snapshotTable, false).stream()
      .filter(r -> RegionReplicaUtil.isDefaultReplica(r));
  }

  /**
   * Adds the given regions to the snapshot manifest using a dedicated thread pool, which is shut
   * down afterwards regardless of the outcome.
   * @param regions        regions to add to the manifest
   * @param threadPoolName name of the thread pool used for the work
   */
  private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, List<RegionInfo> regions,
    String threadPoolName) throws IOException {
    ThreadPoolExecutor exec =
      SnapshotManifest.createExecutor(env.getMasterConfiguration(), threadPoolName);
    try {
      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
        @Override
        public void editRegion(final RegionInfo region) throws IOException {
          snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
          LOG.info("take snapshot region={}, table={}", region, snapshotTable);
        }
      });
    } finally {
      exec.shutdown();
    }
    status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable);
  }

  /**
   * Adds the MOB region of the table to the snapshot manifest. Does nothing when the table has no
   * MOB columns.
   */
  private void snapshotMobRegion(MasterProcedureEnv env) throws IOException {
    if (!MobUtils.hasMobColumns(htd)) {
      return;
    }
    ThreadPoolExecutor exec =
      SnapshotManifest.createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool");
    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
    try {
      ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo),
        new ModifyRegionUtils.RegionEditTask() {
          @Override
          public void editRegion(final RegionInfo region) throws IOException {
            snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
          }
        });
    } finally {
      exec.shutdown();
    }
    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
  }

  /** Persists the snapshot description; it is the only state this procedure stores. */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    serializer
      .serialize(SnapshotProcedureStateData.newBuilder().setSnapshot(this.snapshot).build());
  }

  /** Restores the snapshot description written by {@link #serializeStateData}. */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class);
    this.snapshot = data.getSnapshot();
  }

  /**
   * Creates one {@link SnapshotRegionProcedure} per default-replica region returned by
   * {@code getTableRegions(snapshotTable, true)}.
   */
  private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) {
    return env.getAssignmentManager().getTableRegions(snapshotTable, true).stream()
      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
      .map(r -> new SnapshotRegionProcedure(snapshot, r)).toArray(SnapshotRegionProcedure[]::new);
  }

  @Override
  public void toStringClassDetails(StringBuilder builder) {
    builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", snapshot=")
      .append(ClientSnapshotDescriptionUtils.toString(snapshot));
  }

  /** Returns the description of the snapshot being taken. */
  public SnapshotDescription getSnapshotDesc() {
    return snapshot;
  }

  /**
   * Rebuilds the non-persisted snapshot environment after the procedure has been replayed. If the
   * environment cannot be rebuilt, or snapshot procedures are disabled by configuration, the
   * procedure is marked as failed.
   */
  @Override
  protected void afterReplay(MasterProcedureEnv env) {
    if (getCurrentState() == getInitialState()) {
      // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv().
      return;
    }
    try {
      prepareSnapshotEnv(env);
      boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED,
        SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
      if (!snapshotProcedureEnabled) {
        throw new IOException("SnapshotProcedure is DISABLED");
      }
    } catch (IOException e) {
      LOG.error("Failed replaying {}, mark procedure as FAILED", this, e);
      setFailure("master-snapshot", e);
    }
  }

  /** Returns the description of the snapshot being taken. */
  public SnapshotDescription getSnapshot() {
    return snapshot;
  }

  /**
   * Marks the snapshot as corrupted by creating an empty flag file in the working directory, if
   * the file does not exist yet.
   * @throws IOException if the flag file cannot be checked or created
   */
  public synchronized void markSnapshotCorrupted() throws IOException {
    Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir);
    if (!workingDirFS.exists(flagFile)) {
      workingDirFS.create(flagFile).close();
      LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName());
    }
  }

  /**
   * Returns whether the corrupted flag file exists in the snapshot working directory.
   * @throws IOException if the file system cannot be queried
   */
  public boolean isSnapshotCorrupted() throws IOException {
    return workingDirFS
      .exists(SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir));
  }
}