001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.snapshot;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.concurrent.Callable;
023
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.yetus.audience.InterfaceStability;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028import org.apache.hadoop.hbase.client.IsolationLevel;
029import org.apache.hadoop.hbase.errorhandling.ForeignException;
030import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
031import org.apache.hadoop.hbase.procedure.ProcedureMember;
032import org.apache.hadoop.hbase.procedure.Subprocedure;
033import org.apache.hadoop.hbase.regionserver.HRegion;
034import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
035import org.apache.hadoop.hbase.regionserver.Region.Operation;
036import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager.SnapshotSubprocedurePool;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
038import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
039
040/**
041 * This online snapshot implementation uses the distributed procedure framework to force a
042 * store flush and then records the hfiles.  Its enter stage does nothing.  Its leave stage then
043 * flushes the memstore, builds the region server's snapshot manifest from its hfiles list, and
044 * copies .regioninfos into the snapshot working directory.  At the master side, there is an atomic
045 * rename of the working dir into the proper snapshot directory.
046 */
047@InterfaceAudience.Private
048@InterfaceStability.Unstable
049public class FlushSnapshotSubprocedure extends Subprocedure {
050  private static final Logger LOG = LoggerFactory.getLogger(FlushSnapshotSubprocedure.class);
051
052  private final List<HRegion> regions;
053  private final SnapshotDescription snapshot;
054  private final SnapshotSubprocedurePool taskManager;
055  private boolean snapshotSkipFlush = false;
056
057  // the maximum number of attempts we flush
058  final static int MAX_RETRIES = 3;
059
060  public FlushSnapshotSubprocedure(ProcedureMember member,
061      ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
062      List<HRegion> regions, SnapshotDescription snapshot,
063      SnapshotSubprocedurePool taskManager) {
064    super(member, snapshot.getName(), errorListener, wakeFrequency, timeout);
065    this.snapshot = snapshot;
066
067    if (this.snapshot.getType() == SnapshotDescription.Type.SKIPFLUSH) {
068      snapshotSkipFlush = true;
069    }
070    this.regions = regions;
071    this.taskManager = taskManager;
072  }
073
074  /**
075   * Callable for adding files to snapshot manifest working dir.  Ready for multithreading.
076   */
077  public static class RegionSnapshotTask implements Callable<Void> {
078    private HRegion region;
079    private boolean skipFlush;
080    private ForeignExceptionDispatcher monitor;
081    private SnapshotDescription snapshotDesc;
082
083    public RegionSnapshotTask(HRegion region, SnapshotDescription snapshotDesc,
084        boolean skipFlush, ForeignExceptionDispatcher monitor) {
085      this.region = region;
086      this.skipFlush = skipFlush;
087      this.monitor = monitor;
088      this.snapshotDesc = snapshotDesc;
089    }
090
091    @Override
092    public Void call() throws Exception {
093      // Taking the region read lock prevents the individual region from being closed while a
094      // snapshot is in progress.  This is helpful but not sufficient for preventing races with
095      // snapshots that involve multiple regions and regionservers.  It is still possible to have
096      // an interleaving such that globally regions are missing, so we still need the verification
097      // step.
098      LOG.debug("Starting snapshot operation on " + region);
099      region.startRegionOperation(Operation.SNAPSHOT);
100      try {
101        if (skipFlush) {
102        /*
103         * This is to take an online-snapshot without force a coordinated flush to prevent pause
104         * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
105         * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
106         * turned on/off based on the flush type.
107         * To minimized the code change, class name is not changed.
108         */
109          LOG.debug("take snapshot without flush memstore first");
110        } else {
111          LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
112          boolean succeeded = false;
113          long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED);
114          for (int i = 0; i < MAX_RETRIES; i++) {
115            FlushResult res = region.flush(true);
116            if (res.getResult() == FlushResult.Result.CANNOT_FLUSH) {
117              // CANNOT_FLUSH may mean that a flush is already on-going
118              // we need to wait for that flush to complete
119              region.waitForFlushes();
120              if (region.getMaxFlushedSeqId() >= readPt) {
121                // writes at the start of the snapshot have been persisted
122                succeeded = true;
123                break;
124              }
125            } else {
126              succeeded = true;
127              break;
128            }
129          }
130          if (!succeeded) {
131            throw new IOException("Unable to complete flush after " + MAX_RETRIES + " attempts");
132          }
133        }
134        region.addRegionToSnapshot(snapshotDesc, monitor);
135        if (skipFlush) {
136          LOG.debug("... SkipFlush Snapshotting region " + region.toString() + " completed.");
137        } else {
138          LOG.debug("... Flush Snapshotting region " + region.toString() + " completed.");
139        }
140      } finally {
141        LOG.debug("Closing snapshot operation on " + region);
142        region.closeRegionOperation(Operation.SNAPSHOT);
143      }
144      return null;
145    }
146  }
147
148  private void flushSnapshot() throws ForeignException {
149    if (regions.isEmpty()) {
150      // No regions on this RS, we are basically done.
151      return;
152    }
153
154    monitor.rethrowException();
155
156    // assert that the taskManager is empty.
157    if (taskManager.hasTasks()) {
158      throw new IllegalStateException("Attempting to take snapshot "
159          + ClientSnapshotDescriptionUtils.toString(snapshot)
160          + " but we currently have outstanding tasks");
161    }
162
163    // Add all hfiles already existing in region.
164    for (HRegion region : regions) {
165      // submit one task per region for parallelize by region.
166      taskManager.submitTask(new RegionSnapshotTask(region, snapshot, snapshotSkipFlush, monitor));
167      monitor.rethrowException();
168    }
169
170    // wait for everything to complete.
171    LOG.debug("Flush Snapshot Tasks submitted for " + regions.size() + " regions");
172    try {
173      taskManager.waitForOutstandingTasks();
174    } catch (InterruptedException e) {
175      LOG.error("got interrupted exception for " + getMemberName());
176      throw new ForeignException(getMemberName(), e);
177    }
178  }
179
180  /**
181   * do nothing, core of snapshot is executed in {@link #insideBarrier} step.
182   */
183  @Override
184  public void acquireBarrier() throws ForeignException {
185    // NO OP
186  }
187
188  /**
189   * do a flush snapshot of every region on this rs from the target table.
190   */
191  @Override
192  public byte[] insideBarrier() throws ForeignException {
193    flushSnapshot();
194    return new byte[0];
195  }
196
197  /**
198   * Cancel threads if they haven't finished.
199   */
200  @Override
201  public void cleanup(Exception e) {
202    LOG.info("Aborting all online FLUSH snapshot subprocedure task threads for '"
203        + snapshot.getName() + "' due to error", e);
204    try {
205      taskManager.cancelTasks();
206    } catch (InterruptedException e1) {
207      Thread.currentThread().interrupt();
208    }
209  }
210
211  /**
212   * Hooray!
213   */
214  public void releaseBarrier() {
215    // NO OP
216  }
217
218}