001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.snapshot;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.List;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ExecutorCompletionService;
028import java.util.concurrent.Future;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.DroppedSnapshotException;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionReplicaUtil;
037import org.apache.hadoop.hbase.errorhandling.ForeignException;
038import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
041import org.apache.hadoop.hbase.procedure.ProcedureMember;
042import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
043import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
044import org.apache.hadoop.hbase.procedure.Subprocedure;
045import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
046import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.regionserver.HRegionServer;
049import org.apache.hadoop.hbase.regionserver.RegionServerServices;
050import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
051import org.apache.hadoop.hbase.util.Threads;
052import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
053import org.apache.yetus.audience.InterfaceAudience;
054import org.apache.yetus.audience.InterfaceStability;
055import org.apache.zookeeper.KeeperException;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
060
061import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
062
063/**
064 * This manager class handles the work dealing with snapshots for a {@link HRegionServer}.
065 * <p>
066 * This provides the mechanism necessary to kick off a online snapshot specific
067 * {@link Subprocedure} that is responsible for the regions being served by this region server.
068 * If any failures occur with the subprocedure, the RegionSeverSnapshotManager's subprocedure
069 * handler, {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all
070 * others.
071 * <p>
072 * On startup, requires {@link #start()} to be called.
073 * <p>
074 * On shutdown, requires {@link #stop(boolean)} to be called
075 */
076@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
077@InterfaceStability.Unstable
078public class RegionServerSnapshotManager extends RegionServerProcedureManager {
079  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSnapshotManager.class);
080
081  /** Maximum number of snapshot region tasks that can run concurrently */
082  private static final String CONCURENT_SNAPSHOT_TASKS_KEY = "hbase.snapshot.region.concurrentTasks";
083  private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
084
085  /** Conf key for number of request threads to start snapshots on regionservers */
086  public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads";
087  /** # of threads for snapshotting regions on the rs. */
088  public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10;
089
090  /** Conf key for max time to keep threads in snapshot request pool waiting */
091  public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout";
092  /** Keep threads alive in request pool for max of 300 seconds */
093  public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5 * 60000;
094
095  /** Conf key for millis between checks to see if snapshot completed or if there are errors*/
096  public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY = "hbase.snapshot.region.wakefrequency";
097  /** Default amount of time to check for errors while regions finish snapshotting */
098  private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
099
100  private RegionServerServices rss;
101  private ProcedureMemberRpcs memberRpcs;
102  private ProcedureMember member;
103
104  /**
105   * Exposed for testing.
106   * @param conf HBase configuration.
107   * @param parent parent running the snapshot handler
108   * @param memberRpc use specified memberRpc instance
109   * @param procMember use specified ProcedureMember
110   */
111   RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
112      ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
113    this.rss = parent;
114    this.memberRpcs = memberRpc;
115    this.member = procMember;
116  }
117
118  public RegionServerSnapshotManager() {}
119
120  /**
121   * Start accepting snapshot requests.
122   */
123  @Override
124  public void start() {
125    LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
126    this.memberRpcs.start(rss.getServerName().toString(), member);
127  }
128
129  /**
130   * Close <tt>this</tt> and all running snapshot tasks
131   * @param force forcefully stop all running tasks
132   * @throws IOException
133   */
134  @Override
135  public void stop(boolean force) throws IOException {
136    String mode = force ? "abruptly" : "gracefully";
137    LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
138
139    try {
140      this.member.close();
141    } finally {
142      this.memberRpcs.close();
143    }
144  }
145
146  /**
147   * If in a running state, creates the specified subprocedure for handling an online snapshot.
148   *
149   * Because this gets the local list of regions to snapshot and not the set the master had,
150   * there is a possibility of a race where regions may be missed.  This detected by the master in
151   * the snapshot verification step.
152   *
153   * @param snapshot
154   * @return Subprocedure to submit to the ProcedureMember.
155   */
156  public Subprocedure buildSubprocedure(SnapshotDescription snapshot) {
157
158    // don't run a snapshot if the parent is stop(ping)
159    if (rss.isStopping() || rss.isStopped()) {
160      throw new IllegalStateException("Can't start snapshot on RS: " + rss.getServerName()
161          + ", because stopping/stopped!");
162    }
163
164    // check to see if this server is hosting any regions for the snapshots
165    // check to see if we have regions for the snapshot
166    List<HRegion> involvedRegions;
167    try {
168      involvedRegions = getRegionsToSnapshot(snapshot);
169    } catch (IOException e1) {
170      throw new IllegalStateException("Failed to figure out if we should handle a snapshot - "
171          + "something has gone awry with the online regions.", e1);
172    }
173
174    // We need to run the subprocedure even if we have no relevant regions.  The coordinator
175    // expects participation in the procedure and without sending message the snapshot attempt
176    // will hang and fail.
177
178    LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
179        + snapshot.getTable() + " type " + snapshot.getType());
180    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName());
181    Configuration conf = rss.getConfiguration();
182    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
183        SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
184    long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY,
185        SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
186
187    switch (snapshot.getType()) {
188    case FLUSH:
189      SnapshotSubprocedurePool taskManager =
190        new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
191      return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
192          timeoutMillis, involvedRegions, snapshot, taskManager);
193    case SKIPFLUSH:
194        /*
195         * This is to take an online-snapshot without force a coordinated flush to prevent pause
196         * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
197         * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
198         * turned on/off based on the flush type.
199         * To minimized the code change, class name is not changed.
200         */
201        SnapshotSubprocedurePool taskManager2 =
202            new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
203        return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
204            timeoutMillis, involvedRegions, snapshot, taskManager2);
205
206    default:
207      throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
208    }
209  }
210
211  /**
212   * Determine if the snapshot should be handled on this server
213   *
214   * NOTE: This is racy -- the master expects a list of regionservers.
215   * This means if a region moves somewhere between the calls we'll miss some regions.
216   * For example, a region move during a snapshot could result in a region to be skipped or done
217   * twice.  This is manageable because the {@link MasterSnapshotVerifier} will double check the
218   * region lists after the online portion of the snapshot completes and will explicitly fail the
219   * snapshot.
220   *
221   * @param snapshot
222   * @return the list of online regions. Empty list is returned if no regions are responsible for
223   *         the given snapshot.
224   * @throws IOException
225   */
226  private List<HRegion> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException {
227    List<HRegion> onlineRegions = (List<HRegion>) rss
228        .getRegions(TableName.valueOf(snapshot.getTable()));
229    Iterator<HRegion> iterator = onlineRegions.iterator();
230    // remove the non-default regions
231    while (iterator.hasNext()) {
232      HRegion r = iterator.next();
233      if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
234        iterator.remove();
235      }
236    }
237    return onlineRegions;
238  }
239
240  /**
241   * Build the actual snapshot runner that will do all the 'hard' work
242   */
243  public class SnapshotSubprocedureBuilder implements SubprocedureFactory {
244
245    @Override
246    public Subprocedure buildSubprocedure(String name, byte[] data) {
247      try {
248        // unwrap the snapshot information
249        SnapshotDescription snapshot = SnapshotDescription.parseFrom(data);
250        return RegionServerSnapshotManager.this.buildSubprocedure(snapshot);
251      } catch (IOException e) {
252        throw new IllegalArgumentException("Could not read snapshot information from request.");
253      }
254    }
255
256  }
257
258  /**
259   * We use the SnapshotSubprocedurePool, a class specific thread pool instead of
260   * {@link org.apache.hadoop.hbase.executor.ExecutorService}.
261   *
262   * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of
263   * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation
264   * failures.
265   *
266   * HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't
267   * really built for coordinated tasks where multiple threads as part of one larger task.  In
268   * RS's the HBase Executor services are only used for open and close and not other threadpooled
269   * operations such as compactions and replication  sinks.
270   */
271  static class SnapshotSubprocedurePool {
272    private final Abortable abortable;
273    private final ExecutorCompletionService<Void> taskPool;
274    private final ThreadPoolExecutor executor;
275    private volatile boolean stopped;
276    private final List<Future<Void>> futures = new ArrayList<>();
277    private final String name;
278
279    SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
280      this.abortable = abortable;
281      // configure the executor service
282      long keepAlive = conf.getLong(
283        RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
284        RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
285      int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
286      this.name = name;
287      executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
288        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d")
289          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
290      taskPool = new ExecutorCompletionService<>(executor);
291    }
292
293    boolean hasTasks() {
294      return futures.size() != 0;
295    }
296
297    /**
298     * Submit a task to the pool.
299     *
300     * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. This
301     * version does not support issuing tasks from multiple concurrent table snapshots requests.
302     */
303    void submitTask(final Callable<Void> task) {
304      Future<Void> f = this.taskPool.submit(task);
305      futures.add(f);
306    }
307
308    /**
309     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
310     * This *must* be called after all tasks are submitted via submitTask.
311     *
312     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
313     * @throws InterruptedException
314     * @throws SnapshotCreationException if the snapshot failed while we were waiting
315     */
316    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
317      LOG.debug("Waiting for local region snapshots to finish.");
318
319      int sz = futures.size();
320      try {
321        // Using the completion service to process the futures that finish first first.
322        for (int i = 0; i < sz; i++) {
323          Future<Void> f = taskPool.take();
324          f.get();
325          if (!futures.remove(f)) {
326            LOG.warn("unexpected future" + f);
327          }
328          LOG.debug("Completed " + (i+1) + "/" + sz +  " local region snapshots.");
329        }
330        LOG.debug("Completed " + sz +  " local region snapshots.");
331        return true;
332      } catch (InterruptedException e) {
333        LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
334        if (!stopped) {
335          Thread.currentThread().interrupt();
336          throw new ForeignException("SnapshotSubprocedurePool", e);
337        }
338        // we are stopped so we can just exit.
339      } catch (ExecutionException e) {
340        Throwable cause = e.getCause();
341        if (cause instanceof ForeignException) {
342          LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
343          throw (ForeignException)e.getCause();
344        } else if (cause instanceof DroppedSnapshotException) {
345          // we have to abort the region server according to contract of flush
346          abortable.abort("Received DroppedSnapshotException, aborting", cause);
347        }
348        LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
349        throw new ForeignException(name, e.getCause());
350      } finally {
351        cancelTasks();
352      }
353      return false;
354    }
355
356    /**
357     * This attempts to cancel out all pending and in progress tasks (interruptions issues)
358     * @throws InterruptedException
359     */
360    void cancelTasks() throws InterruptedException {
361      Collection<Future<Void>> tasks = futures;
362      LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name);
363      for (Future<Void> f: tasks) {
364        // TODO Ideally we'd interrupt hbase threads when we cancel.  However it seems that there
365        // are places in the HBase code where row/region locks are taken and not released in a
366        // finally block.  Thus we cancel without interrupting.  Cancellations will be slower to
367        // complete but we won't suffer from unreleased locks due to poor code discipline.
368        f.cancel(false);
369      }
370
371      // evict remaining tasks and futures from taskPool.
372      futures.clear();
373      while (taskPool.poll() != null) {}
374      stop();
375    }
376
377    /**
378     * Abruptly shutdown the thread pool.  Call when exiting a region server.
379     */
380    void stop() {
381      if (this.stopped) return;
382
383      this.stopped = true;
384      this.executor.shutdown();
385    }
386  }
387
388  /**
389   * Create a default snapshot handler - uses a zookeeper based member controller.
390   * @param rss region server running the handler
391   * @throws KeeperException if the zookeeper cluster cannot be reached
392   */
393  @Override
394  public void initialize(RegionServerServices rss) throws KeeperException {
395    this.rss = rss;
396    ZKWatcher zkw = rss.getZooKeeper();
397    this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
398        SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
399
400    // read in the snapshot request configuration properties
401    Configuration conf = rss.getConfiguration();
402    long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
403    int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
404
405    // create the actual snapshot procedure member
406    ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
407      opThreads, keepAlive);
408    this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
409  }
410
411  @Override
412  public String getProcedureSignature() {
413    return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
414  }
415
416}