001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.stream.Collectors;
025import java.util.stream.Stream;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.hadoop.hbase.client.RegionReplicaUtil;
032import org.apache.hadoop.hbase.client.TableDescriptor;
033import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
034import org.apache.hadoop.hbase.client.TableState;
035import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
036import org.apache.hadoop.hbase.master.MetricsSnapshot;
037import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
038import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
041import org.apache.hadoop.hbase.mob.MobUtils;
042import org.apache.hadoop.hbase.monitoring.MonitoredTask;
043import org.apache.hadoop.hbase.monitoring.TaskMonitor;
044import org.apache.hadoop.hbase.procedure2.Procedure;
045import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
046import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
047import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
048import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
049import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
050import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
051import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
052import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
053import org.apache.hadoop.hbase.util.CommonFSUtils;
054import org.apache.hadoop.hbase.util.ModifyRegionUtils;
055import org.apache.hadoop.hbase.util.RetryCounter;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
065
066/**
067 * A procedure used to take snapshot on tables.
068 */
069@InterfaceAudience.Private
070public class SnapshotProcedure extends AbstractStateMachineTableProcedure<SnapshotState> {
071  private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class);
072  private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
073
074  private Configuration conf;
075  private SnapshotDescription snapshot;
076  private Path rootDir;
077  private Path snapshotDir;
078  private Path workingDir;
079  private FileSystem workingDirFS;
080  private FileSystem rootFs;
081  private TableName snapshotTable;
082  private MonitoredTask status;
083  private SnapshotManifest snapshotManifest;
084  private TableDescriptor htd;
085
086  private RetryCounter retryCounter;
087
088  public SnapshotProcedure() {
089  }
090
091  public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) {
092    super(env);
093    this.snapshot = snapshot;
094  }
095
096  @Override
097  public TableName getTableName() {
098    return TableName.valueOf(snapshot.getTable());
099  }
100
101  @Override
102  public TableOperationType getTableOperationType() {
103    return TableOperationType.SNAPSHOT;
104  }
105
106  @Override
107  protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
108    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
109    LOG.info("{} execute state={}", this, state);
110
111    try {
112      switch (state) {
113        case SNAPSHOT_PREPARE:
114          prepareSnapshot(env);
115          setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
116          return Flow.HAS_MORE_STATE;
117        case SNAPSHOT_PRE_OPERATION:
118          preSnapshot(env);
119          setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
120          return Flow.HAS_MORE_STATE;
121        case SNAPSHOT_WRITE_SNAPSHOT_INFO:
122          SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS);
123          TableState tableState =
124            env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
125          if (tableState.isEnabled()) {
126            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
127          } else if (tableState.isDisabled()) {
128            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS);
129          }
130          return Flow.HAS_MORE_STATE;
131        case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
132          addChildProcedure(createRemoteSnapshotProcedures(env));
133          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS);
134          return Flow.HAS_MORE_STATE;
135        case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS:
136          snapshotSplitRegions(env);
137          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
138          return Flow.HAS_MORE_STATE;
139        case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS:
140          snapshotClosedRegions(env);
141          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
142          return Flow.HAS_MORE_STATE;
143        case SNAPSHOT_SNAPSHOT_MOB_REGION:
144          snapshotMobRegion(env);
145          setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
146          return Flow.HAS_MORE_STATE;
147        case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
148          // flush the in-memory state, and write the single manifest
149          status.setStatus("Consolidate snapshot: " + snapshot.getName());
150          snapshotManifest.consolidate();
151          setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
152          return Flow.HAS_MORE_STATE;
153        case SNAPSHOT_VERIFIER_SNAPSHOT:
154          status.setStatus("Verifying snapshot: " + snapshot.getName());
155          verifySnapshot(env);
156          setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
157          return Flow.HAS_MORE_STATE;
158        case SNAPSHOT_COMPLETE_SNAPSHOT:
159          if (isSnapshotCorrupted()) {
160            throw new CorruptedSnapshotException(snapshot.getName());
161          }
162          completeSnapshot(env);
163          setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
164          return Flow.HAS_MORE_STATE;
165        case SNAPSHOT_POST_OPERATION:
166          postSnapshot(env);
167          return Flow.NO_MORE_STATE;
168        default:
169          throw new UnsupportedOperationException("unhandled state=" + state);
170      }
171    } catch (ProcedureSuspendedException e) {
172      throw e;
173    } catch (Exception e) {
174      setFailure("master-snapshot", e);
175      LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e);
176      status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable);
177      return Flow.NO_MORE_STATE;
178    }
179  }
180
181  @Override
182  protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
183    throws IOException, InterruptedException {
184    if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) {
185      try {
186        if (!workingDirFS.delete(workingDir, true)) {
187          LOG.error("Couldn't delete snapshot working directory {}", workingDir);
188        }
189      } catch (IOException e) {
190        LOG.error("Couldn't delete snapshot working directory {}", workingDir, e);
191      }
192    }
193  }
194
195  @Override
196  protected boolean isRollbackSupported(SnapshotState state) {
197    return true;
198  }
199
200  @Override
201  protected SnapshotState getState(final int stateId) {
202    return SnapshotState.forNumber(stateId);
203  }
204
205  @Override
206  protected int getStateId(SnapshotState state) {
207    return state.getNumber();
208  }
209
210  @Override
211  protected SnapshotState getInitialState() {
212    return SnapshotState.SNAPSHOT_PREPARE;
213  }
214
215  private void prepareSnapshot(MasterProcedureEnv env)
216    throws ProcedureSuspendedException, IOException {
217    if (isAnySplitOrMergeProcedureRunning(env)) {
218      if (retryCounter == null) {
219        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
220      }
221      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
222      LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff);
223      setTimeout(Math.toIntExact(backoff));
224      setState(ProcedureState.WAITING_TIMEOUT);
225      skipPersistence();
226      throw new ProcedureSuspendedException();
227    }
228    prepareSnapshotEnv(env);
229  }
230
231  private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
232    this.conf = env.getMasterConfiguration();
233    this.snapshotTable = TableName.valueOf(snapshot.getTable());
234    this.htd = loadTableDescriptorSnapshot(env);
235    this.rootFs = env.getMasterFileSystem().getFileSystem();
236    this.rootDir = CommonFSUtils.getRootDir(conf);
237    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
238    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
239    this.workingDirFS = workingDir.getFileSystem(conf);
240    this.status = TaskMonitor.get()
241      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
242    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
243    this.snapshotManifest =
244      SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
245    this.snapshotManifest.addTableDescriptor(htd);
246  }
247
248  @Override
249  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
250    setState(ProcedureState.RUNNABLE);
251    env.getProcedureScheduler().addFront(this);
252    return false;
253  }
254
255  private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
256    return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
257      .filter(p -> !p.isFinished())
258      .filter(
259        p -> p instanceof SplitTableRegionProcedure || p instanceof MergeTableRegionsProcedure)
260      .anyMatch(
261        p -> ((AbstractStateMachineTableProcedure<?>) p).getTableName().equals(getTableName()));
262  }
263
264  private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException {
265    TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable);
266    if (htd == null) {
267      throw new IOException("TableDescriptor missing for " + snapshotTable);
268    }
269    if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
270      return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE,
271        Long.toString(this.snapshot.getMaxFileSize())).build();
272    }
273    return htd;
274  }
275
276  private void preSnapshot(MasterProcedureEnv env) throws IOException {
277    env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot);
278  }
279
280  private void postSnapshot(MasterProcedureEnv env) throws IOException {
281    SnapshotManager sm = env.getMasterServices().getSnapshotManager();
282    if (sm != null) {
283      sm.unregisterSnapshotProcedure(snapshot, getProcId());
284    }
285  }
286
287  private void verifySnapshot(MasterProcedureEnv env) throws IOException {
288    int verifyThreshold =
289      env.getMasterConfiguration().getInt("hbase.snapshot.remote.verify.threshold", 10000);
290    List<RegionInfo> regions = env.getAssignmentManager().getTableRegions(snapshotTable, false)
291      .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).collect(Collectors.toList());
292    int numRegions = regions.size();
293
294    MasterSnapshotVerifier verifier =
295      new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS);
296    if (numRegions >= verifyThreshold) {
297      verifier.verifySnapshot(workingDir, false);
298      addChildProcedure(regions.stream().map(r -> new SnapshotVerifyProcedure(snapshot, r))
299        .toArray(SnapshotVerifyProcedure[]::new));
300    } else {
301      verifier.verifySnapshot(workingDir, true);
302    }
303  }
304
305  private void completeSnapshot(MasterProcedureEnv env) throws IOException {
306    // complete the snapshot, atomically moving from tmp to .snapshot dir.
307    SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir,
308      env.getMasterFileSystem().getFileSystem(), workingDirFS, conf);
309    // update metric. when master restarts, the metric value is wrong
310    metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
311    if (env.getMasterCoprocessorHost() != null) {
312      env.getMasterCoprocessorHost()
313        .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
314    }
315    status.markComplete("Snapshot " + snapshot.getName() + "  completed");
316  }
317
318  private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException {
319    List<RegionInfo> regions =
320      getDefaultRegionReplica(env).filter(RegionInfo::isSplit).collect(Collectors.toList());
321    snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool");
322  }
323
324  private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException {
325    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
326    snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool");
327  }
328
329  private Stream<RegionInfo> getDefaultRegionReplica(MasterProcedureEnv env) {
330    return env.getAssignmentManager().getTableRegions(snapshotTable, false).stream()
331      .filter(r -> RegionReplicaUtil.isDefaultReplica(r));
332  }
333
334  private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, List<RegionInfo> regions,
335    String threadPoolName) throws IOException {
336    ThreadPoolExecutor exec =
337      SnapshotManifest.createExecutor(env.getMasterConfiguration(), threadPoolName);
338    try {
339      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
340        @Override
341        public void editRegion(final RegionInfo region) throws IOException {
342          snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
343          LOG.info("take snapshot region={}, table={}", region, snapshotTable);
344        }
345      });
346    } finally {
347      exec.shutdown();
348    }
349    status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable);
350  }
351
352  private void snapshotMobRegion(MasterProcedureEnv env) throws IOException {
353    if (!MobUtils.hasMobColumns(htd)) {
354      return;
355    }
356    ThreadPoolExecutor exec =
357      SnapshotManifest.createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool");
358    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
359    try {
360      ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo),
361        new ModifyRegionUtils.RegionEditTask() {
362          @Override
363          public void editRegion(final RegionInfo region) throws IOException {
364            snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
365          }
366        });
367    } finally {
368      exec.shutdown();
369    }
370    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
371  }
372
373  @Override
374  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
375    super.serializeStateData(serializer);
376    serializer
377      .serialize(SnapshotProcedureStateData.newBuilder().setSnapshot(this.snapshot).build());
378  }
379
380  @Override
381  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
382    super.deserializeStateData(serializer);
383    SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class);
384    this.snapshot = data.getSnapshot();
385  }
386
387  private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) {
388    return env.getAssignmentManager().getTableRegions(snapshotTable, true).stream()
389      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
390      .map(r -> new SnapshotRegionProcedure(snapshot, r)).toArray(SnapshotRegionProcedure[]::new);
391  }
392
393  @Override
394  public void toStringClassDetails(StringBuilder builder) {
395    builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", snapshot=")
396      .append(ClientSnapshotDescriptionUtils.toString(snapshot));
397  }
398
399  public SnapshotDescription getSnapshotDesc() {
400    return snapshot;
401  }
402
403  @Override
404  protected void afterReplay(MasterProcedureEnv env) {
405    if (getCurrentState() == getInitialState()) {
406      // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv().
407      return;
408    }
409    try {
410      prepareSnapshotEnv(env);
411      boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED,
412        SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
413      if (!snapshotProcedureEnabled) {
414        throw new IOException("SnapshotProcedure is DISABLED");
415      }
416    } catch (IOException e) {
417      LOG.error("Failed replaying {}, mark procedure as FAILED", this, e);
418      setFailure("master-snapshot", e);
419    }
420  }
421
422  public SnapshotDescription getSnapshot() {
423    return snapshot;
424  }
425
426  public synchronized void markSnapshotCorrupted() throws IOException {
427    Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir);
428    if (!workingDirFS.exists(flagFile)) {
429      workingDirFS.create(flagFile).close();
430      LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName());
431    }
432  }
433
434  public boolean isSnapshotCorrupted() throws IOException {
435    return workingDirFS
436      .exists(SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir));
437  }
438}