001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.stream.Collectors;
025import java.util.stream.Stream;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.hadoop.hbase.client.RegionReplicaUtil;
032import org.apache.hadoop.hbase.client.TableDescriptor;
033import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
034import org.apache.hadoop.hbase.client.TableState;
035import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
036import org.apache.hadoop.hbase.master.MetricsSnapshot;
037import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
038import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
041import org.apache.hadoop.hbase.mob.MobUtils;
042import org.apache.hadoop.hbase.monitoring.MonitoredTask;
043import org.apache.hadoop.hbase.monitoring.TaskMonitor;
044import org.apache.hadoop.hbase.procedure2.Procedure;
045import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
046import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
047import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
048import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
049import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
050import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
051import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
052import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
053import org.apache.hadoop.hbase.snapshot.SnapshotTTLExpiredException;
054import org.apache.hadoop.hbase.util.CommonFSUtils;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.util.ModifyRegionUtils;
057import org.apache.hadoop.hbase.util.RetryCounter;
058import org.apache.yetus.audience.InterfaceAudience;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
067
068/**
069 * A procedure used to take snapshot on tables.
070 */
071@InterfaceAudience.Private
072public class SnapshotProcedure extends AbstractStateMachineTableProcedure<SnapshotState> {
073  private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class);
074  private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
075
076  private Configuration conf;
077  private SnapshotDescription snapshot;
078  private Path rootDir;
079  private Path snapshotDir;
080  private Path workingDir;
081  private FileSystem workingDirFS;
082  private FileSystem rootFs;
083  private TableName snapshotTable;
084  private MonitoredTask status;
085  private SnapshotManifest snapshotManifest;
086  private TableDescriptor htd;
087
088  private RetryCounter retryCounter;
089
090  public SnapshotProcedure() {
091  }
092
093  public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) {
094    super(env);
095    this.snapshot = snapshot;
096  }
097
098  @Override
099  public TableName getTableName() {
100    return TableName.valueOf(snapshot.getTable());
101  }
102
103  @Override
104  public TableOperationType getTableOperationType() {
105    return TableOperationType.SNAPSHOT;
106  }
107
108  @Override
109  protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
110    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
111    LOG.info("{} execute state={}", this, state);
112
113    try {
114      switch (state) {
115        case SNAPSHOT_PREPARE:
116          prepareSnapshot(env);
117          setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
118          return Flow.HAS_MORE_STATE;
119        case SNAPSHOT_PRE_OPERATION:
120          preSnapshot(env);
121          setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
122          return Flow.HAS_MORE_STATE;
123        case SNAPSHOT_WRITE_SNAPSHOT_INFO:
124          SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS);
125          TableState tableState =
126            env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
127          if (tableState.isEnabled()) {
128            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
129          } else if (tableState.isDisabled()) {
130            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS);
131          }
132          return Flow.HAS_MORE_STATE;
133        case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
134          addChildProcedure(createRemoteSnapshotProcedures(env));
135          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS);
136          return Flow.HAS_MORE_STATE;
137        case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS:
138          snapshotSplitRegions(env);
139          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
140          return Flow.HAS_MORE_STATE;
141        case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS:
142          snapshotClosedRegions(env);
143          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
144          return Flow.HAS_MORE_STATE;
145        case SNAPSHOT_SNAPSHOT_MOB_REGION:
146          snapshotMobRegion(env);
147          setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
148          return Flow.HAS_MORE_STATE;
149        case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
150          // flush the in-memory state, and write the single manifest
151          status.setStatus("Consolidate snapshot: " + snapshot.getName());
152          snapshotManifest.consolidate();
153          setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
154          return Flow.HAS_MORE_STATE;
155        case SNAPSHOT_VERIFIER_SNAPSHOT:
156          status.setStatus("Verifying snapshot: " + snapshot.getName());
157          verifySnapshot(env);
158          setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
159          return Flow.HAS_MORE_STATE;
160        case SNAPSHOT_COMPLETE_SNAPSHOT:
161          if (isSnapshotCorrupted()) {
162            throw new CorruptedSnapshotException(snapshot.getName());
163          }
164          if (
165            SnapshotDescriptionUtils.isExpiredSnapshot(snapshot.getTtl(),
166              snapshot.getCreationTime(), EnvironmentEdgeManager.currentTime())
167          ) {
168            throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshot));
169          }
170          completeSnapshot(env);
171          setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
172          return Flow.HAS_MORE_STATE;
173        case SNAPSHOT_POST_OPERATION:
174          postSnapshot(env);
175          return Flow.NO_MORE_STATE;
176        default:
177          throw new UnsupportedOperationException("unhandled state=" + state);
178      }
179    } catch (ProcedureSuspendedException e) {
180      throw e;
181    } catch (Exception e) {
182      setFailure("master-snapshot", e);
183      LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e);
184      status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable);
185      return Flow.NO_MORE_STATE;
186    }
187  }
188
189  @Override
190  protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
191    throws IOException, InterruptedException {
192    if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) {
193      try {
194        if (!workingDirFS.delete(workingDir, true)) {
195          LOG.error("Couldn't delete snapshot working directory {}", workingDir);
196        }
197      } catch (IOException e) {
198        LOG.error("Couldn't delete snapshot working directory {}", workingDir, e);
199      }
200    }
201  }
202
203  @Override
204  protected boolean isRollbackSupported(SnapshotState state) {
205    return true;
206  }
207
208  @Override
209  protected SnapshotState getState(final int stateId) {
210    return SnapshotState.forNumber(stateId);
211  }
212
213  @Override
214  protected int getStateId(SnapshotState state) {
215    return state.getNumber();
216  }
217
218  @Override
219  protected SnapshotState getInitialState() {
220    return SnapshotState.SNAPSHOT_PREPARE;
221  }
222
223  private void prepareSnapshot(MasterProcedureEnv env)
224    throws ProcedureSuspendedException, IOException {
225    if (isAnySplitOrMergeProcedureRunning(env)) {
226      if (retryCounter == null) {
227        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
228      }
229      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
230      LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff);
231      setTimeout(Math.toIntExact(backoff));
232      setState(ProcedureState.WAITING_TIMEOUT);
233      skipPersistence();
234      throw new ProcedureSuspendedException();
235    }
236    prepareSnapshotEnv(env);
237  }
238
239  private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
240    this.conf = env.getMasterConfiguration();
241    this.snapshotTable = TableName.valueOf(snapshot.getTable());
242    this.htd = loadTableDescriptorSnapshot(env);
243    this.rootFs = env.getMasterFileSystem().getFileSystem();
244    this.rootDir = CommonFSUtils.getRootDir(conf);
245    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
246    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
247    this.workingDirFS = workingDir.getFileSystem(conf);
248    this.status = TaskMonitor.get()
249      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
250    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
251    this.snapshotManifest =
252      SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
253    this.snapshotManifest.addTableDescriptor(htd);
254  }
255
256  @Override
257  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
258    setState(ProcedureState.RUNNABLE);
259    env.getProcedureScheduler().addFront(this);
260    return false;
261  }
262
263  private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
264    return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
265      .filter(p -> !p.isFinished())
266      .filter(
267        p -> p instanceof SplitTableRegionProcedure || p instanceof MergeTableRegionsProcedure)
268      .anyMatch(
269        p -> ((AbstractStateMachineTableProcedure<?>) p).getTableName().equals(getTableName()));
270  }
271
272  private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException {
273    TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable);
274    if (htd == null) {
275      throw new IOException("TableDescriptor missing for " + snapshotTable);
276    }
277    if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
278      return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE,
279        Long.toString(this.snapshot.getMaxFileSize())).build();
280    }
281    return htd;
282  }
283
284  private void preSnapshot(MasterProcedureEnv env) throws IOException {
285    env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot);
286  }
287
288  private void postSnapshot(MasterProcedureEnv env) throws IOException {
289    SnapshotManager sm = env.getMasterServices().getSnapshotManager();
290    if (sm != null) {
291      sm.unregisterSnapshotProcedure(snapshot, getProcId());
292    }
293  }
294
295  private void verifySnapshot(MasterProcedureEnv env) throws IOException {
296    int verifyThreshold =
297      env.getMasterConfiguration().getInt("hbase.snapshot.remote.verify.threshold", 10000);
298    List<RegionInfo> regions = env.getAssignmentManager().getTableRegions(snapshotTable, false)
299      .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).collect(Collectors.toList());
300    int numRegions = regions.size();
301
302    MasterSnapshotVerifier verifier =
303      new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS);
304    if (numRegions >= verifyThreshold) {
305      verifier.verifySnapshot(workingDir, false);
306      addChildProcedure(regions.stream().map(r -> new SnapshotVerifyProcedure(snapshot, r))
307        .toArray(SnapshotVerifyProcedure[]::new));
308    } else {
309      verifier.verifySnapshot(workingDir, true);
310    }
311  }
312
313  private void completeSnapshot(MasterProcedureEnv env) throws IOException {
314    // complete the snapshot, atomically moving from tmp to .snapshot dir.
315    SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir,
316      env.getMasterFileSystem().getFileSystem(), workingDirFS, conf);
317    // update metric. when master restarts, the metric value is wrong
318    metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
319    if (env.getMasterCoprocessorHost() != null) {
320      env.getMasterCoprocessorHost()
321        .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
322    }
323    status.markComplete("Snapshot " + snapshot.getName() + "  completed");
324  }
325
326  private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException {
327    List<RegionInfo> regions =
328      getDefaultRegionReplica(env).filter(RegionInfo::isSplit).collect(Collectors.toList());
329    snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool");
330  }
331
332  private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException {
333    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
334    snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool");
335  }
336
337  private Stream<RegionInfo> getDefaultRegionReplica(MasterProcedureEnv env) {
338    return env.getAssignmentManager().getTableRegions(snapshotTable, false).stream()
339      .filter(r -> RegionReplicaUtil.isDefaultReplica(r));
340  }
341
342  private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, List<RegionInfo> regions,
343    String threadPoolName) throws IOException {
344    ThreadPoolExecutor exec =
345      SnapshotManifest.createExecutor(env.getMasterConfiguration(), threadPoolName);
346    try {
347      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
348        @Override
349        public void editRegion(final RegionInfo region) throws IOException {
350          snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
351          LOG.info("take snapshot region={}, table={}", region, snapshotTable);
352        }
353      });
354    } finally {
355      exec.shutdown();
356    }
357    status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable);
358  }
359
360  private void snapshotMobRegion(MasterProcedureEnv env) throws IOException {
361    if (!MobUtils.hasMobColumns(htd)) {
362      return;
363    }
364    ThreadPoolExecutor exec =
365      SnapshotManifest.createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool");
366    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
367    try {
368      ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo),
369        new ModifyRegionUtils.RegionEditTask() {
370          @Override
371          public void editRegion(final RegionInfo region) throws IOException {
372            snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
373          }
374        });
375    } finally {
376      exec.shutdown();
377    }
378    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
379  }
380
381  @Override
382  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
383    super.serializeStateData(serializer);
384    serializer
385      .serialize(SnapshotProcedureStateData.newBuilder().setSnapshot(this.snapshot).build());
386  }
387
388  @Override
389  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
390    super.deserializeStateData(serializer);
391    SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class);
392    this.snapshot = data.getSnapshot();
393  }
394
395  private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) {
396    return env.getAssignmentManager().getTableRegions(snapshotTable, true).stream()
397      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
398      .map(r -> new SnapshotRegionProcedure(snapshot, r)).toArray(SnapshotRegionProcedure[]::new);
399  }
400
401  @Override
402  public void toStringClassDetails(StringBuilder builder) {
403    builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", snapshot=")
404      .append(ClientSnapshotDescriptionUtils.toString(snapshot));
405  }
406
407  public SnapshotDescription getSnapshotDesc() {
408    return snapshot;
409  }
410
411  @Override
412  protected void afterReplay(MasterProcedureEnv env) {
413    if (getCurrentState() == getInitialState()) {
414      // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv().
415      return;
416    }
417    try {
418      prepareSnapshotEnv(env);
419      boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED,
420        SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
421      if (!snapshotProcedureEnabled) {
422        throw new IOException("SnapshotProcedure is DISABLED");
423      }
424    } catch (IOException e) {
425      LOG.error("Failed replaying {}, mark procedure as FAILED", this, e);
426      setFailure("master-snapshot", e);
427    }
428  }
429
430  public SnapshotDescription getSnapshot() {
431    return snapshot;
432  }
433
434  public synchronized void markSnapshotCorrupted() throws IOException {
435    Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir);
436    if (!workingDirFS.exists(flagFile)) {
437      workingDirFS.create(flagFile).close();
438      LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName());
439    }
440  }
441
442  public boolean isSnapshotCorrupted() throws IOException {
443    return workingDirFS
444      .exists(SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir));
445  }
446}