001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.List; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.stream.Collectors; 025import java.util.stream.Stream; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.RegionInfo; 031import org.apache.hadoop.hbase.client.RegionReplicaUtil; 032import org.apache.hadoop.hbase.client.TableDescriptor; 033import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 034import org.apache.hadoop.hbase.client.TableState; 035import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 036import org.apache.hadoop.hbase.master.MetricsSnapshot; 037import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure; 038import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure; 039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier; 040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; 041import org.apache.hadoop.hbase.mob.MobUtils; 042import org.apache.hadoop.hbase.monitoring.MonitoredTask; 043import org.apache.hadoop.hbase.monitoring.TaskMonitor; 044import org.apache.hadoop.hbase.procedure2.Procedure; 045import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 046import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 047import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 048import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 049import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 050import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; 051import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; 052import org.apache.hadoop.hbase.snapshot.SnapshotManifest; 053import org.apache.hadoop.hbase.snapshot.SnapshotTTLExpiredException; 054import org.apache.hadoop.hbase.util.CommonFSUtils; 055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 056import org.apache.hadoop.hbase.util.ModifyRegionUtils; 057import org.apache.hadoop.hbase.util.RetryCounter; 058import org.apache.yetus.audience.InterfaceAudience; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 067import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type; 068 069/** 070 * A procedure used to take snapshot on tables. 071 */ 072@InterfaceAudience.Private 073public class SnapshotProcedure extends AbstractStateMachineTableProcedure<SnapshotState> { 074 private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class); 075 private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot(); 076 077 private Configuration conf; 078 private SnapshotDescription snapshot; 079 private Path rootDir; 080 private Path snapshotDir; 081 private Path workingDir; 082 private FileSystem workingDirFS; 083 private FileSystem rootFs; 084 private TableName snapshotTable; 085 private MonitoredTask status; 086 private SnapshotManifest snapshotManifest; 087 private TableDescriptor htd; 088 089 private RetryCounter retryCounter; 090 091 public SnapshotProcedure() { 092 } 093 094 public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) { 095 super(env); 096 this.snapshot = snapshot; 097 } 098 099 @Override 100 public TableName getTableName() { 101 return TableName.valueOf(snapshot.getTable()); 102 } 103 104 @Override 105 public TableOperationType getTableOperationType() { 106 return TableOperationType.SNAPSHOT; 107 } 108 109 @Override 110 protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state) 111 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 112 LOG.info("{} execute state={}", this, state); 113 114 try { 115 switch (state) { 116 case SNAPSHOT_PREPARE: 117 prepareSnapshot(env); 118 setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION); 119 return Flow.HAS_MORE_STATE; 120 case SNAPSHOT_PRE_OPERATION: 121 preSnapshot(env); 122 setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO); 123 return Flow.HAS_MORE_STATE; 124 case SNAPSHOT_WRITE_SNAPSHOT_INFO: 125 TableState tableState = 126 env.getMasterServices().getTableStateManager().getTableState(snapshotTable); 127 if (tableState.isEnabled()) { 128 setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS); 129 } else if (tableState.isDisabled()) { 130 // Set the snapshot type to DISABLED as the table is in DISABLED state 131 snapshot = snapshot.toBuilder().setType(Type.DISABLED).build(); 132 setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS); 133 } 134 SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS); 135 return Flow.HAS_MORE_STATE; 136 case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS: 137 addChildProcedure(createRemoteSnapshotProcedures(env)); 138 setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS); 139 return Flow.HAS_MORE_STATE; 140 case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS: 141 snapshotSplitRegions(env); 142 setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION); 143 return Flow.HAS_MORE_STATE; 144 case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS: 145 snapshotClosedRegions(env); 146 setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION); 147 return Flow.HAS_MORE_STATE; 148 case SNAPSHOT_SNAPSHOT_MOB_REGION: 149 snapshotMobRegion(env); 150 setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT); 151 return Flow.HAS_MORE_STATE; 152 case SNAPSHOT_CONSOLIDATE_SNAPSHOT: 153 // flush the in-memory state, and write the single manifest 154 status.setStatus("Consolidate snapshot: " + snapshot.getName()); 155 snapshotManifest.consolidate(); 156 setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT); 157 return Flow.HAS_MORE_STATE; 158 case SNAPSHOT_VERIFIER_SNAPSHOT: 159 status.setStatus("Verifying snapshot: " + snapshot.getName()); 160 verifySnapshot(env); 161 setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT); 162 return Flow.HAS_MORE_STATE; 163 case SNAPSHOT_COMPLETE_SNAPSHOT: 164 if (isSnapshotCorrupted()) { 165 throw new CorruptedSnapshotException(snapshot.getName()); 166 } 167 if ( 168 SnapshotDescriptionUtils.isExpiredSnapshot(snapshot.getTtl(), 169 snapshot.getCreationTime(), EnvironmentEdgeManager.currentTime()) 170 ) { 171 throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshot)); 172 } 173 completeSnapshot(env); 174 setNextState(SnapshotState.SNAPSHOT_POST_OPERATION); 175 return Flow.HAS_MORE_STATE; 176 case SNAPSHOT_POST_OPERATION: 177 postSnapshot(env); 178 return Flow.NO_MORE_STATE; 179 default: 180 throw new UnsupportedOperationException("unhandled state=" + state); 181 } 182 } catch (ProcedureSuspendedException e) { 183 throw e; 184 } catch (Exception e) { 185 setFailure("master-snapshot", e); 186 LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e); 187 status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable); 188 return Flow.NO_MORE_STATE; 189 } 190 } 191 192 @Override 193 protected void rollbackState(MasterProcedureEnv env, SnapshotState state) 194 throws IOException, InterruptedException { 195 if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) { 196 try { 197 if (!workingDirFS.delete(workingDir, true)) { 198 LOG.error("Couldn't delete snapshot working directory {}", workingDir); 199 } 200 } catch (IOException e) { 201 LOG.error("Couldn't delete snapshot working directory {}", workingDir, e); 202 } 203 } 204 } 205 206 @Override 207 protected boolean isRollbackSupported(SnapshotState state) { 208 return true; 209 } 210 211 @Override 212 protected SnapshotState getState(final int stateId) { 213 return SnapshotState.forNumber(stateId); 214 } 215 216 @Override 217 protected int getStateId(SnapshotState state) { 218 return state.getNumber(); 219 } 220 221 @Override 222 protected SnapshotState getInitialState() { 223 return SnapshotState.SNAPSHOT_PREPARE; 224 } 225 226 private void prepareSnapshot(MasterProcedureEnv env) 227 throws ProcedureSuspendedException, IOException { 228 if (isAnySplitOrMergeProcedureRunning(env)) { 229 if (retryCounter == null) { 230 retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); 231 } 232 long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); 233 LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff); 234 setTimeout(Math.toIntExact(backoff)); 235 setState(ProcedureState.WAITING_TIMEOUT); 236 skipPersistence(); 237 throw new ProcedureSuspendedException(); 238 } 239 prepareSnapshotEnv(env); 240 } 241 242 private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException { 243 this.conf = env.getMasterConfiguration(); 244 this.snapshotTable = TableName.valueOf(snapshot.getTable()); 245 this.htd = loadTableDescriptorSnapshot(env); 246 this.rootFs = env.getMasterFileSystem().getFileSystem(); 247 this.rootDir = CommonFSUtils.getRootDir(conf); 248 this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir); 249 this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf); 250 this.workingDirFS = workingDir.getFileSystem(conf); 251 this.status = TaskMonitor.get() 252 .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable); 253 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName()); 254 this.snapshotManifest = 255 SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status); 256 this.snapshotManifest.addTableDescriptor(htd); 257 } 258 259 @Override 260 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 261 setState(ProcedureState.RUNNABLE); 262 env.getProcedureScheduler().addFront(this); 263 return false; 264 } 265 266 private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) { 267 return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream() 268 .filter(p -> !p.isFinished()) 269 .filter( 270 p -> p instanceof SplitTableRegionProcedure || p instanceof MergeTableRegionsProcedure) 271 .anyMatch( 272 p -> ((AbstractStateMachineTableProcedure<?>) p).getTableName().equals(getTableName())); 273 } 274 275 private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException { 276 TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable); 277 if (htd == null) { 278 throw new IOException("TableDescriptor missing for " + snapshotTable); 279 } 280 if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) { 281 return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE, 282 Long.toString(this.snapshot.getMaxFileSize())).build(); 283 } 284 return htd; 285 } 286 287 private void preSnapshot(MasterProcedureEnv env) throws IOException { 288 env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot); 289 } 290 291 private void postSnapshot(MasterProcedureEnv env) throws IOException { 292 SnapshotManager sm = env.getMasterServices().getSnapshotManager(); 293 if (sm != null) { 294 sm.unregisterSnapshotProcedure(snapshot, getProcId()); 295 } 296 } 297 298 private void verifySnapshot(MasterProcedureEnv env) throws IOException { 299 int verifyThreshold = 300 env.getMasterConfiguration().getInt("hbase.snapshot.remote.verify.threshold", 10000); 301 List<RegionInfo> regions = env.getAssignmentManager().getTableRegions(snapshotTable, false) 302 .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).collect(Collectors.toList()); 303 int numRegions = regions.size(); 304 305 MasterSnapshotVerifier verifier = 306 new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS); 307 if (numRegions >= verifyThreshold) { 308 verifier.verifySnapshot(workingDir, false); 309 addChildProcedure(regions.stream().map(r -> new SnapshotVerifyProcedure(snapshot, r)) 310 .toArray(SnapshotVerifyProcedure[]::new)); 311 } else { 312 verifier.verifySnapshot(workingDir, true); 313 } 314 } 315 316 private void completeSnapshot(MasterProcedureEnv env) throws IOException { 317 // complete the snapshot, atomically moving from tmp to .snapshot dir. 318 SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir, 319 env.getMasterFileSystem().getFileSystem(), workingDirFS, conf); 320 // update metric. when master restarts, the metric value is wrong 321 metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime()); 322 if (env.getMasterCoprocessorHost() != null) { 323 env.getMasterCoprocessorHost() 324 .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd); 325 } 326 status.markComplete("Snapshot " + snapshot.getName() + " completed"); 327 } 328 329 private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException { 330 List<RegionInfo> regions = 331 getDefaultRegionReplica(env).filter(RegionInfo::isSplit).collect(Collectors.toList()); 332 snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool"); 333 } 334 335 private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException { 336 List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList()); 337 snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool"); 338 } 339 340 private Stream<RegionInfo> getDefaultRegionReplica(MasterProcedureEnv env) { 341 return env.getAssignmentManager().getTableRegions(snapshotTable, false).stream() 342 .filter(r -> RegionReplicaUtil.isDefaultReplica(r)); 343 } 344 345 private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, List<RegionInfo> regions, 346 String threadPoolName) throws IOException { 347 ThreadPoolExecutor exec = 348 SnapshotManifest.createExecutor(env.getMasterConfiguration(), threadPoolName); 349 try { 350 ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() { 351 @Override 352 public void editRegion(final RegionInfo region) throws IOException { 353 snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region); 354 LOG.info("take snapshot region={}, table={}", region, snapshotTable); 355 } 356 }); 357 } finally { 358 exec.shutdown(); 359 } 360 status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable); 361 } 362 363 private void snapshotMobRegion(MasterProcedureEnv env) throws IOException { 364 if (!MobUtils.hasMobColumns(htd)) { 365 return; 366 } 367 ThreadPoolExecutor exec = 368 SnapshotManifest.createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool"); 369 RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName()); 370 try { 371 ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo), 372 new ModifyRegionUtils.RegionEditTask() { 373 @Override 374 public void editRegion(final RegionInfo region) throws IOException { 375 snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region); 376 } 377 }); 378 } finally { 379 exec.shutdown(); 380 } 381 status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable); 382 } 383 384 @Override 385 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 386 super.serializeStateData(serializer); 387 serializer 388 .serialize(SnapshotProcedureStateData.newBuilder().setSnapshot(this.snapshot).build()); 389 } 390 391 @Override 392 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 393 super.deserializeStateData(serializer); 394 SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class); 395 this.snapshot = data.getSnapshot(); 396 } 397 398 private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) { 399 return env.getAssignmentManager().getTableRegions(snapshotTable, true).stream() 400 .filter(r -> RegionReplicaUtil.isDefaultReplica(r)) 401 .map(r -> new SnapshotRegionProcedure(snapshot, r)).toArray(SnapshotRegionProcedure[]::new); 402 } 403 404 @Override 405 public void toStringClassDetails(StringBuilder builder) { 406 builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", snapshot=") 407 .append(ClientSnapshotDescriptionUtils.toString(snapshot)); 408 } 409 410 public SnapshotDescription getSnapshotDesc() { 411 return snapshot; 412 } 413 414 @Override 415 protected void afterReplay(MasterProcedureEnv env) { 416 if (getCurrentState() == getInitialState()) { 417 // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv(). 418 return; 419 } 420 try { 421 prepareSnapshotEnv(env); 422 boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED, 423 SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT); 424 if (!snapshotProcedureEnabled) { 425 throw new IOException("SnapshotProcedure is DISABLED"); 426 } 427 } catch (IOException e) { 428 LOG.error("Failed replaying {}, mark procedure as FAILED", this, e); 429 setFailure("master-snapshot", e); 430 } 431 } 432 433 public SnapshotDescription getSnapshot() { 434 return snapshot; 435 } 436 437 public synchronized void markSnapshotCorrupted() throws IOException { 438 Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir); 439 if (!workingDirFS.exists(flagFile)) { 440 workingDirFS.create(flagFile).close(); 441 LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName()); 442 } 443 } 444 445 public boolean isSnapshotCorrupted() throws IOException { 446 return workingDirFS 447 .exists(SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir)); 448 } 449}