001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.List; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.stream.Collectors; 025import java.util.stream.Stream; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.client.RegionInfo; 031import org.apache.hadoop.hbase.client.RegionReplicaUtil; 032import org.apache.hadoop.hbase.client.TableDescriptor; 033import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 034import org.apache.hadoop.hbase.client.TableState; 035import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 036import org.apache.hadoop.hbase.master.MetricsSnapshot; 037import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure; 038import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure; 039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier; 040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; 041import org.apache.hadoop.hbase.mob.MobUtils; 042import org.apache.hadoop.hbase.monitoring.MonitoredTask; 043import org.apache.hadoop.hbase.monitoring.TaskMonitor; 044import org.apache.hadoop.hbase.procedure2.Procedure; 045import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 046import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; 047import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 048import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; 049import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; 050import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; 051import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; 052import org.apache.hadoop.hbase.snapshot.SnapshotManifest; 053import org.apache.hadoop.hbase.util.CommonFSUtils; 054import org.apache.hadoop.hbase.util.ModifyRegionUtils; 055import org.apache.hadoop.hbase.util.RetryCounter; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData; 062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState; 063import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 064import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 065 066/** 067 * A procedure used to take snapshot on tables. 068 */ 069@InterfaceAudience.Private 070public class SnapshotProcedure extends AbstractStateMachineTableProcedure<SnapshotState> { 071 private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class); 072 private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot(); 073 074 private Configuration conf; 075 private SnapshotDescription snapshot; 076 private Path rootDir; 077 private Path snapshotDir; 078 private Path workingDir; 079 private FileSystem workingDirFS; 080 private FileSystem rootFs; 081 private TableName snapshotTable; 082 private MonitoredTask status; 083 private SnapshotManifest snapshotManifest; 084 private TableDescriptor htd; 085 086 private RetryCounter retryCounter; 087 088 public SnapshotProcedure() { 089 } 090 091 public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) { 092 super(env); 093 this.snapshot = snapshot; 094 } 095 096 @Override 097 public TableName getTableName() { 098 return TableName.valueOf(snapshot.getTable()); 099 } 100 101 @Override 102 public TableOperationType getTableOperationType() { 103 return TableOperationType.SNAPSHOT; 104 } 105 106 @Override 107 protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state) 108 throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { 109 LOG.info("{} execute state={}", this, state); 110 111 try { 112 switch (state) { 113 case SNAPSHOT_PREPARE: 114 prepareSnapshot(env); 115 setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION); 116 return Flow.HAS_MORE_STATE; 117 case SNAPSHOT_PRE_OPERATION: 118 preSnapshot(env); 119 setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO); 120 return Flow.HAS_MORE_STATE; 121 case SNAPSHOT_WRITE_SNAPSHOT_INFO: 122 SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS); 123 TableState tableState = 124 env.getMasterServices().getTableStateManager().getTableState(snapshotTable); 125 if (tableState.isEnabled()) { 126 setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS); 127 } else if (tableState.isDisabled()) { 128 setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS); 129 } 130 return Flow.HAS_MORE_STATE; 131 case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS: 132 addChildProcedure(createRemoteSnapshotProcedures(env)); 133 setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS); 134 return Flow.HAS_MORE_STATE; 135 case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS: 136 snapshotSplitRegions(env); 137 setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION); 138 return Flow.HAS_MORE_STATE; 139 case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS: 140 snapshotClosedRegions(env); 141 setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION); 142 return Flow.HAS_MORE_STATE; 143 case SNAPSHOT_SNAPSHOT_MOB_REGION: 144 snapshotMobRegion(env); 145 setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT); 146 return Flow.HAS_MORE_STATE; 147 case SNAPSHOT_CONSOLIDATE_SNAPSHOT: 148 // flush the in-memory state, and write the single manifest 149 status.setStatus("Consolidate snapshot: " + snapshot.getName()); 150 snapshotManifest.consolidate(); 151 setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT); 152 return Flow.HAS_MORE_STATE; 153 case SNAPSHOT_VERIFIER_SNAPSHOT: 154 status.setStatus("Verifying snapshot: " + snapshot.getName()); 155 verifySnapshot(env); 156 setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT); 157 return Flow.HAS_MORE_STATE; 158 case SNAPSHOT_COMPLETE_SNAPSHOT: 159 if (isSnapshotCorrupted()) { 160 throw new CorruptedSnapshotException(snapshot.getName()); 161 } 162 completeSnapshot(env); 163 setNextState(SnapshotState.SNAPSHOT_POST_OPERATION); 164 return Flow.HAS_MORE_STATE; 165 case SNAPSHOT_POST_OPERATION: 166 postSnapshot(env); 167 return Flow.NO_MORE_STATE; 168 default: 169 throw new UnsupportedOperationException("unhandled state=" + state); 170 } 171 } catch (ProcedureSuspendedException e) { 172 throw e; 173 } catch (Exception e) { 174 setFailure("master-snapshot", e); 175 LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e); 176 status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable); 177 return Flow.NO_MORE_STATE; 178 } 179 } 180 181 @Override 182 protected void rollbackState(MasterProcedureEnv env, SnapshotState state) 183 throws IOException, InterruptedException { 184 if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) { 185 try { 186 if (!workingDirFS.delete(workingDir, true)) { 187 LOG.error("Couldn't delete snapshot working directory {}", workingDir); 188 } 189 } catch (IOException e) { 190 LOG.error("Couldn't delete snapshot working directory {}", workingDir, e); 191 } 192 } 193 } 194 195 @Override 196 protected boolean isRollbackSupported(SnapshotState state) { 197 return true; 198 } 199 200 @Override 201 protected SnapshotState getState(final int stateId) { 202 return SnapshotState.forNumber(stateId); 203 } 204 205 @Override 206 protected int getStateId(SnapshotState state) { 207 return state.getNumber(); 208 } 209 210 @Override 211 protected SnapshotState getInitialState() { 212 return SnapshotState.SNAPSHOT_PREPARE; 213 } 214 215 private void prepareSnapshot(MasterProcedureEnv env) 216 throws ProcedureSuspendedException, IOException { 217 if (isAnySplitOrMergeProcedureRunning(env)) { 218 if (retryCounter == null) { 219 retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration()); 220 } 221 long backoff = retryCounter.getBackoffTimeAndIncrementAttempts(); 222 LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff); 223 setTimeout(Math.toIntExact(backoff)); 224 setState(ProcedureState.WAITING_TIMEOUT); 225 skipPersistence(); 226 throw new ProcedureSuspendedException(); 227 } 228 prepareSnapshotEnv(env); 229 } 230 231 private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException { 232 this.conf = env.getMasterConfiguration(); 233 this.snapshotTable = TableName.valueOf(snapshot.getTable()); 234 this.htd = loadTableDescriptorSnapshot(env); 235 this.rootFs = env.getMasterFileSystem().getFileSystem(); 236 this.rootDir = CommonFSUtils.getRootDir(conf); 237 this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir); 238 this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf); 239 this.workingDirFS = workingDir.getFileSystem(conf); 240 this.status = TaskMonitor.get() 241 .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable); 242 ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName()); 243 this.snapshotManifest = 244 SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status); 245 this.snapshotManifest.addTableDescriptor(htd); 246 } 247 248 @Override 249 protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { 250 setState(ProcedureState.RUNNABLE); 251 env.getProcedureScheduler().addFront(this); 252 return false; 253 } 254 255 private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) { 256 return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream() 257 .filter(p -> !p.isFinished()) 258 .filter( 259 p -> p instanceof SplitTableRegionProcedure || p instanceof MergeTableRegionsProcedure) 260 .anyMatch( 261 p -> ((AbstractStateMachineTableProcedure<?>) p).getTableName().equals(getTableName())); 262 } 263 264 private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException { 265 TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable); 266 if (htd == null) { 267 throw new IOException("TableDescriptor missing for " + snapshotTable); 268 } 269 if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) { 270 return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE, 271 Long.toString(this.snapshot.getMaxFileSize())).build(); 272 } 273 return htd; 274 } 275 276 private void preSnapshot(MasterProcedureEnv env) throws IOException { 277 env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot); 278 } 279 280 private void postSnapshot(MasterProcedureEnv env) throws IOException { 281 SnapshotManager sm = env.getMasterServices().getSnapshotManager(); 282 if (sm != null) { 283 sm.unregisterSnapshotProcedure(snapshot, getProcId()); 284 } 285 } 286 287 private void verifySnapshot(MasterProcedureEnv env) throws IOException { 288 int verifyThreshold = 289 env.getMasterConfiguration().getInt("hbase.snapshot.remote.verify.threshold", 10000); 290 List<RegionInfo> regions = env.getAssignmentManager().getTableRegions(snapshotTable, false) 291 .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).collect(Collectors.toList()); 292 int numRegions = regions.size(); 293 294 MasterSnapshotVerifier verifier = 295 new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS); 296 if (numRegions >= verifyThreshold) { 297 verifier.verifySnapshot(workingDir, false); 298 addChildProcedure(regions.stream().map(r -> new SnapshotVerifyProcedure(snapshot, r)) 299 .toArray(SnapshotVerifyProcedure[]::new)); 300 } else { 301 verifier.verifySnapshot(workingDir, true); 302 } 303 } 304 305 private void completeSnapshot(MasterProcedureEnv env) throws IOException { 306 // complete the snapshot, atomically moving from tmp to .snapshot dir. 307 SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir, 308 env.getMasterFileSystem().getFileSystem(), workingDirFS, conf); 309 // update metric. when master restarts, the metric value is wrong 310 metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime()); 311 if (env.getMasterCoprocessorHost() != null) { 312 env.getMasterCoprocessorHost() 313 .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd); 314 } 315 status.markComplete("Snapshot " + snapshot.getName() + " completed"); 316 } 317 318 private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException { 319 List<RegionInfo> regions = 320 getDefaultRegionReplica(env).filter(RegionInfo::isSplit).collect(Collectors.toList()); 321 snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool"); 322 } 323 324 private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException { 325 List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList()); 326 snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool"); 327 } 328 329 private Stream<RegionInfo> getDefaultRegionReplica(MasterProcedureEnv env) { 330 return env.getAssignmentManager().getTableRegions(snapshotTable, false).stream() 331 .filter(r -> RegionReplicaUtil.isDefaultReplica(r)); 332 } 333 334 private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, List<RegionInfo> regions, 335 String threadPoolName) throws IOException { 336 ThreadPoolExecutor exec = 337 SnapshotManifest.createExecutor(env.getMasterConfiguration(), threadPoolName); 338 try { 339 ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() { 340 @Override 341 public void editRegion(final RegionInfo region) throws IOException { 342 snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region); 343 LOG.info("take snapshot region={}, table={}", region, snapshotTable); 344 } 345 }); 346 } finally { 347 exec.shutdown(); 348 } 349 status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable); 350 } 351 352 private void snapshotMobRegion(MasterProcedureEnv env) throws IOException { 353 if (!MobUtils.hasMobColumns(htd)) { 354 return; 355 } 356 ThreadPoolExecutor exec = 357 SnapshotManifest.createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool"); 358 RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName()); 359 try { 360 ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo), 361 new ModifyRegionUtils.RegionEditTask() { 362 @Override 363 public void editRegion(final RegionInfo region) throws IOException { 364 snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region); 365 } 366 }); 367 } finally { 368 exec.shutdown(); 369 } 370 status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable); 371 } 372 373 @Override 374 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 375 super.serializeStateData(serializer); 376 serializer 377 .serialize(SnapshotProcedureStateData.newBuilder().setSnapshot(this.snapshot).build()); 378 } 379 380 @Override 381 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 382 super.deserializeStateData(serializer); 383 SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class); 384 this.snapshot = data.getSnapshot(); 385 } 386 387 private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) { 388 return env.getAssignmentManager().getTableRegions(snapshotTable, true).stream() 389 .filter(r -> RegionReplicaUtil.isDefaultReplica(r)) 390 .map(r -> new SnapshotRegionProcedure(snapshot, r)).toArray(SnapshotRegionProcedure[]::new); 391 } 392 393 @Override 394 public void toStringClassDetails(StringBuilder builder) { 395 builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", snapshot=") 396 .append(ClientSnapshotDescriptionUtils.toString(snapshot)); 397 } 398 399 public SnapshotDescription getSnapshotDesc() { 400 return snapshot; 401 } 402 403 @Override 404 protected void afterReplay(MasterProcedureEnv env) { 405 if (getCurrentState() == getInitialState()) { 406 // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv(). 407 return; 408 } 409 try { 410 prepareSnapshotEnv(env); 411 boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED, 412 SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT); 413 if (!snapshotProcedureEnabled) { 414 throw new IOException("SnapshotProcedure is DISABLED"); 415 } 416 } catch (IOException e) { 417 LOG.error("Failed replaying {}, mark procedure as FAILED", this, e); 418 setFailure("master-snapshot", e); 419 } 420 } 421 422 public SnapshotDescription getSnapshot() { 423 return snapshot; 424 } 425 426 public synchronized void markSnapshotCorrupted() throws IOException { 427 Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir); 428 if (!workingDirFS.exists(flagFile)) { 429 workingDirFS.create(flagFile).close(); 430 LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName()); 431 } 432 } 433 434 public boolean isSnapshotCorrupted() throws IOException { 435 return workingDirFS 436 .exists(SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir)); 437 } 438}