001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.snapshot;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.List;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ExecutorCompletionService;
028import java.util.concurrent.Future;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.Abortable;
035import org.apache.hadoop.hbase.DaemonThreadFactory;
036import org.apache.hadoop.hbase.DroppedSnapshotException;
037import org.apache.hadoop.hbase.HBaseInterfaceAudience;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.yetus.audience.InterfaceStability;
042import org.apache.hadoop.hbase.client.RegionReplicaUtil;
043import org.apache.hadoop.hbase.errorhandling.ForeignException;
044import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
045import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
046import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
047import org.apache.hadoop.hbase.procedure.ProcedureMember;
048import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
049import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
050import org.apache.hadoop.hbase.procedure.Subprocedure;
051import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
052import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
053import org.apache.hadoop.hbase.regionserver.HRegion;
054import org.apache.hadoop.hbase.regionserver.HRegionServer;
055import org.apache.hadoop.hbase.regionserver.RegionServerServices;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
057import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
058import org.apache.zookeeper.KeeperException;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
062/**
063 * This manager class handles the work dealing with snapshots for a {@link HRegionServer}.
064 * <p>
065 * This provides the mechanism necessary to kick off a online snapshot specific
066 * {@link Subprocedure} that is responsible for the regions being served by this region server.
067 * If any failures occur with the subprocedure, the RegionSeverSnapshotManager's subprocedure
068 * handler, {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all
069 * others.
070 * <p>
071 * On startup, requires {@link #start()} to be called.
072 * <p>
073 * On shutdown, requires {@link #stop(boolean)} to be called
074 */
075@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
076@InterfaceStability.Unstable
077public class RegionServerSnapshotManager extends RegionServerProcedureManager {
078  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSnapshotManager.class);
079
080  /** Maximum number of snapshot region tasks that can run concurrently */
081  private static final String CONCURENT_SNAPSHOT_TASKS_KEY = "hbase.snapshot.region.concurrentTasks";
082  private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
083
084  /** Conf key for number of request threads to start snapshots on regionservers */
085  public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads";
086  /** # of threads for snapshotting regions on the rs. */
087  public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10;
088
089  /** Conf key for max time to keep threads in snapshot request pool waiting */
090  public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout";
091  /** Keep threads alive in request pool for max of 300 seconds */
092  public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5 * 60000;
093
094  /** Conf key for millis between checks to see if snapshot completed or if there are errors*/
095  public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY = "hbase.snapshot.region.wakefrequency";
096  /** Default amount of time to check for errors while regions finish snapshotting */
097  private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
098
099  private RegionServerServices rss;
100  private ProcedureMemberRpcs memberRpcs;
101  private ProcedureMember member;
102
103  /**
104   * Exposed for testing.
105   * @param conf HBase configuration.
106   * @param parent parent running the snapshot handler
107   * @param memberRpc use specified memberRpc instance
108   * @param procMember use specified ProcedureMember
109   */
110   RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
111      ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
112    this.rss = parent;
113    this.memberRpcs = memberRpc;
114    this.member = procMember;
115  }
116
117  public RegionServerSnapshotManager() {}
118
119  /**
120   * Start accepting snapshot requests.
121   */
122  @Override
123  public void start() {
124    LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
125    this.memberRpcs.start(rss.getServerName().toString(), member);
126  }
127
128  /**
129   * Close <tt>this</tt> and all running snapshot tasks
130   * @param force forcefully stop all running tasks
131   * @throws IOException
132   */
133  @Override
134  public void stop(boolean force) throws IOException {
135    String mode = force ? "abruptly" : "gracefully";
136    LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
137
138    try {
139      this.member.close();
140    } finally {
141      this.memberRpcs.close();
142    }
143  }
144
145  /**
146   * If in a running state, creates the specified subprocedure for handling an online snapshot.
147   *
148   * Because this gets the local list of regions to snapshot and not the set the master had,
149   * there is a possibility of a race where regions may be missed.  This detected by the master in
150   * the snapshot verification step.
151   *
152   * @param snapshot
153   * @return Subprocedure to submit to the ProcedureMemeber.
154   */
155  public Subprocedure buildSubprocedure(SnapshotDescription snapshot) {
156
157    // don't run a snapshot if the parent is stop(ping)
158    if (rss.isStopping() || rss.isStopped()) {
159      throw new IllegalStateException("Can't start snapshot on RS: " + rss.getServerName()
160          + ", because stopping/stopped!");
161    }
162
163    // check to see if this server is hosting any regions for the snapshots
164    // check to see if we have regions for the snapshot
165    List<HRegion> involvedRegions;
166    try {
167      involvedRegions = getRegionsToSnapshot(snapshot);
168    } catch (IOException e1) {
169      throw new IllegalStateException("Failed to figure out if we should handle a snapshot - "
170          + "something has gone awry with the online regions.", e1);
171    }
172
173    // We need to run the subprocedure even if we have no relevant regions.  The coordinator
174    // expects participation in the procedure and without sending message the snapshot attempt
175    // will hang and fail.
176
177    LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
178        + snapshot.getTable() + " type " + snapshot.getType());
179    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName());
180    Configuration conf = rss.getConfiguration();
181    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
182        SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
183    long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY,
184        SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
185
186    switch (snapshot.getType()) {
187    case FLUSH:
188      SnapshotSubprocedurePool taskManager =
189        new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
190      return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
191          timeoutMillis, involvedRegions, snapshot, taskManager);
192    case SKIPFLUSH:
193        /*
194         * This is to take an online-snapshot without force a coordinated flush to prevent pause
195         * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
196         * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
197         * turned on/off based on the flush type.
198         * To minimized the code change, class name is not changed.
199         */
200        SnapshotSubprocedurePool taskManager2 =
201            new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
202        return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
203            timeoutMillis, involvedRegions, snapshot, taskManager2);
204
205    default:
206      throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
207    }
208  }
209
210  /**
211   * Determine if the snapshot should be handled on this server
212   *
213   * NOTE: This is racy -- the master expects a list of regionservers.
214   * This means if a region moves somewhere between the calls we'll miss some regions.
215   * For example, a region move during a snapshot could result in a region to be skipped or done
216   * twice.  This is manageable because the {@link MasterSnapshotVerifier} will double check the
217   * region lists after the online portion of the snapshot completes and will explicitly fail the
218   * snapshot.
219   *
220   * @param snapshot
221   * @return the list of online regions. Empty list is returned if no regions are responsible for
222   *         the given snapshot.
223   * @throws IOException
224   */
225  private List<HRegion> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException {
226    List<HRegion> onlineRegions = (List<HRegion>) rss
227        .getRegions(TableName.valueOf(snapshot.getTable()));
228    Iterator<HRegion> iterator = onlineRegions.iterator();
229    // remove the non-default regions
230    while (iterator.hasNext()) {
231      HRegion r = iterator.next();
232      if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
233        iterator.remove();
234      }
235    }
236    return onlineRegions;
237  }
238
239  /**
240   * Build the actual snapshot runner that will do all the 'hard' work
241   */
242  public class SnapshotSubprocedureBuilder implements SubprocedureFactory {
243
244    @Override
245    public Subprocedure buildSubprocedure(String name, byte[] data) {
246      try {
247        // unwrap the snapshot information
248        SnapshotDescription snapshot = SnapshotDescription.parseFrom(data);
249        return RegionServerSnapshotManager.this.buildSubprocedure(snapshot);
250      } catch (IOException e) {
251        throw new IllegalArgumentException("Could not read snapshot information from request.");
252      }
253    }
254
255  }
256
257  /**
258   * We use the SnapshotSubprocedurePool, a class specific thread pool instead of
259   * {@link org.apache.hadoop.hbase.executor.ExecutorService}.
260   *
261   * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of
262   * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation
263   * failures.
264   *
265   * HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't
266   * really built for coordinated tasks where multiple threads as part of one larger task.  In
267   * RS's the HBase Executor services are only used for open and close and not other threadpooled
268   * operations such as compactions and replication  sinks.
269   */
270  static class SnapshotSubprocedurePool {
271    private final Abortable abortable;
272    private final ExecutorCompletionService<Void> taskPool;
273    private final ThreadPoolExecutor executor;
274    private volatile boolean stopped;
275    private final List<Future<Void>> futures = new ArrayList<>();
276    private final String name;
277
278    SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
279      this.abortable = abortable;
280      // configure the executor service
281      long keepAlive = conf.getLong(
282        RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
283        RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
284      int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
285      this.name = name;
286      executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
287          new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs("
288              + name + ")-snapshot-pool"));
289      executor.allowCoreThreadTimeOut(true);
290      taskPool = new ExecutorCompletionService<>(executor);
291    }
292
293    boolean hasTasks() {
294      return futures.size() != 0;
295    }
296
297    /**
298     * Submit a task to the pool.
299     *
300     * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. This
301     * version does not support issuing tasks from multiple concurrent table snapshots requests.
302     */
303    void submitTask(final Callable<Void> task) {
304      Future<Void> f = this.taskPool.submit(task);
305      futures.add(f);
306    }
307
308    /**
309     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
310     * This *must* be called after all tasks are submitted via submitTask.
311     *
312     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
313     * @throws InterruptedException
314     * @throws SnapshotCreationException if the snapshot failed while we were waiting
315     */
316    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
317      LOG.debug("Waiting for local region snapshots to finish.");
318
319      int sz = futures.size();
320      try {
321        // Using the completion service to process the futures that finish first first.
322        for (int i = 0; i < sz; i++) {
323          Future<Void> f = taskPool.take();
324          f.get();
325          if (!futures.remove(f)) {
326            LOG.warn("unexpected future" + f);
327          }
328          LOG.debug("Completed " + (i+1) + "/" + sz +  " local region snapshots.");
329        }
330        LOG.debug("Completed " + sz +  " local region snapshots.");
331        return true;
332      } catch (InterruptedException e) {
333        LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
334        if (!stopped) {
335          Thread.currentThread().interrupt();
336          throw new ForeignException("SnapshotSubprocedurePool", e);
337        }
338        // we are stopped so we can just exit.
339      } catch (ExecutionException e) {
340        Throwable cause = e.getCause();
341        if (cause instanceof ForeignException) {
342          LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
343          throw (ForeignException)e.getCause();
344        } else if (cause instanceof DroppedSnapshotException) {
345          // we have to abort the region server according to contract of flush
346          abortable.abort("Received DroppedSnapshotException, aborting", cause);
347        }
348        LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
349        throw new ForeignException(name, e.getCause());
350      } finally {
351        cancelTasks();
352      }
353      return false;
354    }
355
356    /**
357     * This attempts to cancel out all pending and in progress tasks (interruptions issues)
358     * @throws InterruptedException
359     */
360    void cancelTasks() throws InterruptedException {
361      Collection<Future<Void>> tasks = futures;
362      LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name);
363      for (Future<Void> f: tasks) {
364        // TODO Ideally we'd interrupt hbase threads when we cancel.  However it seems that there
365        // are places in the HBase code where row/region locks are taken and not released in a
366        // finally block.  Thus we cancel without interrupting.  Cancellations will be slower to
367        // complete but we won't suffer from unreleased locks due to poor code discipline.
368        f.cancel(false);
369      }
370
371      // evict remaining tasks and futures from taskPool.
372      futures.clear();
373      while (taskPool.poll() != null) {}
374      stop();
375    }
376
377    /**
378     * Abruptly shutdown the thread pool.  Call when exiting a region server.
379     */
380    void stop() {
381      if (this.stopped) return;
382
383      this.stopped = true;
384      this.executor.shutdown();
385    }
386  }
387
388  /**
389   * Create a default snapshot handler - uses a zookeeper based member controller.
390   * @param rss region server running the handler
391   * @throws KeeperException if the zookeeper cluster cannot be reached
392   */
393  @Override
394  public void initialize(RegionServerServices rss) throws KeeperException {
395    this.rss = rss;
396    ZKWatcher zkw = rss.getZooKeeper();
397    this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
398        SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
399
400    // read in the snapshot request configuration properties
401    Configuration conf = rss.getConfiguration();
402    long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
403    int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
404
405    // create the actual snapshot procedure member
406    ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
407      opThreads, keepAlive);
408    this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
409  }
410
411  @Override
412  public String getProcedureSignature() {
413    return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
414  }
415
416}