001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.snapshot;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.List;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ExecutorCompletionService;
028import java.util.concurrent.Future;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.DroppedSnapshotException;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionReplicaUtil;
037import org.apache.hadoop.hbase.errorhandling.ForeignException;
038import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
041import org.apache.hadoop.hbase.procedure.ProcedureMember;
042import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
043import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
044import org.apache.hadoop.hbase.procedure.Subprocedure;
045import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
046import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.regionserver.HRegionServer;
049import org.apache.hadoop.hbase.regionserver.RegionServerServices;
050import org.apache.hadoop.hbase.util.Threads;
051import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.apache.yetus.audience.InterfaceStability;
054import org.apache.zookeeper.KeeperException;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
059
060import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
061
062/**
063 * This manager class handles the work dealing with snapshots for a {@link HRegionServer}.
064 * <p>
 * This provides the mechanism necessary to kick off an online snapshot specific
 * {@link Subprocedure} that is responsible for the regions being served by this region server.
 * If any failures occur with the subprocedure, the RegionServerSnapshotManager's subprocedure
 * handler, {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all others.
069 * <p>
070 * On startup, requires {@link #start()} to be called.
071 * <p>
072 * On shutdown, requires {@link #stop(boolean)} to be called
073 */
074@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
075@InterfaceStability.Unstable
076public class RegionServerSnapshotManager extends RegionServerProcedureManager {
077  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSnapshotManager.class);
078
079  /** Maximum number of snapshot region tasks that can run concurrently */
080  private static final String CONCURENT_SNAPSHOT_TASKS_KEY =
081    "hbase.snapshot.region.concurrentTasks";
082  private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
083
084  /** Conf key for number of request threads to start snapshots on regionservers */
085  public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads";
086  /** # of threads for snapshotting regions on the rs. */
087  public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10;
088
089  /** Conf key for max time to keep threads in snapshot request pool waiting */
090  public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout";
091  /** Keep threads alive in request pool for max of 300 seconds */
092  public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5 * 60000;
093
094  /** Conf key for millis between checks to see if snapshot completed or if there are errors */
095  public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY =
096    "hbase.snapshot.region.wakefrequency";
097  /** Default amount of time to check for errors while regions finish snapshotting */
098  private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
099
100  private RegionServerServices rss;
101  private ProcedureMemberRpcs memberRpcs;
102  private ProcedureMember member;
103
104  /**
105   * Exposed for testing.
106   * @param conf       HBase configuration.
107   * @param parent     parent running the snapshot handler
108   * @param memberRpc  use specified memberRpc instance
109   * @param procMember use specified ProcedureMember
110   */
111  RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
112    ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
113    this.rss = parent;
114    this.memberRpcs = memberRpc;
115    this.member = procMember;
116  }
117
  /**
   * Creates an unconfigured manager. None of the collaborators are set here;
   * {@link #initialize(RegionServerServices)} is the method that assigns the region server
   * services, the member RPCs and the procedure member.
   */
  public RegionServerSnapshotManager() {
  }
120
121  /**
122   * Start accepting snapshot requests.
123   */
124  @Override
125  public void start() {
126    LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
127    this.memberRpcs.start(rss.getServerName().toString(), member);
128  }
129
130  /**
131   * Close <tt>this</tt> and all running snapshot tasks
132   * @param force forcefully stop all running tasks
133   */
134  @Override
135  public void stop(boolean force) throws IOException {
136    String mode = force ? "abruptly" : "gracefully";
137    LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
138
139    try {
140      this.member.close();
141    } finally {
142      this.memberRpcs.close();
143    }
144  }
145
146  /**
147   * If in a running state, creates the specified subprocedure for handling an online snapshot.
148   * Because this gets the local list of regions to snapshot and not the set the master had, there
149   * is a possibility of a race where regions may be missed. This detected by the master in the
150   * snapshot verification step.
151   * @return Subprocedure to submit to the ProcedureMember.
152   */
153  public Subprocedure buildSubprocedure(SnapshotDescription snapshot) {
154
155    // don't run a snapshot if the parent is stop(ping)
156    if (rss.isStopping() || rss.isStopped()) {
157      throw new IllegalStateException(
158        "Can't start snapshot on RS: " + rss.getServerName() + ", because stopping/stopped!");
159    }
160
161    // check to see if this server is hosting any regions for the snapshots
162    // check to see if we have regions for the snapshot
163    List<HRegion> involvedRegions;
164    try {
165      involvedRegions = getRegionsToSnapshot(snapshot);
166    } catch (IOException e1) {
167      throw new IllegalStateException("Failed to figure out if we should handle a snapshot - "
168        + "something has gone awry with the online regions.", e1);
169    }
170
171    // We need to run the subprocedure even if we have no relevant regions. The coordinator
172    // expects participation in the procedure and without sending message the snapshot attempt
173    // will hang and fail.
174
175    LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
176      + snapshot.getTable() + " type " + snapshot.getType());
177    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName());
178    Configuration conf = rss.getConfiguration();
179    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
180    long wakeMillis =
181      conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
182
183    switch (snapshot.getType()) {
184      case FLUSH:
185        SnapshotSubprocedurePool taskManager =
186          new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
187        return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis,
188          involvedRegions, snapshot, taskManager);
189      case SKIPFLUSH:
190        /*
191         * This is to take an online-snapshot without force a coordinated flush to prevent pause The
192         * snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
193         * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
194         * turned on/off based on the flush type. To minimized the code change, class name is not
195         * changed.
196         */
197        SnapshotSubprocedurePool taskManager2 =
198          new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
199        return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis,
200          involvedRegions, snapshot, taskManager2);
201
202      default:
203        throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
204    }
205  }
206
207  /**
208   * Determine if the snapshot should be handled on this server NOTE: This is racy -- the master
209   * expects a list of regionservers. This means if a region moves somewhere between the calls we'll
210   * miss some regions. For example, a region move during a snapshot could result in a region to be
211   * skipped or done twice. This is manageable because the {@link MasterSnapshotVerifier} will
212   * double check the region lists after the online portion of the snapshot completes and will
213   * explicitly fail the snapshot.
214   * @return the list of online regions. Empty list is returned if no regions are responsible for
215   *         the given snapshot.
216   */
217  private List<HRegion> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException {
218    List<HRegion> onlineRegions =
219      (List<HRegion>) rss.getRegions(TableName.valueOf(snapshot.getTable()));
220    Iterator<HRegion> iterator = onlineRegions.iterator();
221    // remove the non-default regions
222    while (iterator.hasNext()) {
223      HRegion r = iterator.next();
224      if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
225        iterator.remove();
226      }
227    }
228    return onlineRegions;
229  }
230
231  /**
232   * Build the actual snapshot runner that will do all the 'hard' work
233   */
234  public class SnapshotSubprocedureBuilder implements SubprocedureFactory {
235
236    @Override
237    public Subprocedure buildSubprocedure(String name, byte[] data) {
238      try {
239        // unwrap the snapshot information
240        SnapshotDescription snapshot = SnapshotDescription.parseFrom(data);
241        return RegionServerSnapshotManager.this.buildSubprocedure(snapshot);
242      } catch (IOException e) {
243        throw new IllegalArgumentException("Could not read snapshot information from request.");
244      }
245    }
246
247  }
248
249  /**
250   * We use the SnapshotSubprocedurePool, a class specific thread pool instead of
251   * {@link org.apache.hadoop.hbase.executor.ExecutorService}. It uses a
252   * {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of completed
253   * tasks which lets us efficiently cancel pending tasks upon the earliest operation failures.
254   * HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't
255   * really built for coordinated tasks where multiple threads as part of one larger task. In RS's
256   * the HBase Executor services are only used for open and close and not other threadpooled
257   * operations such as compactions and replication sinks.
258   */
259  static class SnapshotSubprocedurePool {
260    private final Abortable abortable;
261    private final ExecutorCompletionService<Void> taskPool;
262    private final ThreadPoolExecutor executor;
263    private volatile boolean stopped;
264    private final List<Future<Void>> futures = new ArrayList<>();
265    private final String name;
266
267    SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
268      this.abortable = abortable;
269      // configure the executor service
270      long keepAlive = conf.getLong(RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
271        RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
272      int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
273      this.name = name;
274      executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
275        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d")
276          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
277      taskPool = new ExecutorCompletionService<>(executor);
278    }
279
280    boolean hasTasks() {
281      return futures.size() != 0;
282    }
283
284    /**
285     * Submit a task to the pool. NOTE: all must be submitted before you can safely
286     * {@link #waitForOutstandingTasks()}. This version does not support issuing tasks from multiple
287     * concurrent table snapshots requests.
288     */
289    void submitTask(final Callable<Void> task) {
290      Future<Void> f = this.taskPool.submit(task);
291      futures.add(f);
292    }
293
294    /**
295     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
296     * This *must* be called after all tasks are submitted via submitTask.
297     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
298     * @throws SnapshotCreationException if the snapshot failed while we were waiting
299     */
300    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
301      LOG.debug("Waiting for local region snapshots to finish.");
302
303      int sz = futures.size();
304      try {
305        // Using the completion service to process the futures that finish first first.
306        for (int i = 0; i < sz; i++) {
307          Future<Void> f = taskPool.take();
308          f.get();
309          if (!futures.remove(f)) {
310            LOG.warn("unexpected future" + f);
311          }
312          LOG.debug("Completed " + (i + 1) + "/" + sz + " local region snapshots.");
313        }
314        LOG.debug("Completed " + sz + " local region snapshots.");
315        return true;
316      } catch (InterruptedException e) {
317        LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
318        if (!stopped) {
319          Thread.currentThread().interrupt();
320          throw new ForeignException("SnapshotSubprocedurePool", e);
321        }
322        // we are stopped so we can just exit.
323      } catch (ExecutionException e) {
324        Throwable cause = e.getCause();
325        if (cause instanceof ForeignException) {
326          LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
327          throw (ForeignException) e.getCause();
328        } else if (cause instanceof DroppedSnapshotException) {
329          // we have to abort the region server according to contract of flush
330          abortable.abort("Received DroppedSnapshotException, aborting", cause);
331        }
332        LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
333        throw new ForeignException(name, e.getCause());
334      } finally {
335        cancelTasks();
336      }
337      return false;
338    }
339
340    /**
341     * This attempts to cancel out all pending and in progress tasks (interruptions issues)
342     */
343    void cancelTasks() throws InterruptedException {
344      Collection<Future<Void>> tasks = futures;
345      LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name);
346      for (Future<Void> f : tasks) {
347        // TODO Ideally we'd interrupt hbase threads when we cancel. However it seems that there
348        // are places in the HBase code where row/region locks are taken and not released in a
349        // finally block. Thus we cancel without interrupting. Cancellations will be slower to
350        // complete but we won't suffer from unreleased locks due to poor code discipline.
351        f.cancel(false);
352      }
353
354      // evict remaining tasks and futures from taskPool.
355      futures.clear();
356      while (taskPool.poll() != null) {
357      }
358      stop();
359    }
360
361    /**
362     * Abruptly shutdown the thread pool. Call when exiting a region server.
363     */
364    void stop() {
365      if (this.stopped) return;
366
367      this.stopped = true;
368      this.executor.shutdown();
369    }
370  }
371
372  /**
373   * Create a default snapshot handler - uses a zookeeper based member controller.
374   * @param rss region server running the handler
375   * @throws KeeperException if the zookeeper cluster cannot be reached
376   */
377  @Override
378  public void initialize(RegionServerServices rss) throws KeeperException {
379    this.rss = rss;
380    ZKWatcher zkw = rss.getZooKeeper();
381    this.memberRpcs =
382      new ZKProcedureMemberRpcs(zkw, SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
383
384    // read in the snapshot request configuration properties
385    Configuration conf = rss.getConfiguration();
386    long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
387    int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
388
389    // create the actual snapshot procedure member
390    ThreadPoolExecutor pool =
391      ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
392    this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
393  }
394
  /**
   * @return {@link SnapshotManager#ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION}, the same description
   *         that {@link #initialize(RegionServerServices)} passes to the member RPCs
   */
  @Override
  public String getProcedureSignature() {
    return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
  }
399
400}