View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.concurrent.ConcurrentMap;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.RejectedExecutionException;
25  import java.util.concurrent.SynchronousQueue;
26  import java.util.concurrent.ThreadPoolExecutor;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.DaemonThreadFactory;
33  import org.apache.hadoop.hbase.errorhandling.ForeignException;
34  
35  import com.google.common.collect.MapMaker;
36  
37  /**
38   * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
39   * specialized part of a {@link Procedure} that actually does procedure type-specific work
40   * and reports back to the coordinator as it completes each phase.
41   */
42  @InterfaceAudience.Private
43  public class ProcedureMember implements Closeable {
44    private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
45  
46    final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
47  
48    private final SubprocedureFactory builder;
49    private final ProcedureMemberRpcs rpcs;
50  
51    private final ConcurrentMap<String,Subprocedure> subprocs =
52        new MapMaker().concurrencyLevel(4).weakValues().makeMap();
53    private final ExecutorService pool;
54  
55    /**
56     * Instantiate a new ProcedureMember.  This is a slave that executes subprocedures.
57     *
58     * @param rpcs controller used to send notifications to the procedure coordinator
59     * @param pool thread pool to submit subprocedures
60     * @param factory class that creates instances of a subprocedure.
61     */
62    public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
63        SubprocedureFactory factory) {
64      this.pool = pool;
65      this.rpcs = rpcs;
66      this.builder = factory;
67    }
68  
69    /**
70     * Default thread pool for the procedure
71     *
72     * @param memberName
73     * @param procThreads the maximum number of threads to allow in the pool
74     */
75    public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
76      return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
77    }
78  
79    /**
80     * Default thread pool for the procedure
81     *
82     * @param memberName
83     * @param procThreads the maximum number of threads to allow in the pool
84     * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
85     */
86    public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
87        long keepAliveMillis) {
88      return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
89          new SynchronousQueue<Runnable>(),
90          new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
91    }
92  
93    /**
94     * Package exposed.  Not for public use.
95     *
96     * @return reference to the Procedure member's rpcs object
97     */
98    ProcedureMemberRpcs getRpcs() {
99      return rpcs;
100   }
101 
102 
103   /**
104    * This is separated from execution so that we can detect and handle the case where the
105    * subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
106    * sent here)
107    * @param opName
108    * @param data
109    * @return subprocedure
110    */
111   public Subprocedure createSubprocedure(String opName, byte[] data) {
112     return builder.buildSubprocedure(opName, data);
113   }
114 
115   /**
116    * Submit an subprocedure for execution.  This starts the local acquire phase.
117    * @param subproc the subprocedure to execute.
118    * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
119    *         could not be started. In the latter case, the subprocedure holds a reference to
120    *         the exception that caused the failure.
121    */
122   public boolean submitSubprocedure(Subprocedure subproc) {
123      // if the submitted subprocedure was null, bail.
124     if (subproc == null) {
125       LOG.warn("Submitted null subprocedure, nothing to run here.");
126       return false;
127     }
128 
129     String procName = subproc.getName();
130     if (procName == null || procName.length() == 0) {
131       LOG.error("Subproc name cannot be null or the empty string");
132       return false;
133     }
134 
135     // make sure we aren't already running an subprocedure of that name
136     Subprocedure rsub = subprocs.get(procName);
137     if (rsub != null) {
138       if (!rsub.isComplete()) {
139         LOG.error("Subproc '" + procName + "' is already running. Bailing out");
140         return false;
141       }
142       LOG.warn("A completed old subproc "  +  procName + " is still present, removing");
143       if (!subprocs.remove(procName, rsub)) {
144         LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
145         return false;
146       }
147     }
148 
149     LOG.debug("Submitting new Subprocedure:" + procName);
150 
151     // kick off the subprocedure
152     try {
153       if (subprocs.putIfAbsent(procName, subproc) == null) {
154         this.pool.submit(subproc);
155         return true;
156       } else {
157         LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
158         return false;
159       }
160     } catch (RejectedExecutionException e) {
161       subprocs.remove(procName, subproc);
162 
163       // the thread pool is full and we can't run the subprocedure
164       String msg = "Subprocedure pool is full!";
165       subproc.cancel(msg, e.getCause());
166     }
167 
168     LOG.error("Failed to start subprocedure '" + procName + "'");
169     return false;
170   }
171 
172    /**
173     * Notification that procedure coordinator has reached the global barrier
174     * @param procName name of the subprocedure that should start running the in-barrier phase
175     */
176    public void receivedReachedGlobalBarrier(String procName) {
177      Subprocedure subproc = subprocs.get(procName);
178      if (subproc == null) {
179        LOG.warn("Unexpected reached globa barrier message for Sub-Procedure '" + procName + "'");
180        return;
181      }
182      if (LOG.isTraceEnabled()) {
183       LOG.trace("reached global barrier message for Sub-Procedure '" + procName + "'");
184      }
185      subproc.receiveReachedGlobalBarrier();
186    }
187 
188   /**
189    * Best effort attempt to close the threadpool via Thread.interrupt.
190    */
191   @Override
192   public void close() throws IOException {
193     // have to use shutdown now to break any latch waiting
194     pool.shutdownNow();
195   }
196 
197   /**
198    * Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
199    * @param timeoutMs timeout limit in millis
200    * @return true if successfully, false if bailed due to timeout.
201    * @throws InterruptedException
202    */
203   boolean closeAndWait(long timeoutMs) throws InterruptedException {
204     pool.shutdown();
205     return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
206   }
207 
208   /**
209    * The connection to the rest of the procedure group (member and coordinator) has been
210    * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
211    * other members since we cannot reach them anymore.
212    * @param message description of the error
213    * @param cause the actual cause of the failure
214    * @param procName the name of the procedure we'd cancel due to the error.
215    */
216   public void controllerConnectionFailure(final String message, final Throwable cause,
217       final String procName) {
218     LOG.error(message, cause);
219     if (procName == null) {
220       return;
221     }
222     Subprocedure toNotify = subprocs.get(procName);
223     if (toNotify != null) {
224       toNotify.cancel(message, cause);
225     }
226   }
227 
228   /**
229    * Send abort to the specified procedure
230    * @param procName name of the procedure to about
231    * @param ee exception information about the abort
232    */
233   public void receiveAbortProcedure(String procName, ForeignException ee) {
234     LOG.debug("Request received to abort procedure " + procName, ee);
235     // if we know about the procedure, notify it
236     Subprocedure sub = subprocs.get(procName);
237     if (sub == null) {
238       LOG.info("Received abort on procedure with no local subprocedure " + procName +
239           ", ignoring it.", ee);
240       return; // Procedure has already completed
241     }
242     String msg = "Propagating foreign exception to subprocedure " + sub.getName();
243     LOG.error(msg, ee);
244     sub.cancel(msg, ee);
245   }
246 }