001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.snapshot;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.List;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ExecutorCompletionService;
028import java.util.concurrent.Future;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.Abortable;
033import org.apache.hadoop.hbase.DroppedSnapshotException;
034import org.apache.hadoop.hbase.HBaseInterfaceAudience;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.RegionReplicaUtil;
037import org.apache.hadoop.hbase.errorhandling.ForeignException;
038import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
041import org.apache.hadoop.hbase.procedure.ProcedureMember;
042import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
043import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
044import org.apache.hadoop.hbase.procedure.Subprocedure;
045import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
046import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
047import org.apache.hadoop.hbase.regionserver.HRegion;
048import org.apache.hadoop.hbase.regionserver.HRegionServer;
049import org.apache.hadoop.hbase.regionserver.RegionServerServices;
050import org.apache.hadoop.hbase.util.Threads;
051import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
052import org.apache.yetus.audience.InterfaceAudience;
053import org.apache.yetus.audience.InterfaceStability;
054import org.apache.zookeeper.KeeperException;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
059
060import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
061
062/**
063 * This manager class handles the work dealing with snapshots for a {@link HRegionServer}.
064 * <p>
 * This provides the mechanism necessary to kick off an online-snapshot-specific
 * {@link Subprocedure} that is responsible for the regions being served by this region server. If
 * any failures occur with the subprocedure, the RegionServerSnapshotManager's subprocedure
 * handler, {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all others.
069 * <p>
070 * On startup, requires {@link #start()} to be called.
071 * <p>
072 * On shutdown, requires {@link #stop(boolean)} to be called
073 */
074@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
075@InterfaceStability.Unstable
076public class RegionServerSnapshotManager extends RegionServerProcedureManager {
077  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSnapshotManager.class);
078
079  /** Maximum number of snapshot region tasks that can run concurrently */
080  private static final String CONCURENT_SNAPSHOT_TASKS_KEY =
081    "hbase.snapshot.region.concurrentTasks";
082  private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
083
084  /** Conf key for number of request threads to start snapshots on regionservers */
085  public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads";
086  /** # of threads for snapshotting regions on the rs. */
087  public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10;
088
089  /** Conf key for max time to keep threads in snapshot request pool waiting */
090  public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout";
091  /** Keep threads alive in request pool for max of 300 seconds */
092  public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5 * 60000;
093
094  /** Conf key for millis between checks to see if snapshot completed or if there are errors */
095  public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY =
096    "hbase.snapshot.region.wakefrequency";
097  /** Default amount of time to check for errors while regions finish snapshotting */
098  private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
099
100  private RegionServerServices rss;
101  private ProcedureMemberRpcs memberRpcs;
102  private ProcedureMember member;
103
104  /**
105   * Exposed for testing.
106   * @param conf       HBase configuration.
107   * @param parent     parent running the snapshot handler
108   * @param memberRpc  use specified memberRpc instance
109   * @param procMember use specified ProcedureMember
110   */
111  RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
112    ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
113    this.rss = parent;
114    this.memberRpcs = memberRpc;
115    this.member = procMember;
116  }
117
118  public RegionServerSnapshotManager() {
119  }
120
121  /**
122   * Start accepting snapshot requests.
123   */
124  @Override
125  public void start() {
126    LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
127    this.memberRpcs.start(rss.getServerName().toString(), member);
128  }
129
130  /**
131   * Close <tt>this</tt> and all running snapshot tasks
132   * @param force forcefully stop all running tasks n
133   */
134  @Override
135  public void stop(boolean force) throws IOException {
136    String mode = force ? "abruptly" : "gracefully";
137    LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
138
139    try {
140      this.member.close();
141    } finally {
142      this.memberRpcs.close();
143    }
144  }
145
146  /**
147   * If in a running state, creates the specified subprocedure for handling an online snapshot.
148   * Because this gets the local list of regions to snapshot and not the set the master had, there
149   * is a possibility of a race where regions may be missed. This detected by the master in the
150   * snapshot verification step. n * @return Subprocedure to submit to the ProcedureMember.
151   */
152  public Subprocedure buildSubprocedure(SnapshotDescription snapshot) {
153
154    // don't run a snapshot if the parent is stop(ping)
155    if (rss.isStopping() || rss.isStopped()) {
156      throw new IllegalStateException(
157        "Can't start snapshot on RS: " + rss.getServerName() + ", because stopping/stopped!");
158    }
159
160    // check to see if this server is hosting any regions for the snapshots
161    // check to see if we have regions for the snapshot
162    List<HRegion> involvedRegions;
163    try {
164      involvedRegions = getRegionsToSnapshot(snapshot);
165    } catch (IOException e1) {
166      throw new IllegalStateException("Failed to figure out if we should handle a snapshot - "
167        + "something has gone awry with the online regions.", e1);
168    }
169
170    // We need to run the subprocedure even if we have no relevant regions. The coordinator
171    // expects participation in the procedure and without sending message the snapshot attempt
172    // will hang and fail.
173
174    LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
175      + snapshot.getTable() + " type " + snapshot.getType());
176    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName());
177    Configuration conf = rss.getConfiguration();
178    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
179    long wakeMillis =
180      conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
181
182    switch (snapshot.getType()) {
183      case FLUSH:
184        SnapshotSubprocedurePool taskManager =
185          new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
186        return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis,
187          involvedRegions, snapshot, taskManager);
188      case SKIPFLUSH:
189        /*
190         * This is to take an online-snapshot without force a coordinated flush to prevent pause The
191         * snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
192         * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
193         * turned on/off based on the flush type. To minimized the code change, class name is not
194         * changed.
195         */
196        SnapshotSubprocedurePool taskManager2 =
197          new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
198        return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis,
199          involvedRegions, snapshot, taskManager2);
200
201      default:
202        throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
203    }
204  }
205
206  /**
207   * Determine if the snapshot should be handled on this server NOTE: This is racy -- the master
208   * expects a list of regionservers. This means if a region moves somewhere between the calls we'll
209   * miss some regions. For example, a region move during a snapshot could result in a region to be
210   * skipped or done twice. This is manageable because the {@link MasterSnapshotVerifier} will
211   * double check the region lists after the online portion of the snapshot completes and will
212   * explicitly fail the snapshot. n * @return the list of online regions. Empty list is returned if
213   * no regions are responsible for the given snapshot. n
214   */
215  private List<HRegion> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException {
216    List<HRegion> onlineRegions =
217      (List<HRegion>) rss.getRegions(TableName.valueOf(snapshot.getTable()));
218    Iterator<HRegion> iterator = onlineRegions.iterator();
219    // remove the non-default regions
220    while (iterator.hasNext()) {
221      HRegion r = iterator.next();
222      if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
223        iterator.remove();
224      }
225    }
226    return onlineRegions;
227  }
228
229  /**
230   * Build the actual snapshot runner that will do all the 'hard' work
231   */
232  public class SnapshotSubprocedureBuilder implements SubprocedureFactory {
233
234    @Override
235    public Subprocedure buildSubprocedure(String name, byte[] data) {
236      try {
237        // unwrap the snapshot information
238        SnapshotDescription snapshot = SnapshotDescription.parseFrom(data);
239        return RegionServerSnapshotManager.this.buildSubprocedure(snapshot);
240      } catch (IOException e) {
241        throw new IllegalArgumentException("Could not read snapshot information from request.");
242      }
243    }
244
245  }
246
247  /**
248   * We use the SnapshotSubprocedurePool, a class specific thread pool instead of
249   * {@link org.apache.hadoop.hbase.executor.ExecutorService}. It uses a
250   * {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of completed
251   * tasks which lets us efficiently cancel pending tasks upon the earliest operation failures.
252   * HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't
253   * really built for coordinated tasks where multiple threads as part of one larger task. In RS's
254   * the HBase Executor services are only used for open and close and not other threadpooled
255   * operations such as compactions and replication sinks.
256   */
257  static class SnapshotSubprocedurePool {
258    private final Abortable abortable;
259    private final ExecutorCompletionService<Void> taskPool;
260    private final ThreadPoolExecutor executor;
261    private volatile boolean stopped;
262    private final List<Future<Void>> futures = new ArrayList<>();
263    private final String name;
264
265    SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
266      this.abortable = abortable;
267      // configure the executor service
268      long keepAlive = conf.getLong(RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
269        RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
270      int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
271      this.name = name;
272      executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
273        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d")
274          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
275      taskPool = new ExecutorCompletionService<>(executor);
276    }
277
278    boolean hasTasks() {
279      return futures.size() != 0;
280    }
281
282    /**
283     * Submit a task to the pool. NOTE: all must be submitted before you can safely
284     * {@link #waitForOutstandingTasks()}. This version does not support issuing tasks from multiple
285     * concurrent table snapshots requests.
286     */
287    void submitTask(final Callable<Void> task) {
288      Future<Void> f = this.taskPool.submit(task);
289      futures.add(f);
290    }
291
292    /**
293     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
294     * This *must* be called after all tasks are submitted via submitTask.
295     * @return <tt>true</tt> on success, <tt>false</tt> otherwise n * @throws
296     *         SnapshotCreationException if the snapshot failed while we were waiting
297     */
298    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
299      LOG.debug("Waiting for local region snapshots to finish.");
300
301      int sz = futures.size();
302      try {
303        // Using the completion service to process the futures that finish first first.
304        for (int i = 0; i < sz; i++) {
305          Future<Void> f = taskPool.take();
306          f.get();
307          if (!futures.remove(f)) {
308            LOG.warn("unexpected future" + f);
309          }
310          LOG.debug("Completed " + (i + 1) + "/" + sz + " local region snapshots.");
311        }
312        LOG.debug("Completed " + sz + " local region snapshots.");
313        return true;
314      } catch (InterruptedException e) {
315        LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
316        if (!stopped) {
317          Thread.currentThread().interrupt();
318          throw new ForeignException("SnapshotSubprocedurePool", e);
319        }
320        // we are stopped so we can just exit.
321      } catch (ExecutionException e) {
322        Throwable cause = e.getCause();
323        if (cause instanceof ForeignException) {
324          LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
325          throw (ForeignException) e.getCause();
326        } else if (cause instanceof DroppedSnapshotException) {
327          // we have to abort the region server according to contract of flush
328          abortable.abort("Received DroppedSnapshotException, aborting", cause);
329        }
330        LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
331        throw new ForeignException(name, e.getCause());
332      } finally {
333        cancelTasks();
334      }
335      return false;
336    }
337
338    /**
339     * This attempts to cancel out all pending and in progress tasks (interruptions issues) n
340     */
341    void cancelTasks() throws InterruptedException {
342      Collection<Future<Void>> tasks = futures;
343      LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name);
344      for (Future<Void> f : tasks) {
345        // TODO Ideally we'd interrupt hbase threads when we cancel. However it seems that there
346        // are places in the HBase code where row/region locks are taken and not released in a
347        // finally block. Thus we cancel without interrupting. Cancellations will be slower to
348        // complete but we won't suffer from unreleased locks due to poor code discipline.
349        f.cancel(false);
350      }
351
352      // evict remaining tasks and futures from taskPool.
353      futures.clear();
354      while (taskPool.poll() != null) {
355      }
356      stop();
357    }
358
359    /**
360     * Abruptly shutdown the thread pool. Call when exiting a region server.
361     */
362    void stop() {
363      if (this.stopped) return;
364
365      this.stopped = true;
366      this.executor.shutdown();
367    }
368  }
369
370  /**
371   * Create a default snapshot handler - uses a zookeeper based member controller.
372   * @param rss region server running the handler
373   * @throws KeeperException if the zookeeper cluster cannot be reached
374   */
375  @Override
376  public void initialize(RegionServerServices rss) throws KeeperException {
377    this.rss = rss;
378    ZKWatcher zkw = rss.getZooKeeper();
379    this.memberRpcs =
380      new ZKProcedureMemberRpcs(zkw, SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
381
382    // read in the snapshot request configuration properties
383    Configuration conf = rss.getConfiguration();
384    long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
385    int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
386
387    // create the actual snapshot procedure member
388    ThreadPoolExecutor pool =
389      ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
390    this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
391  }
392
393  @Override
394  public String getProcedureSignature() {
395    return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
396  }
397
398}