View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.snapshot;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.Iterator;
24  import java.util.List;
25  import java.util.concurrent.Callable;
26  import java.util.concurrent.ExecutionException;
27  import java.util.concurrent.ExecutorCompletionService;
28  import java.util.concurrent.Future;
29  import java.util.concurrent.LinkedBlockingQueue;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.classification.InterfaceStability;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.hbase.DaemonThreadFactory;
39  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
42  import org.apache.hadoop.hbase.errorhandling.ForeignException;
43  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
44  import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
45  import org.apache.hadoop.hbase.procedure.ProcedureMember;
46  import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
47  import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
48  import org.apache.hadoop.hbase.procedure.Subprocedure;
49  import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
50  import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
51  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
52  import org.apache.hadoop.hbase.regionserver.HRegion;
53  import org.apache.hadoop.hbase.regionserver.HRegionServer;
54  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
55  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
56  import org.apache.zookeeper.KeeperException;
57  
58  import com.google.protobuf.InvalidProtocolBufferException;
59  
60  /**
61   * This manager class handles the work dealing with snapshots for a {@link HRegionServer}.
62   * <p>
63   * This provides the mechanism necessary to kick off an online-snapshot-specific
64   * {@link Subprocedure} that is responsible for the regions being served by this region server.
65   * If any failures occur with the subprocedure, the RegionServerSnapshotManager's subprocedure
66   * handler, {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all
67   * others.
68   * <p>
69   * On startup, requires {@link #start()} to be called.
70   * <p>
71   * On shutdown, requires {@link #stop(boolean)} to be called
72   */
73  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
74  @InterfaceStability.Unstable
75  public class RegionServerSnapshotManager extends RegionServerProcedureManager {
76    private static final Log LOG = LogFactory.getLog(RegionServerSnapshotManager.class);
77  
78    /** Maximum number of snapshot region tasks that can run concurrently */
79    private static final String CONCURENT_SNAPSHOT_TASKS_KEY = "hbase.snapshot.region.concurrentTasks";
80    private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3;
81  
82    /** Conf key for number of request threads to start snapshots on regionservers */
83    public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads";
84    /** # of threads for snapshotting regions on the rs. */
85    public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10;
86  
87    /** Conf key for max time to keep threads in snapshot request pool waiting */
88    public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout";
89    /** Keep threads alive in request pool for max of 60 seconds */
90    public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 60000;
91  
92    /** Conf key for millis between checks to see if snapshot completed or if there are errors*/
93    public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY = "hbase.snapshot.region.wakefrequency";
94    /** Default amount of time to check for errors while regions finish snapshotting */
95    private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500;
96  
97    private RegionServerServices rss;
98    private ProcedureMemberRpcs memberRpcs;
99    private ProcedureMember member;
100 
101   /**
102    * Exposed for testing.
103    * @param conf HBase configuration.
104    * @param parent parent running the snapshot handler
105    * @param memberRpc use specified memberRpc instance
106    * @param procMember use specified ProcedureMember
107    */
108    RegionServerSnapshotManager(Configuration conf, HRegionServer parent,
109       ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
110     this.rss = parent;
111     this.memberRpcs = memberRpc;
112     this.member = procMember;
113   }
114 
115   public RegionServerSnapshotManager() {}
116 
117   /**
118    * Start accepting snapshot requests.
119    */
120   @Override
121   public void start() {
122     LOG.debug("Start Snapshot Manager " + rss.getServerName().toString());
123     this.memberRpcs.start(rss.getServerName().toString(), member);
124   }
125 
126   /**
127    * Close <tt>this</tt> and all running snapshot tasks
128    * @param force forcefully stop all running tasks
129    * @throws IOException
130    */
131   @Override
132   public void stop(boolean force) throws IOException {
133     String mode = force ? "abruptly" : "gracefully";
134     LOG.info("Stopping RegionServerSnapshotManager " + mode + ".");
135 
136     try {
137       this.member.close();
138     } finally {
139       this.memberRpcs.close();
140     }
141   }
142 
143   /**
144    * If in a running state, creates the specified subprocedure for handling an online snapshot.
145    *
146    * Because this gets the local list of regions to snapshot and not the set the master had,
147    * there is a possibility of a race where regions may be missed.  This detected by the master in
148    * the snapshot verification step.
149    *
150    * @param snapshot
151    * @return Subprocedure to submit to the ProcedureMemeber.
152    */
153   public Subprocedure buildSubprocedure(SnapshotDescription snapshot) {
154 
155     // don't run a snapshot if the parent is stop(ping)
156     if (rss.isStopping() || rss.isStopped()) {
157       throw new IllegalStateException("Can't start snapshot on RS: " + rss.getServerName()
158           + ", because stopping/stopped!");
159     }
160 
161     // check to see if this server is hosting any regions for the snapshots
162     // check to see if we have regions for the snapshot
163     List<HRegion> involvedRegions;
164     try {
165       involvedRegions = getRegionsToSnapshot(snapshot);
166     } catch (IOException e1) {
167       throw new IllegalStateException("Failed to figure out if we should handle a snapshot - "
168           + "something has gone awry with the online regions.", e1);
169     }
170 
171     // We need to run the subprocedure even if we have no relevant regions.  The coordinator
172     // expects participation in the procedure and without sending message the snapshot attempt
173     // will hang and fail.
174 
175     LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
176         + snapshot.getTable());
177     ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName());
178     Configuration conf = rss.getConfiguration();
179     long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
180         SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
181     long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY,
182         SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
183 
184     switch (snapshot.getType()) {
185     case FLUSH:
186       SnapshotSubprocedurePool taskManager =
187         new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
188       return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
189           timeoutMillis, involvedRegions, snapshot, taskManager);
190     case SKIPFLUSH:
191         /*
192          * This is to take an online-snapshot without force a coordinated flush to prevent pause
193          * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure
194          * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be
195          * turned on/off based on the flush type.
196          * To minimized the code change, class name is not changed.
197          */
198         SnapshotSubprocedurePool taskManager2 =
199             new SnapshotSubprocedurePool(rss.getServerName().toString(), conf);
200         return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis,
201             timeoutMillis, involvedRegions, snapshot, taskManager2);
202 
203     default:
204       throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType());
205     }
206   }
207 
208   /**
209    * Determine if the snapshot should be handled on this server
210    *
211    * NOTE: This is racy -- the master expects a list of regionservers.
212    * This means if a region moves somewhere between the calls we'll miss some regions.
213    * For example, a region move during a snapshot could result in a region to be skipped or done
214    * twice.  This is manageable because the {@link MasterSnapshotVerifier} will double check the
215    * region lists after the online portion of the snapshot completes and will explicitly fail the
216    * snapshot.
217    *
218    * @param snapshot
219    * @return the list of online regions. Empty list is returned if no regions are responsible for
220    *         the given snapshot.
221    * @throws IOException
222    */
223   private List<HRegion> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException {
224     List<HRegion> onlineRegions = rss.getOnlineRegions(TableName.valueOf(snapshot.getTable()));
225     Iterator<HRegion> iterator = onlineRegions.iterator();
226     // remove the non-default regions
227     while (iterator.hasNext()) {
228       HRegion r = iterator.next();
229       if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
230         iterator.remove();
231       }
232     }
233     return onlineRegions;
234   }
235 
236   /**
237    * Build the actual snapshot runner that will do all the 'hard' work
238    */
239   public class SnapshotSubprocedureBuilder implements SubprocedureFactory {
240 
241     @Override
242     public Subprocedure buildSubprocedure(String name, byte[] data) {
243       try {
244         // unwrap the snapshot information
245         SnapshotDescription snapshot = SnapshotDescription.parseFrom(data);
246         return RegionServerSnapshotManager.this.buildSubprocedure(snapshot);
247       } catch (InvalidProtocolBufferException e) {
248         throw new IllegalArgumentException("Could not read snapshot information from request.");
249       }
250     }
251 
252   }
253 
254   /**
255    * We use the SnapshotSubprocedurePool, a class specific thread pool instead of
256    * {@link org.apache.hadoop.hbase.executor.ExecutorService}.
257    *
258    * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of
259    * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation
260    * failures.
261    *
262    * HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't
263    * really built for coordinated tasks where multiple threads as part of one larger task.  In
264    * RS's the HBase Executor services are only used for open and close and not other threadpooled
265    * operations such as compactions and replication  sinks.
266    */
267   static class SnapshotSubprocedurePool {
268     private final ExecutorCompletionService<Void> taskPool;
269     private final ThreadPoolExecutor executor;
270     private volatile boolean stopped;
271     private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
272     private final String name;
273 
274     SnapshotSubprocedurePool(String name, Configuration conf) {
275       // configure the executor service
276       long keepAlive = conf.getLong(
277         RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY,
278         RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
279       int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
280       this.name = name;
281       executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
282           new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs("
283               + name + ")-snapshot-pool"));
284       taskPool = new ExecutorCompletionService<Void>(executor);
285     }
286 
287     boolean hasTasks() {
288       return futures.size() != 0;
289     }
290 
291     /**
292      * Submit a task to the pool.
293      *
294      * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. This
295      * version does not support issuing tasks from multiple concurrent table snapshots requests.
296      */
297     void submitTask(final Callable<Void> task) {
298       Future<Void> f = this.taskPool.submit(task);
299       futures.add(f);
300     }
301 
302     /**
303      * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
304      * This *must* be called after all tasks are submitted via submitTask.
305      *
306      * @return <tt>true</tt> on success, <tt>false</tt> otherwise
307      * @throws InterruptedException
308      * @throws SnapshotCreationException if the snapshot failed while we were waiting
309      */
310     boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
311       LOG.debug("Waiting for local region snapshots to finish.");
312 
313       int sz = futures.size();
314       try {
315         // Using the completion service to process the futures that finish first first.
316         for (int i = 0; i < sz; i++) {
317           Future<Void> f = taskPool.take();
318           f.get();
319           if (!futures.remove(f)) {
320             LOG.warn("unexpected future" + f);
321           }
322           LOG.debug("Completed " + (i+1) + "/" + sz +  " local region snapshots.");
323         }
324         LOG.debug("Completed " + sz +  " local region snapshots.");
325         return true;
326       } catch (InterruptedException e) {
327         LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e);
328         if (!stopped) {
329           Thread.currentThread().interrupt();
330           throw new ForeignException("SnapshotSubprocedurePool", e);
331         }
332         // we are stopped so we can just exit.
333       } catch (ExecutionException e) {
334         if (e.getCause() instanceof ForeignException) {
335           LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e);
336           throw (ForeignException)e.getCause();
337         }
338         LOG.warn("Got Exception in SnapshotSubprocedurePool", e);
339         throw new ForeignException(name, e.getCause());
340       } finally {
341         cancelTasks();
342       }
343       return false;
344     }
345 
346     /**
347      * This attempts to cancel out all pending and in progress tasks (interruptions issues)
348      * @throws InterruptedException
349      */
350     void cancelTasks() throws InterruptedException {
351       Collection<Future<Void>> tasks = futures;
352       LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name);
353       for (Future<Void> f: tasks) {
354         // TODO Ideally we'd interrupt hbase threads when we cancel.  However it seems that there
355         // are places in the HBase code where row/region locks are taken and not released in a
356         // finally block.  Thus we cancel without interrupting.  Cancellations will be slower to
357         // complete but we won't suffer from unreleased locks due to poor code discipline.
358         f.cancel(false);
359       }
360 
361       // evict remaining tasks and futures from taskPool.
362       futures.clear();
363       while (taskPool.poll() != null) {}
364       stop();
365     }
366 
367     /**
368      * Abruptly shutdown the thread pool.  Call when exiting a region server.
369      */
370     void stop() {
371       if (this.stopped) return;
372 
373       this.stopped = true;
374       this.executor.shutdownNow();
375     }
376   }
377 
378   /**
379    * Create a default snapshot handler - uses a zookeeper based member controller.
380    * @param rss region server running the handler
381    * @throws KeeperException if the zookeeper cluster cannot be reached
382    */
383   @Override
384   public void initialize(RegionServerServices rss) throws KeeperException {
385     this.rss = rss;
386     ZooKeeperWatcher zkw = rss.getZooKeeper();
387     this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
388         SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION);
389 
390     // read in the snapshot request configuration properties
391     Configuration conf = rss.getConfiguration();
392     long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
393     int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
394 
395     // create the actual snapshot procedure member
396     ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
397       opThreads, keepAlive);
398     this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
399   }
400 
401   @Override
402   public String getProcedureSignature() {
403     return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
404   }
405 
406 }