001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.stream.Collectors;
025import java.util.stream.Stream;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.hadoop.hbase.client.RegionReplicaUtil;
032import org.apache.hadoop.hbase.client.TableDescriptor;
033import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
034import org.apache.hadoop.hbase.client.TableState;
035import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
036import org.apache.hadoop.hbase.master.MetricsSnapshot;
037import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
038import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
041import org.apache.hadoop.hbase.mob.MobUtils;
042import org.apache.hadoop.hbase.monitoring.MonitoredTask;
043import org.apache.hadoop.hbase.monitoring.TaskMonitor;
044import org.apache.hadoop.hbase.procedure2.Procedure;
045import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
046import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
047import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
048import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
049import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
050import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
051import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
052import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
053import org.apache.hadoop.hbase.snapshot.SnapshotTTLExpiredException;
054import org.apache.hadoop.hbase.util.CommonFSUtils;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.util.ModifyRegionUtils;
057import org.apache.hadoop.hbase.util.RetryCounter;
058import org.apache.yetus.audience.InterfaceAudience;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
068
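/**
 * A procedure that takes a snapshot of a table. The work is driven as a {@link SnapshotState}
 * state machine: prepare, pre-operation, write snapshot info, snapshot the online (plus split)
 * or closed regions, snapshot the MOB region, consolidate, verify, complete and post-operation.
 */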
072@InterfaceAudience.Private
073public class SnapshotProcedure extends AbstractStateMachineTableProcedure<SnapshotState> {
074  private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class);
075  private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
076
077  private Configuration conf;
078  private SnapshotDescription snapshot;
079  private Path rootDir;
080  private Path snapshotDir;
081  private Path workingDir;
082  private FileSystem workingDirFS;
083  private FileSystem rootFs;
084  private TableName snapshotTable;
085  private MonitoredTask status;
086  private SnapshotManifest snapshotManifest;
087  private TableDescriptor htd;
088
089  private RetryCounter retryCounter;
090
091  public SnapshotProcedure() {
092  }
093
094  public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) {
095    super(env);
096    this.snapshot = snapshot;
097  }
098
099  @Override
100  public TableName getTableName() {
101    return TableName.valueOf(snapshot.getTable());
102  }
103
104  @Override
105  public TableOperationType getTableOperationType() {
106    return TableOperationType.SNAPSHOT;
107  }
108
109  @Override
110  protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
111    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
112    LOG.info("{} execute state={}", this, state);
113
114    try {
115      switch (state) {
116        case SNAPSHOT_PREPARE:
117          prepareSnapshot(env);
118          setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
119          return Flow.HAS_MORE_STATE;
120        case SNAPSHOT_PRE_OPERATION:
121          preSnapshot(env);
122          setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
123          return Flow.HAS_MORE_STATE;
124        case SNAPSHOT_WRITE_SNAPSHOT_INFO:
125          TableState tableState =
126            env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
127          if (tableState.isEnabled()) {
128            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
129          } else if (tableState.isDisabled()) {
130            // Set the snapshot type to DISABLED as the table is in DISABLED state
131            snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
132            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS);
133          }
134          SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS);
135          return Flow.HAS_MORE_STATE;
136        case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
137          addChildProcedure(createRemoteSnapshotProcedures(env));
138          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS);
139          return Flow.HAS_MORE_STATE;
140        case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS:
141          snapshotSplitRegions(env);
142          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
143          return Flow.HAS_MORE_STATE;
144        case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS:
145          snapshotClosedRegions(env);
146          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
147          return Flow.HAS_MORE_STATE;
148        case SNAPSHOT_SNAPSHOT_MOB_REGION:
149          snapshotMobRegion(env);
150          setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
151          return Flow.HAS_MORE_STATE;
152        case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
153          // flush the in-memory state, and write the single manifest
154          status.setStatus("Consolidate snapshot: " + snapshot.getName());
155          snapshotManifest.consolidate();
156          setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
157          return Flow.HAS_MORE_STATE;
158        case SNAPSHOT_VERIFIER_SNAPSHOT:
159          status.setStatus("Verifying snapshot: " + snapshot.getName());
160          verifySnapshot(env);
161          setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
162          return Flow.HAS_MORE_STATE;
163        case SNAPSHOT_COMPLETE_SNAPSHOT:
164          if (isSnapshotCorrupted()) {
165            throw new CorruptedSnapshotException(snapshot.getName());
166          }
167          if (
168            SnapshotDescriptionUtils.isExpiredSnapshot(snapshot.getTtl(),
169              snapshot.getCreationTime(), EnvironmentEdgeManager.currentTime())
170          ) {
171            throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshot));
172          }
173          completeSnapshot(env);
174          setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
175          return Flow.HAS_MORE_STATE;
176        case SNAPSHOT_POST_OPERATION:
177          postSnapshot(env);
178          return Flow.NO_MORE_STATE;
179        default:
180          throw new UnsupportedOperationException("unhandled state=" + state);
181      }
182    } catch (ProcedureSuspendedException e) {
183      throw e;
184    } catch (Exception e) {
185      setFailure("master-snapshot", e);
186      LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e);
187      status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable);
188      return Flow.NO_MORE_STATE;
189    }
190  }
191
192  @Override
193  protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
194    throws IOException, InterruptedException {
195    if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) {
196      try {
197        if (!workingDirFS.delete(workingDir, true)) {
198          LOG.error("Couldn't delete snapshot working directory {}", workingDir);
199        }
200      } catch (IOException e) {
201        LOG.error("Couldn't delete snapshot working directory {}", workingDir, e);
202      }
203    }
204  }
205
206  @Override
207  protected boolean isRollbackSupported(SnapshotState state) {
208    return true;
209  }
210
211  @Override
212  protected SnapshotState getState(final int stateId) {
213    return SnapshotState.forNumber(stateId);
214  }
215
216  @Override
217  protected int getStateId(SnapshotState state) {
218    return state.getNumber();
219  }
220
221  @Override
222  protected SnapshotState getInitialState() {
223    return SnapshotState.SNAPSHOT_PREPARE;
224  }
225
226  private void prepareSnapshot(MasterProcedureEnv env)
227    throws ProcedureSuspendedException, IOException {
228    if (isAnySplitOrMergeProcedureRunning(env)) {
229      if (retryCounter == null) {
230        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
231      }
232      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
233      LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff);
234      setTimeout(Math.toIntExact(backoff));
235      setState(ProcedureState.WAITING_TIMEOUT);
236      skipPersistence();
237      throw new ProcedureSuspendedException();
238    }
239    prepareSnapshotEnv(env);
240  }
241
242  private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
243    this.conf = env.getMasterConfiguration();
244    this.snapshotTable = TableName.valueOf(snapshot.getTable());
245    this.htd = loadTableDescriptorSnapshot(env);
246    this.rootFs = env.getMasterFileSystem().getFileSystem();
247    this.rootDir = CommonFSUtils.getRootDir(conf);
248    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
249    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
250    this.workingDirFS = workingDir.getFileSystem(conf);
251    this.status = TaskMonitor.get()
252      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
253    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
254    this.snapshotManifest =
255      SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
256    this.snapshotManifest.addTableDescriptor(htd);
257  }
258
259  @Override
260  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
261    setState(ProcedureState.RUNNABLE);
262    env.getProcedureScheduler().addFront(this);
263    return false;
264  }
265
266  private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
267    return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
268      .filter(p -> !p.isFinished())
269      .filter(
270        p -> p instanceof SplitTableRegionProcedure || p instanceof MergeTableRegionsProcedure)
271      .anyMatch(
272        p -> ((AbstractStateMachineTableProcedure<?>) p).getTableName().equals(getTableName()));
273  }
274
275  private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException {
276    TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable);
277    if (htd == null) {
278      throw new IOException("TableDescriptor missing for " + snapshotTable);
279    }
280    if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
281      return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE,
282        Long.toString(this.snapshot.getMaxFileSize())).build();
283    }
284    return htd;
285  }
286
287  private void preSnapshot(MasterProcedureEnv env) throws IOException {
288    env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot);
289  }
290
291  private void postSnapshot(MasterProcedureEnv env) throws IOException {
292    SnapshotManager sm = env.getMasterServices().getSnapshotManager();
293    if (sm != null) {
294      sm.unregisterSnapshotProcedure(snapshot, getProcId());
295    }
296  }
297
298  private void verifySnapshot(MasterProcedureEnv env) throws IOException {
299    int verifyThreshold =
300      env.getMasterConfiguration().getInt("hbase.snapshot.remote.verify.threshold", 10000);
301    List<RegionInfo> regions = env.getAssignmentManager().getTableRegions(snapshotTable, false)
302      .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).collect(Collectors.toList());
303    int numRegions = regions.size();
304
305    MasterSnapshotVerifier verifier =
306      new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS);
307    if (numRegions >= verifyThreshold) {
308      verifier.verifySnapshot(workingDir, false);
309      addChildProcedure(regions.stream().map(r -> new SnapshotVerifyProcedure(snapshot, r))
310        .toArray(SnapshotVerifyProcedure[]::new));
311    } else {
312      verifier.verifySnapshot(workingDir, true);
313    }
314  }
315
316  private void completeSnapshot(MasterProcedureEnv env) throws IOException {
317    // complete the snapshot, atomically moving from tmp to .snapshot dir.
318    SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir,
319      env.getMasterFileSystem().getFileSystem(), workingDirFS, conf);
320    // update metric. when master restarts, the metric value is wrong
321    metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
322    if (env.getMasterCoprocessorHost() != null) {
323      env.getMasterCoprocessorHost()
324        .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
325    }
326    status.markComplete("Snapshot " + snapshot.getName() + "  completed");
327  }
328
329  private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException {
330    List<RegionInfo> regions =
331      getDefaultRegionReplica(env).filter(RegionInfo::isSplit).collect(Collectors.toList());
332    snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool");
333  }
334
335  private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException {
336    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
337    snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool");
338  }
339
340  private Stream<RegionInfo> getDefaultRegionReplica(MasterProcedureEnv env) {
341    return env.getAssignmentManager().getTableRegions(snapshotTable, false).stream()
342      .filter(r -> RegionReplicaUtil.isDefaultReplica(r));
343  }
344
345  private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, List<RegionInfo> regions,
346    String threadPoolName) throws IOException {
347    ThreadPoolExecutor exec =
348      SnapshotManifest.createExecutor(env.getMasterConfiguration(), threadPoolName);
349    try {
350      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
351        @Override
352        public void editRegion(final RegionInfo region) throws IOException {
353          snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
354          LOG.info("take snapshot region={}, table={}", region, snapshotTable);
355        }
356      });
357    } finally {
358      exec.shutdown();
359    }
360    status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable);
361  }
362
363  private void snapshotMobRegion(MasterProcedureEnv env) throws IOException {
364    if (!MobUtils.hasMobColumns(htd)) {
365      return;
366    }
367    ThreadPoolExecutor exec =
368      SnapshotManifest.createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool");
369    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
370    try {
371      ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo),
372        new ModifyRegionUtils.RegionEditTask() {
373          @Override
374          public void editRegion(final RegionInfo region) throws IOException {
375            snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
376          }
377        });
378    } finally {
379      exec.shutdown();
380    }
381    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
382  }
383
384  @Override
385  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
386    super.serializeStateData(serializer);
387    serializer
388      .serialize(SnapshotProcedureStateData.newBuilder().setSnapshot(this.snapshot).build());
389  }
390
391  @Override
392  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
393    super.deserializeStateData(serializer);
394    SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class);
395    this.snapshot = data.getSnapshot();
396  }
397
398  private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) {
399    return env.getAssignmentManager().getTableRegions(snapshotTable, true).stream()
400      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
401      .map(r -> new SnapshotRegionProcedure(snapshot, r)).toArray(SnapshotRegionProcedure[]::new);
402  }
403
404  @Override
405  public void toStringClassDetails(StringBuilder builder) {
406    builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", snapshot=")
407      .append(ClientSnapshotDescriptionUtils.toString(snapshot));
408  }
409
410  public SnapshotDescription getSnapshotDesc() {
411    return snapshot;
412  }
413
414  @Override
415  protected void afterReplay(MasterProcedureEnv env) {
416    if (getCurrentState() == getInitialState()) {
417      // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv().
418      return;
419    }
420    try {
421      prepareSnapshotEnv(env);
422      boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED,
423        SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
424      if (!snapshotProcedureEnabled) {
425        throw new IOException("SnapshotProcedure is DISABLED");
426      }
427    } catch (IOException e) {
428      LOG.error("Failed replaying {}, mark procedure as FAILED", this, e);
429      setFailure("master-snapshot", e);
430    }
431  }
432
433  public SnapshotDescription getSnapshot() {
434    return snapshot;
435  }
436
437  public synchronized void markSnapshotCorrupted() throws IOException {
438    Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir);
439    if (!workingDirFS.exists(flagFile)) {
440      workingDirFS.create(flagFile).close();
441      LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName());
442    }
443  }
444
445  public boolean isSnapshotCorrupted() throws IOException {
446    return workingDirFS
447      .exists(SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir));
448  }
449}