View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.Collection;
23  import java.util.concurrent.ConcurrentMap;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Future;
26  import java.util.concurrent.RejectedExecutionException;
27  import java.util.concurrent.SynchronousQueue;
28  import java.util.concurrent.ThreadPoolExecutor;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.classification.InterfaceStability;
35  import org.apache.hadoop.hbase.DaemonThreadFactory;
36  import org.apache.hadoop.hbase.errorhandling.ForeignException;
37  
38  import com.google.common.collect.MapMaker;
39  
40  /**
41   * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
42   * specialized part of a {@link Procedure} that actually does procedure type-specific work
43   * and reports back to the coordinator as it completes each phase.
44   * <p>
45   * If there is a connection error ({@link #controllerConnectionFailure(String, IOException)}), all
46   * currently running subprocedures are notify to failed since there is no longer a way to reach any
47   * other members or coordinators since the rpcs are down.
48   */
49  @InterfaceAudience.Public
50  @InterfaceStability.Evolving
51  public class ProcedureMember implements Closeable {
52    private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
53  
54    final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
55  
56    private final SubprocedureFactory builder;
57    private final ProcedureMemberRpcs rpcs;
58  
59    private final ConcurrentMap<String,Subprocedure> subprocs =
60        new MapMaker().concurrencyLevel(4).weakValues().makeMap();
61    private final ExecutorService pool;
62  
63    /**
64     * Instantiate a new ProcedureMember.  This is a slave that executes subprocedures.
65     *
66     * @param rpcs controller used to send notifications to the procedure coordinator
67     * @param pool thread pool to submit subprocedures
68     * @param factory class that creates instances of a subprocedure.
69     */
70    public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
71        SubprocedureFactory factory) {
72      this.pool = pool;
73      this.rpcs = rpcs;
74      this.builder = factory;
75    }
76  
77    /**
78     * Default thread pool for the procedure
79     *
80     * @param memberName
81     * @param procThreads the maximum number of threads to allow in the pool
82     */
83    public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
84      return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
85    }
86  
87    /**
88     * Default thread pool for the procedure
89     *
90     * @param memberName
91     * @param procThreads the maximum number of threads to allow in the pool
92     * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
93     */
94    public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
95        long keepAliveMillis) {
96      return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
97          new SynchronousQueue<Runnable>(),
98          new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
99    }
100 
101   /**
102    * Package exposed.  Not for public use.
103    *
104    * @return reference to the Procedure member's rpcs object
105    */
106   ProcedureMemberRpcs getRpcs() {
107     return rpcs;
108   }
109 
110 
111   /**
112    * This is separated from execution so that we can detect and handle the case where the
113    * subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
114    * sent here)
115    * @param opName
116    * @param data
117    * @return subprocedure
118    */
119   public Subprocedure createSubprocedure(String opName, byte[] data) {
120     return builder.buildSubprocedure(opName, data);
121   }
122 
123   /**
124    * Submit an subprocedure for execution.  This starts the local acquire phase.
125    * @param subproc the subprocedure to execute.
126    * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
127    *         could not be started. In the latter case, the subprocedure holds a reference to
128    *         the exception that caused the failure.
129    */
130   public boolean submitSubprocedure(Subprocedure subproc) {
131      // if the submitted subprocedure was null, bail.
132     if (subproc == null) {
133       LOG.warn("Submitted null subprocedure, nothing to run here.");
134       return false;
135     }
136 
137     String procName = subproc.getName();
138     if (procName == null || procName.length() == 0) {
139       LOG.error("Subproc name cannot be null or the empty string");
140       return false;
141     }
142 
143     // make sure we aren't already running an subprocedure of that name
144     Subprocedure rsub;
145     synchronized (subprocs) {
146       rsub = subprocs.get(procName);
147     }
148     if (rsub != null) {
149       if (!rsub.isComplete()) {
150         LOG.error("Subproc '" + procName + "' is already running. Bailing out");
151         return false;
152       }
153       LOG.warn("A completed old subproc "  +  procName + " is still present, removing");
154       subprocs.remove(procName);
155     }
156 
157     LOG.debug("Submitting new Subprocedure:" + procName);
158 
159     // kick off the subprocedure
160     Future<Void> future = null;
161     try {
162       future = this.pool.submit(subproc);
163       synchronized (subprocs) {
164         subprocs.put(procName, subproc);
165       }
166       return true;
167     } catch (RejectedExecutionException e) {
168       // the thread pool is full and we can't run the subprocedure
169       String msg = "Subprocedure pool is full!";
170       subproc.cancel(msg, e.getCause());
171 
172       // cancel all subprocedures proactively
173       if (future != null) {
174         future.cancel(true);
175       }
176     }
177 
178     LOG.error("Failed to start subprocedure '" + procName + "'");
179     return false;
180   }
181 
182    /**
183     * Notification that procedure coordinator has reached the global barrier
184     * @param procName name of the subprocedure that should start running the the in-barrier phase
185     */
186    public void receivedReachedGlobalBarrier(String procName) {
187      Subprocedure subproc = subprocs.get(procName);
188      if (subproc == null) {
189        LOG.warn("Unexpected reached glabal barrier message for Sub-Procedure '" + procName + "'");
190        return;
191      }
192      subproc.receiveReachedGlobalBarrier();
193    }
194 
195   /**
196    * Best effort attempt to close the threadpool via Thread.interrupt.
197    */
198   @Override
199   public void close() throws IOException {
200     // have to use shutdown now to break any latch waiting
201     pool.shutdownNow();
202   }
203 
204   /**
205    * Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
206    * @param timeoutMs timeout limit in millis
207    * @return true if successfully, false if bailed due to timeout.
208    * @throws InterruptedException
209    */
210   boolean closeAndWait(long timeoutMs) throws InterruptedException {
211     pool.shutdown();
212     return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
213   }
214 
215   /**
216    * The connection to the rest of the procedure group (member and coordinator) has been
217    * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
218    * other members since we cannot reach them anymore.
219    * @param message description of the error
220    * @param cause the actual cause of the failure
221    *
222    * TODO i'm tempted to just remove this code completely and treat it like any other abort.
223    * Implementation wise, if this happens it is a ZK failure which means the RS will abort.
224    */
225   public void controllerConnectionFailure(final String message, final IOException cause) {
226     Collection<Subprocedure> toNotify = subprocs.values();
227     LOG.error(message, cause);
228     for (Subprocedure sub : toNotify) {
229       // TODO notify the elements, if they aren't null
230       sub.cancel(message, cause);
231     }
232   }
233 
234   /**
235    * Send abort to the specified procedure
236    * @param procName name of the procedure to about
237    * @param ee exception information about the abort
238    */
239   public void receiveAbortProcedure(String procName, ForeignException ee) {
240     LOG.debug("Request received to abort procedure " + procName, ee);
241     // if we know about the procedure, notify it
242     Subprocedure sub = subprocs.get(procName);
243     if (sub == null) {
244       LOG.info("Received abort on procedure with no local subprocedure " + procName +
245           ", ignoring it.", ee);
246       return; // Procedure has already completed
247     }
248     LOG.error("Propagating foreign exception to subprocedure " + sub.getName(), ee);
249     sub.monitor.receive(ee);
250   }
251 }