001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.snapshot;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.List;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.ExecutorCompletionService;
028import java.util.concurrent.Future;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Abortable;
034import org.apache.hadoop.hbase.DaemonThreadFactory;
035import org.apache.hadoop.hbase.DroppedSnapshotException;
036import org.apache.hadoop.hbase.HBaseInterfaceAudience;
037import org.apache.hadoop.hbase.TableName;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.apache.yetus.audience.InterfaceStability;
042import org.apache.hadoop.hbase.client.RegionReplicaUtil;
043import org.apache.hadoop.hbase.errorhandling.ForeignException;
044import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
045import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
046import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
047import org.apache.hadoop.hbase.procedure.ProcedureMember;
048import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
049import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
050import org.apache.hadoop.hbase.procedure.Subprocedure;
051import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
052import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
053import org.apache.hadoop.hbase.regionserver.HRegion;
054import org.apache.hadoop.hbase.regionserver.HRegionServer;
055import org.apache.hadoop.hbase.regionserver.RegionServerServices;
056import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
057import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
058import org.apache.zookeeper.KeeperException;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061
/**
 * This manager class handles the work dealing with snapshots for a {@link HRegionServer}.
 * <p>
 * This provides the mechanism necessary to kick off an online snapshot specific
 * {@link Subprocedure} that is responsible for the regions being served by this region server.
 * If any failures occur with the subprocedure, the RegionServerSnapshotManager's subprocedure
 * handler, {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all
 * others.
 * <p>
 * On startup, requires {@link #start()} to be called.
 * <p>
 * On shutdown, requires {@link #stop(boolean)} to be called.
 */
075@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
076@InterfaceStability.Unstable
077public class RegionServerSnapshotManager extends RegionServerProcedureManager {
078  private static final Logger LOG = LoggerFactory.getLogger(RegionServerSnapshotManager.class);
079
080  /** Conf key for the max number of snapshot region tasks a subprocedure pool runs concurrently */
081  private static final String CONCURENT_SNAPSHOT_TASKS_KEY = "hbase.snapshot.region.concurrentTasks";
082  private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
083
084  /** Conf key for the number of request threads in the procedure member's pool */
085  public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads";
086  /** Default number of request threads in the procedure member's pool */
087  public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10;
088
089  /** Conf key for the subprocedure timeout (ms); also the keep-alive of the snapshot thread pools */
090  public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout";
091  /** Default timeout / keep-alive: 300 seconds, expressed in milliseconds */
092  public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5 * 60000;
093
094  /** Conf key for millis between checks to see if snapshot completed or if there are errors */
095  public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY = "hbase.snapshot.region.wakefrequency";
096  /** Default interval, in millis, between checks for errors while regions finish snapshotting */
097  private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
098
099  private RegionServerServices rss; // hosting region server; set by the test ctor or initialize()
100  private ProcedureMemberRpcs memberRpcs; // started in start(), closed in stop()
101  private ProcedureMember member; // handed to memberRpcs.start(); closed first in stop()
102
103  /**
104   * Exposed for testing.
105   * @param conf HBase configuration.
106   * @param parent parent running the snapshot handler
107   * @param memberRpc use specified memberRpc instance
108   * @param procMember use specified ProcedureMember
109   */
110   RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
111      ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
112    this.rss = parent;
113    this.memberRpcs = memberRpc;
114    this.member = procMember;
115  }
116
117  public RegionServerSnapshotManager() {}
118
119  /**
120   * Start accepting snapshot requests.
121   */
122  @Override
123  public void start() {
124    LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
125    this.memberRpcs.start(rss.getServerName().toString(), member);
126  }
127
128  /**
129   * Close <tt>this</tt> and all running snapshot tasks
130   * @param force forcefully stop all running tasks
131   * @throws IOException
132   */
133  @Override
134  public void stop(boolean force) throws IOException {
135    String mode = force ? "abruptly" : "gracefully";
136    LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
137
138    try {
139      this.member.close();
140    } finally {
141      this.memberRpcs.close();
142    }
143  }
144
145  /**
146   * If in a running state, creates the specified subprocedure for handling an online snapshot.
147   *
148   * Because this gets the local list of regions to snapshot and not the set the master had,
149   * there is a possibility of a race where regions may be missed.  This detected by the master in
150   * the snapshot verification step.
151   *
152   * @param snapshot
153   * @return Subprocedure to submit to the ProcedureMemeber.
154   */
155  public Subprocedure buildSubprocedure(SnapshotDescription snapshot) {
156
157    // don't run a snapshot if the parent is stop(ping)
158    if (rss.isStopping() || rss.isStopped()) {
159      throw new IllegalStateException("Can't start snapshot on RS: " + rss.getServerName()
160          + ", because stopping/stopped!");
161    }
162
163    // check to see if this server is hosting any regions for the snapshots
164    // check to see if we have regions for the snapshot
165    List<HRegion> involvedRegions;
166    try {
167      involvedRegions = getRegionsToSnapshot(snapshot);
168    } catch (IOException e1) {
169      throw new IllegalStateException("Failed to figure out if we should handle a snapshot - "
170          + "something has gone awry with the online regions.", e1);
171    }
172
173    // We need to run the subprocedure even if we have no relevant regions.  The coordinator
174    // expects participation in the procedure and without sending message the snapshot attempt
175    // will hang and fail.
176
177    LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
178        + snapshot.getTable() + " type " + snapshot.getType());
179    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName());
180    Configuration conf = rss.getConfiguration();
181    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
182        SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
183    long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY,
184        SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
185
186    switch (snapshot.getType()) {
187    case FLUSH:
188      SnapshotSubprocedurePool taskManager =
189        new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
190      return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
191          timeoutMillis, involvedRegions, snapshot, taskManager);
192    case SKIPFLUSH:
193        /*
194         * This is to take an online-snapshot without force a coordinated flush to prevent pause
195         * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
196         * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
197         * turned on/off based on the flush type.
198         * To minimized the code change, class name is not changed.
199         */
200        SnapshotSubprocedurePool taskManager2 =
201            new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss);
202        return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
203            timeoutMillis, involvedRegions, snapshot, taskManager2);
204
205    default:
206      throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
207    }
208  }
209
210  /**
211   * Determine if the snapshot should be handled on this server
212   *
213   * NOTE: This is racy -- the master expects a list of regionservers.
214   * This means if a region moves somewhere between the calls we'll miss some regions.
215   * For example, a region move during a snapshot could result in a region to be skipped or done
216   * twice.  This is manageable because the {@link MasterSnapshotVerifier} will double check the
217   * region lists after the online portion of the snapshot completes and will explicitly fail the
218   * snapshot.
219   *
220   * @param snapshot
221   * @return the list of online regions. Empty list is returned if no regions are responsible for
222   *         the given snapshot.
223   * @throws IOException
224   */
225  private List<HRegion> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException {
226    List<HRegion> onlineRegions = (List<HRegion>) rss
227        .getRegions(TableName.valueOf(snapshot.getTable()));
228    Iterator<HRegion> iterator = onlineRegions.iterator();
229    // remove the non-default regions
230    while (iterator.hasNext()) {
231      HRegion r = iterator.next();
232      if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
233        iterator.remove();
234      }
235    }
236    return onlineRegions;
237  }
238
239  /**
240   * Build the actual snapshot runner that will do all the 'hard' work
241   */
242  public class SnapshotSubprocedureBuilder implements SubprocedureFactory {
243
244    @Override
245    public Subprocedure buildSubprocedure(String name, byte[] data) {
246      try {
247        // unwrap the snapshot information
248        SnapshotDescription snapshot = SnapshotDescription.parseFrom(data);
249        return RegionServerSnapshotManager.this.buildSubprocedure(snapshot);
250      } catch (IOException e) {
251        throw new IllegalArgumentException("Could not read snapshot information from request.");
252      }
253    }
254
255  }
256
257  /**
258   * We use the SnapshotSubprocedurePool, a class specific thread pool instead of
259   * {@link org.apache.hadoop.hbase.executor.ExecutorService}.
260   *
261   * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of
262   * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation
263   * failures.
264   *
265   * HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't
266   * really built for coordinated tasks where multiple threads as part of one larger task.  In
267   * RS's the HBase Executor services are only used for open and close and not other threadpooled
268   * operations such as compactions and replication  sinks.
269   */
270  static class SnapshotSubprocedurePool {
271    private final Abortable abortable;
272    private final ExecutorCompletionService<Void> taskPool;
273    private final ThreadPoolExecutor executor;
274    private volatile boolean stopped;
275    private final List<Future<Void>> futures = new ArrayList<>();
276    private final String name;
277
278    SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) {
279      this.abortable = abortable;
280      // configure the executor service
281      long keepAlive = conf.getLong(
282        RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
283        RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
284      int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
285      this.name = name;
286      executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
287          new DaemonThreadFactory("rs(" + name + ")-snapshot-pool-"));
288      taskPool = new ExecutorCompletionService<>(executor);
289    }
290
291    boolean hasTasks() {
292      return futures.size() != 0;
293    }
294
295    /**
296     * Submit a task to the pool.
297     *
298     * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. This
299     * version does not support issuing tasks from multiple concurrent table snapshots requests.
300     */
301    void submitTask(final Callable<Void> task) {
302      Future<Void> f = this.taskPool.submit(task);
303      futures.add(f);
304    }
305
306    /**
307     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
308     * This *must* be called after all tasks are submitted via submitTask.
309     *
310     * @return <tt>true</tt> on success, <tt>false</tt> otherwise
311     * @throws InterruptedException
312     * @throws SnapshotCreationException if the snapshot failed while we were waiting
313     */
314    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
315      LOG.debug("Waiting for local region snapshots to finish.");
316
317      int sz = futures.size();
318      try {
319        // Using the completion service to process the futures that finish first first.
320        for (int i = 0; i < sz; i++) {
321          Future<Void> f = taskPool.take();
322          f.get();
323          if (!futures.remove(f)) {
324            LOG.warn("unexpected future" + f);
325          }
326          LOG.debug("Completed " + (i+1) + "/" + sz +  " local region snapshots.");
327        }
328        LOG.debug("Completed " + sz +  " local region snapshots.");
329        return true;
330      } catch (InterruptedException e) {
331        LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
332        if (!stopped) {
333          Thread.currentThread().interrupt();
334          throw new ForeignException("SnapshotSubprocedurePool", e);
335        }
336        // we are stopped so we can just exit.
337      } catch (ExecutionException e) {
338        Throwable cause = e.getCause();
339        if (cause instanceof ForeignException) {
340          LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
341          throw (ForeignException)e.getCause();
342        } else if (cause instanceof DroppedSnapshotException) {
343          // we have to abort the region server according to contract of flush
344          abortable.abort("Received DroppedSnapshotException, aborting", cause);
345        }
346        LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
347        throw new ForeignException(name, e.getCause());
348      } finally {
349        cancelTasks();
350      }
351      return false;
352    }
353
354    /**
355     * This attempts to cancel out all pending and in progress tasks (interruptions issues)
356     * @throws InterruptedException
357     */
358    void cancelTasks() throws InterruptedException {
359      Collection<Future<Void>> tasks = futures;
360      LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name);
361      for (Future<Void> f: tasks) {
362        // TODO Ideally we'd interrupt hbase threads when we cancel.  However it seems that there
363        // are places in the HBase code where row/region locks are taken and not released in a
364        // finally block.  Thus we cancel without interrupting.  Cancellations will be slower to
365        // complete but we won't suffer from unreleased locks due to poor code discipline.
366        f.cancel(false);
367      }
368
369      // evict remaining tasks and futures from taskPool.
370      futures.clear();
371      while (taskPool.poll() != null) {}
372      stop();
373    }
374
375    /**
376     * Abruptly shutdown the thread pool.  Call when exiting a region server.
377     */
378    void stop() {
379      if (this.stopped) return;
380
381      this.stopped = true;
382      this.executor.shutdown();
383    }
384  }
385
386  /**
387   * Create a default snapshot handler - uses a zookeeper based member controller.
388   * @param rss region server running the handler
389   * @throws KeeperException if the zookeeper cluster cannot be reached
390   */
391  @Override
392  public void initialize(RegionServerServices rss) throws KeeperException {
393    this.rss = rss;
394    ZKWatcher zkw = rss.getZooKeeper();
395    this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
396        SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
397
398    // read in the snapshot request configuration properties
399    Configuration conf = rss.getConfiguration();
400    long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
401    int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
402
403    // create the actual snapshot procedure member
404    ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
405      opThreads, keepAlive);
406    this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
407  }
408
409  @Override
410  public String getProcedureSignature() {
411    return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
412  }
413
414}