View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.Collection;
23  import java.util.concurrent.ConcurrentMap;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Future;
26  import java.util.concurrent.RejectedExecutionException;
27  import java.util.concurrent.SynchronousQueue;
28  import java.util.concurrent.ThreadPoolExecutor;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.hbase.DaemonThreadFactory;
35  import org.apache.hadoop.hbase.errorhandling.ForeignException;
36  
37  import com.google.common.collect.MapMaker;
38  
39  /**
40   * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
41   * specialized part of a {@link Procedure} that actually does procedure type-specific work
42   * and reports back to the coordinator as it completes each phase.
43   * <p>
44   * If there is a connection error ({@link #controllerConnectionFailure(String, IOException)}), all
45   * currently running subprocedures are notify to failed since there is no longer a way to reach any
46   * other members or coordinators since the rpcs are down.
47   */
48  @InterfaceAudience.Private
49  public class ProcedureMember implements Closeable {
50    private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
51  
52    final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
53  
54    private final SubprocedureFactory builder;
55    private final ProcedureMemberRpcs rpcs;
56  
57    private final ConcurrentMap<String,Subprocedure> subprocs =
58        new MapMaker().concurrencyLevel(4).weakValues().makeMap();
59    private final ExecutorService pool;
60  
61    /**
62     * Instantiate a new ProcedureMember.  This is a slave that executes subprocedures.
63     *
64     * @param rpcs controller used to send notifications to the procedure coordinator
65     * @param pool thread pool to submit subprocedures
66     * @param factory class that creates instances of a subprocedure.
67     */
68    public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
69        SubprocedureFactory factory) {
70      this.pool = pool;
71      this.rpcs = rpcs;
72      this.builder = factory;
73    }
74  
75    /**
76     * Default thread pool for the procedure
77     *
78     * @param memberName
79     * @param procThreads the maximum number of threads to allow in the pool
80     */
81    public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
82      return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
83    }
84  
85    /**
86     * Default thread pool for the procedure
87     *
88     * @param memberName
89     * @param procThreads the maximum number of threads to allow in the pool
90     * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
91     */
92    public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
93        long keepAliveMillis) {
94      return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
95          new SynchronousQueue<Runnable>(),
96          new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
97    }
98  
99    /**
100    * Package exposed.  Not for public use.
101    *
102    * @return reference to the Procedure member's rpcs object
103    */
104   ProcedureMemberRpcs getRpcs() {
105     return rpcs;
106   }
107 
108 
109   /**
110    * This is separated from execution so that we can detect and handle the case where the
111    * subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
112    * sent here)
113    * @param opName
114    * @param data
115    * @return subprocedure
116    */
117   public Subprocedure createSubprocedure(String opName, byte[] data) {
118     return builder.buildSubprocedure(opName, data);
119   }
120 
121   /**
122    * Submit an subprocedure for execution.  This starts the local acquire phase.
123    * @param subproc the subprocedure to execute.
124    * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
125    *         could not be started. In the latter case, the subprocedure holds a reference to
126    *         the exception that caused the failure.
127    */
128   public boolean submitSubprocedure(Subprocedure subproc) {
129      // if the submitted subprocedure was null, bail.
130     if (subproc == null) {
131       LOG.warn("Submitted null subprocedure, nothing to run here.");
132       return false;
133     }
134 
135     String procName = subproc.getName();
136     if (procName == null || procName.length() == 0) {
137       LOG.error("Subproc name cannot be null or the empty string");
138       return false;
139     }
140 
141     // make sure we aren't already running an subprocedure of that name
142     Subprocedure rsub;
143     synchronized (subprocs) {
144       rsub = subprocs.get(procName);
145     }
146     if (rsub != null) {
147       if (!rsub.isComplete()) {
148         LOG.error("Subproc '" + procName + "' is already running. Bailing out");
149         return false;
150       }
151       LOG.warn("A completed old subproc "  +  procName + " is still present, removing");
152       subprocs.remove(procName);
153     }
154 
155     LOG.debug("Submitting new Subprocedure:" + procName);
156 
157     // kick off the subprocedure
158     Future<Void> future = null;
159     try {
160       synchronized (subprocs) {
161         subprocs.put(procName, subproc);
162       }
163       future = this.pool.submit(subproc);
164       return true;
165     } catch (RejectedExecutionException e) {
166       synchronized (subprocs) {
167         subprocs.remove(procName);
168       }
169       // the thread pool is full and we can't run the subprocedure
170       String msg = "Subprocedure pool is full!";
171       subproc.cancel(msg, e.getCause());
172 
173       // cancel all subprocedures proactively
174       if (future != null) {
175         future.cancel(true);
176       }
177     }
178 
179     LOG.error("Failed to start subprocedure '" + procName + "'");
180     return false;
181   }
182 
183    /**
184     * Notification that procedure coordinator has reached the global barrier
185     * @param procName name of the subprocedure that should start running the the in-barrier phase
186     */
187    public void receivedReachedGlobalBarrier(String procName) {
188      Subprocedure subproc = subprocs.get(procName);
189      if (subproc == null) {
190        LOG.warn("Unexpected reached glabal barrier message for Sub-Procedure '" + procName + "'");
191        return;
192      }
193      subproc.receiveReachedGlobalBarrier();
194    }
195 
196   /**
197    * Best effort attempt to close the threadpool via Thread.interrupt.
198    */
199   @Override
200   public void close() throws IOException {
201     // have to use shutdown now to break any latch waiting
202     pool.shutdownNow();
203   }
204 
205   /**
206    * Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
207    * @param timeoutMs timeout limit in millis
208    * @return true if successfully, false if bailed due to timeout.
209    * @throws InterruptedException
210    */
211   boolean closeAndWait(long timeoutMs) throws InterruptedException {
212     pool.shutdown();
213     return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
214   }
215 
216   /**
217    * The connection to the rest of the procedure group (member and coordinator) has been
218    * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
219    * other members since we cannot reach them anymore.
220    * @param message description of the error
221    * @param cause the actual cause of the failure
222    *
223    * TODO i'm tempted to just remove this code completely and treat it like any other abort.
224    * Implementation wise, if this happens it is a ZK failure which means the RS will abort.
225    */
226   public void controllerConnectionFailure(final String message, final IOException cause) {
227     Collection<Subprocedure> toNotify = subprocs.values();
228     LOG.error(message, cause);
229     for (Subprocedure sub : toNotify) {
230       // TODO notify the elements, if they aren't null
231       sub.cancel(message, cause);
232     }
233   }
234 
235   /**
236    * Send abort to the specified procedure
237    * @param procName name of the procedure to about
238    * @param ee exception information about the abort
239    */
240   public void receiveAbortProcedure(String procName, ForeignException ee) {
241     LOG.debug("Request received to abort procedure " + procName, ee);
242     // if we know about the procedure, notify it
243     Subprocedure sub = subprocs.get(procName);
244     if (sub == null) {
245       LOG.info("Received abort on procedure with no local subprocedure " + procName +
246           ", ignoring it.", ee);
247       return; // Procedure has already completed
248     }
249     String msg = "Propagating foreign exception to subprocedure " + sub.getName();
250     LOG.error(msg, ee);
251     sub.cancel(msg, ee);
252   }
253 }