View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure.flush;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.List;
24  import java.util.concurrent.Callable;
25  import java.util.concurrent.ExecutionException;
26  import java.util.concurrent.ExecutorCompletionService;
27  import java.util.concurrent.Future;
28  import java.util.concurrent.LinkedBlockingQueue;
29  import java.util.concurrent.ThreadPoolExecutor;
30  import java.util.concurrent.TimeUnit;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.hbase.Abortable;
37  import org.apache.hadoop.hbase.DaemonThreadFactory;
38  import org.apache.hadoop.hbase.DroppedSnapshotException;
39  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.errorhandling.ForeignException;
42  import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
43  import org.apache.hadoop.hbase.procedure.ProcedureMember;
44  import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
45  import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
46  import org.apache.hadoop.hbase.procedure.Subprocedure;
47  import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
48  import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
49  import org.apache.hadoop.hbase.regionserver.HRegionServer;
50  import org.apache.hadoop.hbase.regionserver.Region;
51  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
52  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
53  import org.apache.zookeeper.KeeperException;
54  
55  /**
56   * This manager class handles flushing of the regions for table on a {@link HRegionServer}.
57   */
58  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
59  public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager {
60    private static final Log LOG = LogFactory.getLog(RegionServerFlushTableProcedureManager.class);
61  
62    private static final String CONCURENT_FLUSH_TASKS_KEY =
63        "hbase.flush.procedure.region.concurrentTasks";
64    private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3;
65  
66    public static final String FLUSH_REQUEST_THREADS_KEY =
67        "hbase.flush.procedure.region.pool.threads";
68    public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10;
69  
70    public static final String FLUSH_TIMEOUT_MILLIS_KEY =
71        "hbase.flush.procedure.region.timeout";
72    public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;
73  
74    public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY =
75        "hbase.flush.procedure.region.wakefrequency";
76    private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500;
77  
78    private RegionServerServices rss;
79    private ProcedureMemberRpcs memberRpcs;
80    private ProcedureMember member;
81  
82    /**
83     * Exposed for testing.
84     * @param conf HBase configuration.
85     * @param server region server.
86     * @param memberRpc use specified memberRpc instance
87     * @param procMember use specified ProcedureMember
88     */
89     RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server,
90        ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
91      this.rss = server;
92      this.memberRpcs = memberRpc;
93      this.member = procMember;
94    }
95  
96    public RegionServerFlushTableProcedureManager() {}
97  
98    /**
99     * Start accepting flush table requests.
100    */
101   @Override
102   public void start() {
103     LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString());
104     this.memberRpcs.start(rss.getServerName().toString(), member);
105   }
106 
107   /**
108    * Close <tt>this</tt> and all running tasks
109    * @param force forcefully stop all running tasks
110    * @throws IOException
111    */
112   @Override
113   public void stop(boolean force) throws IOException {
114     String mode = force ? "abruptly" : "gracefully";
115     LOG.info("Stopping region server flush procedure manager " + mode + ".");
116 
117     try {
118       this.member.close();
119     } finally {
120       this.memberRpcs.close();
121     }
122   }
123 
124   /**
125    * If in a running state, creates the specified subprocedure to flush table regions.
126    *
127    * Because this gets the local list of regions to flush and not the set the master had,
128    * there is a possibility of a race where regions may be missed.
129    *
130    * @param table
131    * @return Subprocedure to submit to the ProcedureMemeber.
132    */
133   public Subprocedure buildSubprocedure(String table) {
134 
135     // don't run the subprocedure if the parent is stop(ping)
136     if (rss.isStopping() || rss.isStopped()) {
137       throw new IllegalStateException("Can't start flush region subprocedure on RS: "
138           + rss.getServerName() + ", because stopping/stopped!");
139     }
140 
141     // check to see if this server is hosting any regions for the table
142     List<Region> involvedRegions;
143     try {
144       involvedRegions = getRegionsToFlush(table);
145     } catch (IOException e1) {
146       throw new IllegalStateException("Failed to figure out if there is region to flush.", e1);
147     }
148 
149     // We need to run the subprocedure even if we have no relevant regions.  The coordinator
150     // expects participation in the procedure and without sending message the master procedure
151     // will hang and fail.
152 
153     LOG.debug("Launching subprocedure to flush regions for " + table);
154     ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table);
155     Configuration conf = rss.getConfiguration();
156     long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY,
157         FLUSH_TIMEOUT_MILLIS_DEFAULT);
158     long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY,
159         FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);
160 
161     FlushTableSubprocedurePool taskManager =
162         new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
163     return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis,
164       timeoutMillis, involvedRegions, table, taskManager);
165   }
166 
167   /**
168    * Get the list of regions to flush for the table on this server
169    *
170    * It is possible that if a region moves somewhere between the calls
171    * we'll miss the region.
172    *
173    * @param table
174    * @return the list of online regions. Empty list is returned if no regions.
175    * @throws IOException
176    */
177   private List<Region> getRegionsToFlush(String table) throws IOException {
178     return rss.getOnlineRegions(TableName.valueOf(table));
179   }
180 
181   public class FlushTableSubprocedureBuilder implements SubprocedureFactory {
182 
183     @Override
184     public Subprocedure buildSubprocedure(String name, byte[] data) {
185       // The name of the procedure instance from the master is the table name.
186       return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name);
187     }
188 
189   }
190 
191   /**
192    * We use the FlushTableSubprocedurePool, a class specific thread pool instead of
193    * {@link org.apache.hadoop.hbase.executor.ExecutorService}.
194    *
195    * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of
196    * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation
197    * failures.
198    */
199   static class FlushTableSubprocedurePool {
200     private final Abortable abortable;
201     private final ExecutorCompletionService<Void> taskPool;
202     private final ThreadPoolExecutor executor;
203     private volatile boolean stopped;
204     private final List<Future<Void>> futures = new ArrayList<Future<Void>>();
205     private final String name;
206 
207     FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
208       this.abortable = abortable;
209       // configure the executor service
210       long keepAlive = conf.getLong(
211         RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
212         RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
213       int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
214       this.name = name;
215       executor = new ThreadPoolExecutor(1, threads, keepAlive, TimeUnit.MILLISECONDS,
216           new LinkedBlockingQueue<Runnable>(), new DaemonThreadFactory("rs("
217               + name + ")-flush-proc-pool"));
218       taskPool = new ExecutorCompletionService<Void>(executor);
219     }
220 
221     boolean hasTasks() {
222       return futures.size() != 0;
223     }
224 
225     /**
226      * Submit a task to the pool.
227      *
228      * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}.
229      */
230     void submitTask(final Callable<Void> task) {
231       Future<Void> f = this.taskPool.submit(task);
232       futures.add(f);
233     }
234 
235     /**
236      * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
237      * This *must* be called after all tasks are submitted via submitTask.
238      *
239      * @return <tt>true</tt> on success, <tt>false</tt> otherwise
240      * @throws InterruptedException
241      */
242     boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
243       LOG.debug("Waiting for local region flush to finish.");
244 
245       int sz = futures.size();
246       try {
247         // Using the completion service to process the futures.
248         for (int i = 0; i < sz; i++) {
249           Future<Void> f = taskPool.take();
250           f.get();
251           if (!futures.remove(f)) {
252             LOG.warn("unexpected future" + f);
253           }
254           LOG.debug("Completed " + (i+1) + "/" + sz +  " local region flush tasks.");
255         }
256         LOG.debug("Completed " + sz +  " local region flush tasks.");
257         return true;
258       } catch (InterruptedException e) {
259         LOG.warn("Got InterruptedException in FlushSubprocedurePool", e);
260         if (!stopped) {
261           Thread.currentThread().interrupt();
262           throw new ForeignException("FlushSubprocedurePool", e);
263         }
264         // we are stopped so we can just exit.
265       } catch (ExecutionException e) {
266         Throwable cause = e.getCause();
267         if (cause instanceof ForeignException) {
268           LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
269           throw (ForeignException)e.getCause();
270         } else if (cause instanceof DroppedSnapshotException) {
271           // we have to abort the region server according to contract of flush
272           abortable.abort("Received DroppedSnapshotException, aborting", cause);
273         }
274         LOG.warn("Got Exception in FlushSubprocedurePool", e);
275         throw new ForeignException(name, e.getCause());
276       } finally {
277         cancelTasks();
278       }
279       return false;
280     }
281 
282     /**
283      * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running
284      * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877).
285      * @throws InterruptedException
286      */
287     void cancelTasks() throws InterruptedException {
288       Collection<Future<Void>> tasks = futures;
289       LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name);
290       for (Future<Void> f: tasks) {
291         f.cancel(false);
292       }
293 
294       // evict remaining tasks and futures from taskPool.
295       futures.clear();
296       while (taskPool.poll() != null) {}
297       stop();
298     }
299 
300     /**
301      * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be
302      * interrupted (see HBASE-13877)
303      */
304     void stop() {
305       if (this.stopped) return;
306 
307       this.stopped = true;
308       this.executor.shutdown();
309     }
310   }
311 
312   /**
313    * Initialize this region server flush procedure manager
314    * Uses a zookeeper based member controller.
315    * @param rss region server
316    * @throws KeeperException if the zookeeper cannot be reached
317    */
318   @Override
319   public void initialize(RegionServerServices rss) throws KeeperException {
320     this.rss = rss;
321     ZooKeeperWatcher zkw = rss.getZooKeeper();
322     this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
323       MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE);
324 
325     Configuration conf = rss.getConfiguration();
326     long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
327     int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT);
328 
329     // create the actual flush table procedure member
330     ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(),
331       opThreads, keepAlive);
332     this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder());
333   }
334 
335   @Override
336   public String getProcedureSignature() {
337     return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE;
338   }
339 
340 }