001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.stream.Collectors;
025import java.util.stream.Stream;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.RegionInfo;
031import org.apache.hadoop.hbase.client.RegionReplicaUtil;
032import org.apache.hadoop.hbase.client.TableDescriptor;
033import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
034import org.apache.hadoop.hbase.client.TableState;
035import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
036import org.apache.hadoop.hbase.master.MetricsSnapshot;
037import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
038import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
041import org.apache.hadoop.hbase.mob.MobUtils;
042import org.apache.hadoop.hbase.monitoring.MonitoredTask;
043import org.apache.hadoop.hbase.monitoring.TaskMonitor;
044import org.apache.hadoop.hbase.procedure2.Procedure;
045import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
046import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
047import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
048import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
049import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
050import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
051import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
052import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
053import org.apache.hadoop.hbase.util.CommonFSUtils;
054import org.apache.hadoop.hbase.util.ModifyRegionUtils;
055import org.apache.hadoop.hbase.util.RetryCounter;
056import org.apache.yetus.audience.InterfaceAudience;
057import org.slf4j.Logger;
058import org.slf4j.LoggerFactory;
059
060import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
061import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
062import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
063import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
064import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
065
066/**
067 * A procedure used to take snapshot on tables.
068 */
069@InterfaceAudience.Private
070public class SnapshotProcedure extends AbstractStateMachineTableProcedure<SnapshotState> {
071  private static final Logger LOG = LoggerFactory.getLogger(SnapshotProcedure.class);
072  private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
073
074  private Configuration conf;
075  private SnapshotDescription snapshot;
076  private Path rootDir;
077  private Path snapshotDir;
078  private Path workingDir;
079  private FileSystem workingDirFS;
080  private FileSystem rootFs;
081  private TableName snapshotTable;
082  private MonitoredTask status;
083  private SnapshotManifest snapshotManifest;
084  private TableDescriptor htd;
085
086  private RetryCounter retryCounter;
087
088  public SnapshotProcedure() {
089  }
090
091  public SnapshotProcedure(final MasterProcedureEnv env, final SnapshotDescription snapshot) {
092    super(env);
093    this.snapshot = snapshot;
094  }
095
096  @Override
097  public TableName getTableName() {
098    return TableName.valueOf(snapshot.getTable());
099  }
100
101  @Override
102  public TableOperationType getTableOperationType() {
103    return TableOperationType.SNAPSHOT;
104  }
105
106  @Override
107  protected LockState acquireLock(MasterProcedureEnv env) {
108    // AbstractStateMachineTableProcedure acquires exclusive table lock by default,
109    // but we may need to downgrade it to shared lock for some reasons:
110    // a. exclusive lock has a negative effect on assigning region. See HBASE-21480 for details.
111    // b. we want to support taking multiple different snapshots on same table on the same time.
112    if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName())) {
113      return LockState.LOCK_EVENT_WAIT;
114    }
115    return LockState.LOCK_ACQUIRED;
116  }
117
118  @Override
119  protected void releaseLock(MasterProcedureEnv env) {
120    env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
121  }
122
123  @Override
124  protected boolean holdLock(MasterProcedureEnv env) {
125    // In order to avoid enabling/disabling/modifying/deleting table during snapshot,
126    // we don't release lock during suspend
127    return true;
128  }
129
130  @Override
131  protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
132    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
133    LOG.info("{} execute state={}", this, state);
134
135    try {
136      switch (state) {
137        case SNAPSHOT_PREPARE:
138          prepareSnapshot(env);
139          setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
140          return Flow.HAS_MORE_STATE;
141        case SNAPSHOT_PRE_OPERATION:
142          preSnapshot(env);
143          setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
144          return Flow.HAS_MORE_STATE;
145        case SNAPSHOT_WRITE_SNAPSHOT_INFO:
146          SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir, workingDirFS);
147          TableState tableState =
148            env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
149          if (tableState.isEnabled()) {
150            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
151          } else if (tableState.isDisabled()) {
152            setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_CLOSED_REGIONS);
153          }
154          return Flow.HAS_MORE_STATE;
155        case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
156          addChildProcedure(createRemoteSnapshotProcedures(env));
157          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_SPLIT_REGIONS);
158          return Flow.HAS_MORE_STATE;
159        case SNAPSHOT_SNAPSHOT_SPLIT_REGIONS:
160          snapshotSplitRegions(env);
161          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
162          return Flow.HAS_MORE_STATE;
163        case SNAPSHOT_SNAPSHOT_CLOSED_REGIONS:
164          snapshotClosedRegions(env);
165          setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
166          return Flow.HAS_MORE_STATE;
167        case SNAPSHOT_SNAPSHOT_MOB_REGION:
168          snapshotMobRegion(env);
169          setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
170          return Flow.HAS_MORE_STATE;
171        case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
172          // flush the in-memory state, and write the single manifest
173          status.setStatus("Consolidate snapshot: " + snapshot.getName());
174          snapshotManifest.consolidate();
175          setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
176          return Flow.HAS_MORE_STATE;
177        case SNAPSHOT_VERIFIER_SNAPSHOT:
178          status.setStatus("Verifying snapshot: " + snapshot.getName());
179          verifySnapshot(env);
180          setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
181          return Flow.HAS_MORE_STATE;
182        case SNAPSHOT_COMPLETE_SNAPSHOT:
183          if (isSnapshotCorrupted()) {
184            throw new CorruptedSnapshotException(snapshot.getName());
185          }
186          completeSnapshot(env);
187          setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
188          return Flow.HAS_MORE_STATE;
189        case SNAPSHOT_POST_OPERATION:
190          postSnapshot(env);
191          return Flow.NO_MORE_STATE;
192        default:
193          throw new UnsupportedOperationException("unhandled state=" + state);
194      }
195    } catch (ProcedureSuspendedException e) {
196      throw e;
197    } catch (Exception e) {
198      setFailure("master-snapshot", e);
199      LOG.warn("unexpected exception while execute {}. Mark procedure Failed.", this, e);
200      status.abort("Abort Snapshot " + snapshot.getName() + " on Table " + snapshotTable);
201      return Flow.NO_MORE_STATE;
202    }
203  }
204
205  @Override
206  protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
207    throws IOException, InterruptedException {
208    if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) {
209      try {
210        if (!workingDirFS.delete(workingDir, true)) {
211          LOG.error("Couldn't delete snapshot working directory {}", workingDir);
212        }
213      } catch (IOException e) {
214        LOG.error("Couldn't delete snapshot working directory {}", workingDir, e);
215      }
216    }
217  }
218
219  @Override
220  protected boolean isRollbackSupported(SnapshotState state) {
221    return true;
222  }
223
224  @Override
225  protected SnapshotState getState(final int stateId) {
226    return SnapshotState.forNumber(stateId);
227  }
228
229  @Override
230  protected int getStateId(SnapshotState state) {
231    return state.getNumber();
232  }
233
234  @Override
235  protected SnapshotState getInitialState() {
236    return SnapshotState.SNAPSHOT_PREPARE;
237  }
238
239  private void prepareSnapshot(MasterProcedureEnv env)
240    throws ProcedureSuspendedException, IOException {
241    if (isAnySplitOrMergeProcedureRunning(env)) {
242      if (retryCounter == null) {
243        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
244      }
245      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
246      LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff);
247      setTimeout(Math.toIntExact(backoff));
248      setState(ProcedureState.WAITING_TIMEOUT);
249      skipPersistence();
250      throw new ProcedureSuspendedException();
251    }
252    prepareSnapshotEnv(env);
253  }
254
255  private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
256    this.conf = env.getMasterConfiguration();
257    this.snapshotTable = TableName.valueOf(snapshot.getTable());
258    this.htd = loadTableDescriptorSnapshot(env);
259    this.rootFs = env.getMasterFileSystem().getFileSystem();
260    this.rootDir = CommonFSUtils.getRootDir(conf);
261    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
262    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
263    this.workingDirFS = workingDir.getFileSystem(conf);
264    this.status = TaskMonitor.get()
265      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
266    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
267    this.snapshotManifest =
268      SnapshotManifest.create(conf, rootFs, workingDir, snapshot, monitor, status);
269    this.snapshotManifest.addTableDescriptor(htd);
270  }
271
272  @Override
273  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
274    setState(ProcedureState.RUNNABLE);
275    env.getProcedureScheduler().addFront(this);
276    return false;
277  }
278
279  private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
280    return env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
281      .filter(p -> !p.isFinished())
282      .filter(
283        p -> p instanceof SplitTableRegionProcedure || p instanceof MergeTableRegionsProcedure)
284      .anyMatch(
285        p -> ((AbstractStateMachineTableProcedure<?>) p).getTableName().equals(getTableName()));
286  }
287
288  private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env) throws IOException {
289    TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(snapshotTable);
290    if (htd == null) {
291      throw new IOException("TableDescriptor missing for " + snapshotTable);
292    }
293    if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
294      return TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE,
295        Long.toString(this.snapshot.getMaxFileSize())).build();
296    }
297    return htd;
298  }
299
300  private void preSnapshot(MasterProcedureEnv env) throws IOException {
301    env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot);
302  }
303
304  private void postSnapshot(MasterProcedureEnv env) throws IOException {
305    SnapshotManager sm = env.getMasterServices().getSnapshotManager();
306    if (sm != null) {
307      sm.unregisterSnapshotProcedure(snapshot, getProcId());
308    }
309  }
310
311  private void verifySnapshot(MasterProcedureEnv env) throws IOException {
312    int verifyThreshold =
313      env.getMasterConfiguration().getInt("hbase.snapshot.remote.verify.threshold", 10000);
314    List<RegionInfo> regions = env.getAssignmentManager().getTableRegions(snapshotTable, false)
315      .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).collect(Collectors.toList());
316    int numRegions = regions.size();
317
318    MasterSnapshotVerifier verifier =
319      new MasterSnapshotVerifier(env.getMasterServices(), snapshot, workingDirFS);
320    if (numRegions >= verifyThreshold) {
321      verifier.verifySnapshot(workingDir, false);
322      addChildProcedure(regions.stream().map(r -> new SnapshotVerifyProcedure(snapshot, r))
323        .toArray(SnapshotVerifyProcedure[]::new));
324    } else {
325      verifier.verifySnapshot(workingDir, true);
326    }
327  }
328
329  private void completeSnapshot(MasterProcedureEnv env) throws IOException {
330    // complete the snapshot, atomically moving from tmp to .snapshot dir.
331    SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir,
332      env.getMasterFileSystem().getFileSystem(), workingDirFS, conf);
333    // update metric. when master restarts, the metric value is wrong
334    metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
335    if (env.getMasterCoprocessorHost() != null) {
336      env.getMasterCoprocessorHost()
337        .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
338    }
339    status.markComplete("Snapshot " + snapshot.getName() + "  completed");
340  }
341
342  private void snapshotSplitRegions(MasterProcedureEnv env) throws IOException {
343    List<RegionInfo> regions =
344      getDefaultRegionReplica(env).filter(RegionInfo::isSplit).collect(Collectors.toList());
345    snapshotSplitOrClosedRegions(env, regions, "SplitRegionsSnapshotPool");
346  }
347
348  private void snapshotClosedRegions(MasterProcedureEnv env) throws IOException {
349    List<RegionInfo> regions = getDefaultRegionReplica(env).collect(Collectors.toList());
350    snapshotSplitOrClosedRegions(env, regions, "ClosedRegionsSnapshotPool");
351  }
352
353  private Stream<RegionInfo> getDefaultRegionReplica(MasterProcedureEnv env) {
354    return env.getAssignmentManager().getTableRegions(snapshotTable, false).stream()
355      .filter(r -> RegionReplicaUtil.isDefaultReplica(r));
356  }
357
358  private void snapshotSplitOrClosedRegions(MasterProcedureEnv env, List<RegionInfo> regions,
359    String threadPoolName) throws IOException {
360    ThreadPoolExecutor exec =
361      SnapshotManifest.createExecutor(env.getMasterConfiguration(), threadPoolName);
362    try {
363      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
364        @Override
365        public void editRegion(final RegionInfo region) throws IOException {
366          snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
367          LOG.info("take snapshot region={}, table={}", region, snapshotTable);
368        }
369      });
370    } finally {
371      exec.shutdown();
372    }
373    status.setStatus("Completed referencing closed/split regions of table: " + snapshotTable);
374  }
375
376  private void snapshotMobRegion(MasterProcedureEnv env) throws IOException {
377    if (!MobUtils.hasMobColumns(htd)) {
378      return;
379    }
380    ThreadPoolExecutor exec =
381      SnapshotManifest.createExecutor(env.getMasterConfiguration(), "MobRegionSnapshotPool");
382    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
383    try {
384      ModifyRegionUtils.editRegions(exec, Collections.singleton(mobRegionInfo),
385        new ModifyRegionUtils.RegionEditTask() {
386          @Override
387          public void editRegion(final RegionInfo region) throws IOException {
388            snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), region);
389          }
390        });
391    } finally {
392      exec.shutdown();
393    }
394    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
395  }
396
397  @Override
398  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
399    super.serializeStateData(serializer);
400    serializer
401      .serialize(SnapshotProcedureStateData.newBuilder().setSnapshot(this.snapshot).build());
402  }
403
404  @Override
405  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
406    super.deserializeStateData(serializer);
407    SnapshotProcedureStateData data = serializer.deserialize(SnapshotProcedureStateData.class);
408    this.snapshot = data.getSnapshot();
409  }
410
411  private Procedure<MasterProcedureEnv>[] createRemoteSnapshotProcedures(MasterProcedureEnv env) {
412    return env.getAssignmentManager().getTableRegions(snapshotTable, true).stream()
413      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
414      .map(r -> new SnapshotRegionProcedure(snapshot, r)).toArray(SnapshotRegionProcedure[]::new);
415  }
416
417  @Override
418  public void toStringClassDetails(StringBuilder builder) {
419    builder.append(getClass().getName()).append(", id=").append(getProcId()).append(", snapshot=")
420      .append(ClientSnapshotDescriptionUtils.toString(snapshot));
421  }
422
423  public SnapshotDescription getSnapshotDesc() {
424    return snapshot;
425  }
426
427  @Override
428  protected void afterReplay(MasterProcedureEnv env) {
429    if (getCurrentState() == getInitialState()) {
430      // if we are in the initial state, it is unnecessary to call prepareSnapshotEnv().
431      return;
432    }
433    try {
434      prepareSnapshotEnv(env);
435      boolean snapshotProcedureEnabled = conf.getBoolean(SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED,
436        SnapshotManager.SNAPSHOT_PROCEDURE_ENABLED_DEFAULT);
437      if (!snapshotProcedureEnabled) {
438        throw new IOException("SnapshotProcedure is DISABLED");
439      }
440    } catch (IOException e) {
441      LOG.error("Failed replaying {}, mark procedure as FAILED", this, e);
442      setFailure("master-snapshot", e);
443    }
444  }
445
446  public SnapshotDescription getSnapshot() {
447    return snapshot;
448  }
449
450  public synchronized void markSnapshotCorrupted() throws IOException {
451    Path flagFile = SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir);
452    if (!workingDirFS.exists(flagFile)) {
453      workingDirFS.create(flagFile).close();
454      LOG.info("touch corrupted snapshot flag file {} for {}", flagFile, snapshot.getName());
455    }
456  }
457
458  public boolean isSnapshotCorrupted() throws IOException {
459    return workingDirFS
460      .exists(SnapshotDescriptionUtils.getCorruptedFlagFileForSnapshot(workingDir));
461  }
462}