View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.Collection;
23  import java.util.concurrent.ConcurrentMap;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.SynchronousQueue;
27  import java.util.concurrent.ThreadPoolExecutor;
28  import java.util.concurrent.TimeUnit;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.hbase.classification.InterfaceAudience;
33  import org.apache.hadoop.hbase.DaemonThreadFactory;
34  import org.apache.hadoop.hbase.errorhandling.ForeignException;
35  
36  import com.google.common.collect.MapMaker;
37  
38  /**
39   * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
40   * specialized part of a {@link Procedure} that actually does procedure type-specific work
41   * and reports back to the coordinator as it completes each phase.
42   * <p>
43   * If there is a connection error ({@link #controllerConnectionFailure(String, IOException)}), all
44   * currently running subprocedures are notify to failed since there is no longer a way to reach any
45   * other members or coordinators since the rpcs are down.
46   */
47  @InterfaceAudience.Private
48  public class ProcedureMember implements Closeable {
49    private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
50  
51    final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
52  
53    private final SubprocedureFactory builder;
54    private final ProcedureMemberRpcs rpcs;
55  
56    private final ConcurrentMap<String,Subprocedure> subprocs =
57        new MapMaker().concurrencyLevel(4).weakValues().makeMap();
58    private final ExecutorService pool;
59  
60    /**
61     * Instantiate a new ProcedureMember.  This is a slave that executes subprocedures.
62     *
63     * @param rpcs controller used to send notifications to the procedure coordinator
64     * @param pool thread pool to submit subprocedures
65     * @param factory class that creates instances of a subprocedure.
66     */
67    public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
68        SubprocedureFactory factory) {
69      this.pool = pool;
70      this.rpcs = rpcs;
71      this.builder = factory;
72    }
73  
74    /**
75     * Default thread pool for the procedure
76     *
77     * @param memberName
78     * @param procThreads the maximum number of threads to allow in the pool
79     */
80    public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
81      return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
82    }
83  
84    /**
85     * Default thread pool for the procedure
86     *
87     * @param memberName
88     * @param procThreads the maximum number of threads to allow in the pool
89     * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
90     */
91    public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
92        long keepAliveMillis) {
93      return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
94          new SynchronousQueue<Runnable>(),
95          new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
96    }
97  
98    /**
99     * Package exposed.  Not for public use.
100    *
101    * @return reference to the Procedure member's rpcs object
102    */
103   ProcedureMemberRpcs getRpcs() {
104     return rpcs;
105   }
106 
107 
108   /**
109    * This is separated from execution so that we can detect and handle the case where the
110    * subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
111    * sent here)
112    * @param opName
113    * @param data
114    * @return subprocedure
115    */
116   public Subprocedure createSubprocedure(String opName, byte[] data) {
117     return builder.buildSubprocedure(opName, data);
118   }
119 
120   /**
121    * Submit an subprocedure for execution.  This starts the local acquire phase.
122    * @param subproc the subprocedure to execute.
123    * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
124    *         could not be started. In the latter case, the subprocedure holds a reference to
125    *         the exception that caused the failure.
126    */
127   public boolean submitSubprocedure(Subprocedure subproc) {
128      // if the submitted subprocedure was null, bail.
129     if (subproc == null) {
130       LOG.warn("Submitted null subprocedure, nothing to run here.");
131       return false;
132     }
133 
134     String procName = subproc.getName();
135     if (procName == null || procName.length() == 0) {
136       LOG.error("Subproc name cannot be null or the empty string");
137       return false;
138     }
139 
140     // make sure we aren't already running an subprocedure of that name
141     Subprocedure rsub = subprocs.get(procName);
142     if (rsub != null) {
143       if (!rsub.isComplete()) {
144         LOG.error("Subproc '" + procName + "' is already running. Bailing out");
145         return false;
146       }
147       LOG.warn("A completed old subproc "  +  procName + " is still present, removing");
148       if (!subprocs.remove(procName, rsub)) {
149         LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
150         return false;
151       }
152     }
153 
154     LOG.debug("Submitting new Subprocedure:" + procName);
155 
156     // kick off the subprocedure
157     try {
158       if (subprocs.putIfAbsent(procName, subproc) == null) {
159         this.pool.submit(subproc);
160         return true;
161       } else {
162         LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
163         return false;
164       }
165     } catch (RejectedExecutionException e) {
166       subprocs.remove(procName, subproc);
167 
168       // the thread pool is full and we can't run the subprocedure
169       String msg = "Subprocedure pool is full!";
170       subproc.cancel(msg, e.getCause());
171     }
172 
173     LOG.error("Failed to start subprocedure '" + procName + "'");
174     return false;
175   }
176 
177    /**
178     * Notification that procedure coordinator has reached the global barrier
179     * @param procName name of the subprocedure that should start running the in-barrier phase
180     */
181    public void receivedReachedGlobalBarrier(String procName) {
182      Subprocedure subproc = subprocs.get(procName);
183      if (subproc == null) {
184        LOG.warn("Unexpected reached glabal barrier message for Sub-Procedure '" + procName + "'");
185        return;
186      }
187      subproc.receiveReachedGlobalBarrier();
188    }
189 
190   /**
191    * Best effort attempt to close the threadpool via Thread.interrupt.
192    */
193   @Override
194   public void close() throws IOException {
195     // have to use shutdown now to break any latch waiting
196     pool.shutdownNow();
197   }
198 
199   /**
200    * Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
201    * @param timeoutMs timeout limit in millis
202    * @return true if successfully, false if bailed due to timeout.
203    * @throws InterruptedException
204    */
205   boolean closeAndWait(long timeoutMs) throws InterruptedException {
206     pool.shutdown();
207     return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
208   }
209 
210   /**
211    * The connection to the rest of the procedure group (member and coordinator) has been
212    * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
213    * other members since we cannot reach them anymore.
214    * @param message description of the error
215    * @param cause the actual cause of the failure
216    *
217    * TODO i'm tempted to just remove this code completely and treat it like any other abort.
218    * Implementation wise, if this happens it is a ZK failure which means the RS will abort.
219    */
220   public void controllerConnectionFailure(final String message, final IOException cause) {
221     Collection<Subprocedure> toNotify = subprocs.values();
222     LOG.error(message, cause);
223     for (Subprocedure sub : toNotify) {
224       // TODO notify the elements, if they aren't null
225       sub.cancel(message, cause);
226     }
227   }
228 
229   /**
230    * Send abort to the specified procedure
231    * @param procName name of the procedure to about
232    * @param ee exception information about the abort
233    */
234   public void receiveAbortProcedure(String procName, ForeignException ee) {
235     LOG.debug("Request received to abort procedure " + procName, ee);
236     // if we know about the procedure, notify it
237     Subprocedure sub = subprocs.get(procName);
238     if (sub == null) {
239       LOG.info("Received abort on procedure with no local subprocedure " + procName +
240           ", ignoring it.", ee);
241       return; // Procedure has already completed
242     }
243     String msg = "Propagating foreign exception to subprocedure " + sub.getName();
244     LOG.error(msg, ee);
245     sub.cancel(msg, ee);
246   }
247 }