View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.procedure;
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.concurrent.ConcurrentMap;
23  import java.util.concurrent.ExecutorService;
24  import java.util.concurrent.RejectedExecutionException;
25  import java.util.concurrent.SynchronousQueue;
26  import java.util.concurrent.ThreadPoolExecutor;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.classification.InterfaceAudience;
32  import org.apache.hadoop.hbase.DaemonThreadFactory;
33  import org.apache.hadoop.hbase.errorhandling.ForeignException;
34  
35  import com.google.common.collect.MapMaker;
36  
37  /**
38   * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
39   * specialized part of a {@link Procedure} that actually does procedure type-specific work
40   * and reports back to the coordinator as it completes each phase.
41   */
42  @InterfaceAudience.Private
43  public class ProcedureMember implements Closeable {
44    private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
45  
46    final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
47  
48    private final SubprocedureFactory builder;
49    private final ProcedureMemberRpcs rpcs;
50  
51    private final ConcurrentMap<String,Subprocedure> subprocs =
52        new MapMaker().concurrencyLevel(4).weakValues().makeMap();
53    private final ExecutorService pool;
54  
55    /**
56     * Instantiate a new ProcedureMember.  This is a slave that executes subprocedures.
57     *
58     * @param rpcs controller used to send notifications to the procedure coordinator
59     * @param pool thread pool to submit subprocedures
60     * @param factory class that creates instances of a subprocedure.
61     */
62    public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
63        SubprocedureFactory factory) {
64      this.pool = pool;
65      this.rpcs = rpcs;
66      this.builder = factory;
67    }
68  
69    /**
70     * Default thread pool for the procedure
71     *
72     * @param memberName
73     * @param procThreads the maximum number of threads to allow in the pool
74     */
75    public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
76      return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
77    }
78  
79    /**
80     * Default thread pool for the procedure
81     *
82     * @param memberName
83     * @param procThreads the maximum number of threads to allow in the pool
84     * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
85     */
86    public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
87        long keepAliveMillis) {
88      return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
89          new SynchronousQueue<Runnable>(),
90          new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
91    }
92  
93    /**
94     * Package exposed.  Not for public use.
95     *
96     * @return reference to the Procedure member's rpcs object
97     */
98    ProcedureMemberRpcs getRpcs() {
99      return rpcs;
100   }
101 
102 
103   /**
104    * This is separated from execution so that we can detect and handle the case where the
105    * subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
106    * sent here)
107    * @param opName
108    * @param data
109    * @return subprocedure
110    */
111   public Subprocedure createSubprocedure(String opName, byte[] data) {
112     return builder.buildSubprocedure(opName, data);
113   }
114 
115   /**
116    * Submit an subprocedure for execution.  This starts the local acquire phase.
117    * @param subproc the subprocedure to execute.
118    * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
119    *         could not be started. In the latter case, the subprocedure holds a reference to
120    *         the exception that caused the failure.
121    */
122   public boolean submitSubprocedure(Subprocedure subproc) {
123      // if the submitted subprocedure was null, bail.
124     if (subproc == null) {
125       LOG.warn("Submitted null subprocedure, nothing to run here.");
126       return false;
127     }
128 
129     String procName = subproc.getName();
130     if (procName == null || procName.length() == 0) {
131       LOG.error("Subproc name cannot be null or the empty string");
132       return false;
133     }
134 
135     // make sure we aren't already running an subprocedure of that name
136     Subprocedure rsub = subprocs.get(procName);
137     if (rsub != null) {
138       if (!rsub.isComplete()) {
139         LOG.error("Subproc '" + procName + "' is already running. Bailing out");
140         return false;
141       }
142       LOG.warn("A completed old subproc "  +  procName + " is still present, removing");
143       if (!subprocs.remove(procName, rsub)) {
144         LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
145         return false;
146       }
147     }
148 
149     LOG.debug("Submitting new Subprocedure:" + procName);
150 
151     // kick off the subprocedure
152     try {
153       if (subprocs.putIfAbsent(procName, subproc) == null) {
154         this.pool.submit(subproc);
155         return true;
156       } else {
157         LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
158         return false;
159       }
160     } catch (RejectedExecutionException e) {
161       subprocs.remove(procName, subproc);
162 
163       // the thread pool is full and we can't run the subprocedure
164       String msg = "Subprocedure pool is full!";
165       subproc.cancel(msg, e.getCause());
166     }
167 
168     LOG.error("Failed to start subprocedure '" + procName + "'");
169     return false;
170   }
171 
172    /**
173     * Notification that procedure coordinator has reached the global barrier
174     * @param procName name of the subprocedure that should start running the in-barrier phase
175     */
176    public void receivedReachedGlobalBarrier(String procName) {
177      Subprocedure subproc = subprocs.get(procName);
178      if (subproc == null) {
179        LOG.warn("Unexpected reached glabal barrier message for Sub-Procedure '" + procName + "'");
180        return;
181      }
182      subproc.receiveReachedGlobalBarrier();
183    }
184 
185   /**
186    * Best effort attempt to close the threadpool via Thread.interrupt.
187    */
188   @Override
189   public void close() throws IOException {
190     // have to use shutdown now to break any latch waiting
191     pool.shutdownNow();
192   }
193 
194   /**
195    * Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
196    * @param timeoutMs timeout limit in millis
197    * @return true if successfully, false if bailed due to timeout.
198    * @throws InterruptedException
199    */
200   boolean closeAndWait(long timeoutMs) throws InterruptedException {
201     pool.shutdown();
202     return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
203   }
204 
205   /**
206    * The connection to the rest of the procedure group (member and coordinator) has been
207    * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
208    * other members since we cannot reach them anymore.
209    * @param message description of the error
210    * @param cause the actual cause of the failure
211    * @param procName the name of the procedure we'd cancel due to the error.
212    */
213   public void controllerConnectionFailure(final String message, final Throwable cause,
214       final String procName) {
215     LOG.error(message, cause);
216     if (procName == null) {
217       return;
218     }
219     Subprocedure toNotify = subprocs.get(procName);
220     if (toNotify != null) {
221       toNotify.cancel(message, cause);
222     }
223   }
224 
225   /**
226    * Send abort to the specified procedure
227    * @param procName name of the procedure to about
228    * @param ee exception information about the abort
229    */
230   public void receiveAbortProcedure(String procName, ForeignException ee) {
231     LOG.debug("Request received to abort procedure " + procName, ee);
232     // if we know about the procedure, notify it
233     Subprocedure sub = subprocs.get(procName);
234     if (sub == null) {
235       LOG.info("Received abort on procedure with no local subprocedure " + procName +
236           ", ignoring it.", ee);
237       return; // Procedure has already completed
238     }
239     String msg = "Propagating foreign exception to subprocedure " + sub.getName();
240     LOG.error(msg, ee);
241     sub.cancel(msg, ee);
242   }
243 }