001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.stream.Collectors;
025import java.util.stream.Stream;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.hadoop.hbase.client.RegionReplicaUtil;
032import org.apache.hadoop.hbase.client.TableDescriptor;
033import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
034import org.apache.hadoop.hbase.client.TableState;
035import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
036import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
037import org.apache.hadoop.hbase.master.MetricsSnapshot;
038import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
039import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
040import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
041import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
042import org.apache.hadoop.hbase.mob.MobUtils;
043import org.apache.hadoop.hbase.monitoring.MonitoredTask;
044import org.apache.hadoop.hbase.monitoring.TaskMonitor;
045import org.apache.hadoop.hbase.procedure2.Procedure;
046import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
047import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
048import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
049import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
050import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
051import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
052import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
053import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
054import org.apache.hadoop.hbase.util.CommonFSUtils;
055import org.apache.hadoop.hbase.util.ModifyRegionUtils;
056import org.apache.hadoop.hbase.util.RetryCounter;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
066
067/**
068 * A procedure used to take snapshot on tables.
069 */
070@InterfaceAudience.Private
071public class SnapshotProcedure extends AbstractStateMachineTableProcedure<SnapshotState> {
072  private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class);
073  private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
074
075  private Configuration conf;
076  private SnapshotDescription snapshot;
077  private Path rootDir;
078  private Path snapshotDir;
079  private Path workingDir;
080  private FileSystem workingDirFS;
081  private FileSystem rootFs;
082  private TableName snapshotTable;
083  private MonitoredTask status;
084  private SnapshotManifest snapshotManifest;
085  private TableDescriptor htd;
086
087  private RetryCounter retryCounter;
088
089  public SnapshotProcedure() {
090  }
091
092  public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) {
093    super(env);
094    this.snapshot = snapshot;
095  }
096
097  @Override
098  public TableName getTableName() {
099    return TableName.valueOf(snapshot.getTable());
100  }
101
102  @Override
103  public TableOperationType getTableOperationType() {
104    return TableOperationType.SNAPSHOT;
105  }
106
107  @Override
108  protected LockState acquireLock(MasterProcedureEnv env) {
109    // AbstractStateMachineTableProcedure acquires exclusive table lock by default,
110    // but we may need to downgrade it to shared lock for some reasons:
111    // a. exclusive lock has a negative effect on assigning region. See HBASE-21480 for details.
112    // b. we want to support taking multiple different snapshots on same table on the same time.
113    if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) {
114      return LockState.LOCK_EVENT_WAIT;
115    }
116    return LockState.LOCK_ACQUIRED;
117  }
118
119  @Override
120  protected void releaseLock(MasterProcedureEnv env) {
121    env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
122  }
123
124  @Override
125  protected boolean holdLock(MasterProcedureEnv env) {
126    // In order to avoid enabling/disabling/modifying/deleting table during snapshot,
127    // we don't release lock during suspend
128    return true;
129  }
130
131  @Override
132  protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
133    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
134    LOG.info("{} execute state={}", this, state);
135
136    try {
137      switch (state) {
138        case SNAPSHOT_PREPARE:
139          prepareSnapshot(env);
140          setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
141          return Flow.HAS_MORE_STATE;
142        case SNAPSHOT_PRE_OPERATION:
143          preSnapshot(env);
144          setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
145          return Flow.HAS_MORE_STATE;
146        case SNAPSHOT_WRITE_SNAPSHOT_INFO:
147          SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS);
148          TableState tableState =
149            env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
150          if (tableState.isEnabled()) {
151            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
152          } else if (tableState.isDisabled()) {
153            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS);
154          }
155          return Flow.HAS_MORE_STATE;
156        case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
157          addChildProcedure(createRemoteSnapshotProcedures(env));
158          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS);
159          return Flow.HAS_MORE_STATE;
160        case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS:
161          snapshotSplitRegions(env);
162          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
163          return Flow.HAS_MORE_STATE;
164        case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS:
165          snapshotClosedRegions(env);
166          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
167          return Flow.HAS_MORE_STATE;
168        case SNAPSHOT_SNAPSHOT_MOB_REGION:
169          snapshotMobRegion(env);
170          setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
171          return Flow.HAS_MORE_STATE;
172        case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
173          // flush the in-memory state, and write the single manifest
174          status.setStatus("Consolidate snapshot: " + snapshot.getName());
175          snapshotManifest.consolidate();
176          setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
177          return Flow.HAS_MORE_STATE;
178        case SNAPSHOT_VERIFIER_SNAPSHOT:
179          status.setStatus("Verifying snapshot: " + snapshot.getName());
180          verifySnapshot(env);
181          setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
182          return Flow.HAS_MORE_STATE;
183        case SNAPSHOT_COMPLETE_SNAPSHOT:
184          if (isSnapshotCorrupted()) {
185            throw new CorruptedSnapshotException(snapshot.getName());
186          }
187          completeSnapshot(env);
188          setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
189          return Flow.HAS_MORE_STATE;
190        case SNAPSHOT_POST_OPERATION:
191          postSnapshot(env);
192          return Flow.NO_MORE_STATE;
193        default:
194          throw new UnsupportedOperationException("unhandled state=" + state);
195      }
196    } catch (ProcedureSuspendedException e) {
197      throw e;
198    } catch (Exception e) {
199      setFailure("master-snapshot", e);
200      LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e);
201      status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable);
202      return Flow.NO_MORE_STATE;
203    }
204  }
205
206  @Override
207  protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
208    throws IOException, InterruptedException {
209    if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) {
210      try {
211        if (!workingDirFS.delete(workingDir, true)) {
212          LOG.error("Couldn't delete snapshot working directory {}", workingDir);
213        }
214      } catch (IOException e) {
215        LOG.error("Couldn't delete snapshot working directory {}", workingDir, e);
216      }
217    }
218  }
219
220  @Override
221  protected boolean isRollbackSupported(SnapshotState state) {
222    return true;
223  }
224
225  @Override
226  protected SnapshotState getState(final int stateId) {
227    return SnapshotState.forNumber(stateId);
228  }
229
230  @Override
231  protected int getStateId(SnapshotState state) {
232    return state.getNumber();
233  }
234
235  @Override
236  protected SnapshotState getInitialState() {
237    return SnapshotState.SNAPSHOT_PREPARE;
238  }
239
240  private void prepareSnapshot(MasterProcedureEnv env)
241    throws ProcedureSuspendedException, IOException {
242    if (isAnySplitOrMergeProcedureRunning(env)) {
243      if (retryCounter == null) {
244        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
245      }
246      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
247      LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff);
248      setTimeout(Math.toIntExact(backoff));
249      setState(ProcedureState.WAITING_TIMEOUT);
250      skipPersistence();
251      throw new ProcedureSuspendedException();
252    }
253    prepareSnapshotEnv(env);
254  }
255
256  private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
257    this.conf = env.getMasterConfiguration();
258    this.snapshotTable = TableName.valueOf(snapshot.getTable());
259    this.htd = loadTableDescriptorSnapshot(env);
260    this.rootFs = env.getMasterFileSystem().getFileSystem();
261    this.rootDir = CommonFSUtils.getRootDir(conf);
262    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
263    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
264    this.workingDirFS = workingDir.getFileSystem(conf);
265    this.status = TaskMonitor.get()
266      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
267    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
268    this.snapshotManifest =
269      SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
270    this.snapshotManifest.addTableDescriptor(htd);
271  }
272
273  @Override
274  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
275    setState(ProcedureState.RUNNABLE);
276    env.getProcedureScheduler().addFront(this);
277    return false;
278  }
279
280  private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
281    return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
282      .filter(p -> !p.isFinished())
283      .filter(
284        p -> p instanceof SplitTableRegionProcedure || p instanceof MergeTableRegionsProcedure)
285      .anyMatch(
286        p -> ((AbstractStateMachineTableProcedure<?>) p).getTableName().equals(getTableName()));
287  }
288
289  private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException {
290    TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable);
291    if (htd == null) {
292      throw new IOException("TableDescriptor missing for " + snapshotTable);
293    }
294    if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
295      return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE,
296        Long.toString(this.snapshot.getMaxFileSize())).build();
297    }
298    return htd;
299  }
300
301  private void preSnapshot(MasterProcedureEnv env) throws IOException {
302    env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot);
303
304    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
305    if (cpHost != null) {
306      cpHost.preSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd, getUser());
307    }
308  }
309
310  private void postSnapshot(MasterProcedureEnv env) throws IOException {
311    SnapshotManager sm = env.getMasterServices().getSnapshotManager();
312    if (sm != null) {
313      sm.unregisterSnapshotProcedure(snapshot, getProcId());
314    }
315
316    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
317    if (cpHost != null) {
318      cpHost.postSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd, getUser());
319    }
320  }
321
322  private void verifySnapshot(MasterProcedureEnv env) throws IOException {
323    int verifyThreshold =
324      env.getMasterConfiguration().getInt("hbase.snapshot.remote.verify.threshold", 10000);
325    List<RegionInfo> regions = env.getAssignmentManager().getTableRegions(snapshotTable, false)
326      .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).collect(Collectors.toList());
327    int numRegions = regions.size();
328
329    MasterSnapshotVerifier verifier =
330      new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS);
331    if (numRegions >= verifyThreshold) {
332      verifier.verifySnapshot(workingDir, false);
333      addChildProcedure(regions.stream().map(r -> new SnapshotVerifyProcedure(snapshot, r))
334        .toArray(SnapshotVerifyProcedure[]::new));
335    } else {
336      verifier.verifySnapshot(workingDir, true);
337    }
338  }
339
340  private void completeSnapshot(MasterProcedureEnv env) throws IOException {
341    // complete the snapshot, atomically moving from tmp to .snapshot dir.
342    SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir,
343      env.getMasterFileSystem().getFileSystem(), workingDirFS, conf);
344    // update metric. when master restarts, the metric value is wrong
345    metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
346    if (env.getMasterCoprocessorHost() != null) {
347      env.getMasterCoprocessorHost()
348        .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
349    }
350    status.markComplete("Snapshot " + snapshot.getName() + "  completed");
351  }
352
353  private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException {
354    List<RegionInfo> regions =
355      getDefaultRegionReplica(env).filter(RegionInfo::isSplit).collect(Collectors.toList());
356    snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool");
357  }
358
359  private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException {
360    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
361    snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool");
362  }
363
364  private Stream<RegionInfo> getDefaultRegionReplica(MasterProcedureEnv env) {
365    return env.getAssignmentManager().getTableRegions(snapshotTable, false).stream()
366      .filter(r -> RegionReplicaUtil.isDefaultReplica(r));
367  }
368
369  private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, List<RegionInfo> regions,
370    String threadPoolName) throws IOException {
371    ThreadPoolExecutor exec =
372      SnapshotManifest.createExecutor(env.getMasterConfiguration(), threadPoolName);
373    try {
374      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
375        @Override
376        public void editRegion(final RegionInfo region) throws IOException {
377          snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
378          LOG.info("take snapshot region={}, table={}", region, snapshotTable);
379        }
380      });
381    } finally {
382      exec.shutdown();
383    }
384    status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable);
385  }
386
387  private void snapshotMobRegion(MasterProcedureEnv env) throws IOException {
388    if (!MobUtils.hasMobColumns(htd)) {
389      return;
390    }
391    ThreadPoolExecutor exec =
392      SnapshotManifest.createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool");
393    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
394    try {
395      ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo),
396        new ModifyRegionUtils.RegionEditTask() {
397          @Override
398          public void editRegion(final RegionInfo region) throws IOException {
399            snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
400          }
401        });
402    } finally {
403      exec.shutdown();
404    }
405    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
406  }
407
408  @Override
409  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
410    super.serializeStateData(serializer);
411    serializer
412      .serialize(SnapshotProcedureStateData.newBuilder().setSnapshot(this.snapshot).build());
413  }
414
415  @Override
416  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
417    super.deserializeStateData(serializer);
418    SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class);
419    this.snapshot = data.getSnapshot();
420  }
421
422  private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) {
423    return env.getAssignmentManager().getTableRegions(snapshotTable, true).stream()
424      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
425      .map(r -> new SnapshotRegionProcedure(snapshot, r)).toArray(SnapshotRegionProcedure[]::new);
426  }
427
428  @Override
429  public void toStringClassDetails(StringBuilder builder) {
430    builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", snapshot=")
431      .append(ClientSnapshotDescriptionUtils.toString(snapshot));
432  }
433
434  public SnapshotDescription getSnapshotDesc() {
435    return snapshot;
436  }
437
438  @Override
439  protected void afterReplay(MasterProcedureEnv env) {
440    if (getCurrentState() == getInitialState()) {
441      // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv().
442      return;
443    }
444    try {
445      prepareSnapshotEnv(env);
446      boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED,
447        SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
448      if (!snapshotProcedureEnabled) {
449        throw new IOException("SnapshotProcedure is DISABLED");
450      }
451    } catch (IOException e) {
452      LOG.error("Failed replaying {}, mark procedure as FAILED", this, e);
453      setFailure("master-snapshot", e);
454    }
455  }
456
457  public SnapshotDescription getSnapshot() {
458    return snapshot;
459  }
460
461  public synchronized void markSnapshotCorrupted() throws IOException {
462    Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir);
463    if (!workingDirFS.exists(flagFile)) {
464      workingDirFS.create(flagFile).close();
465      LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName());
466    }
467  }
468
469  public boolean isSnapshotCorrupted() throws IOException {
470    return workingDirFS
471      .exists(SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir));
472  }
473}