001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.RejectedExecutionException;
028import java.util.concurrent.SynchronousQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036import org.apache.hadoop.hbase.DaemonThreadFactory;
037import org.apache.hadoop.hbase.errorhandling.ForeignException;
038import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
039
040import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
041
042/**
043 * This is the master side of a distributed complex procedure execution.
044 * <p>
045 * The {@link Procedure} is generic and subclassing or customization shouldn't be
046 * necessary -- any customization should happen just in {@link Subprocedure}s.
047 */
048@InterfaceAudience.Private
049public class ProcedureCoordinator {
050  private static final Logger LOG = LoggerFactory.getLogger(ProcedureCoordinator.class);
051
052  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
053  final static long TIMEOUT_MILLIS_DEFAULT = 60000;
054  final static long WAKE_MILLIS_DEFAULT = 500;
055
056  private final ProcedureCoordinatorRpcs rpcs;
057  private final ExecutorService pool;
058  private final long wakeTimeMillis;
059  private final long timeoutMillis;
060
061  // Running procedure table.  Maps procedure name to running procedure reference
062  private final ConcurrentMap<String, Procedure> procedures =
063      new MapMaker().concurrencyLevel(4).weakValues().makeMap();
064
065  /**
066   * Create and start a ProcedureCoordinator.
067   *
068   * The rpc object registers the ProcedureCoordinator and starts any threads in this
069   * constructor.
070   *
071   * @param pool Used for executing procedures.
072   */
073  @VisibleForTesting // Only used in tests. SimpleMasterProcedureManager is a test class.
074  public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
075    this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
076  }
077
078  /**
079   * Create and start a ProcedureCoordinator.
080   *
081   * The rpc object registers the ProcedureCoordinator and starts any threads in
082   * this constructor.
083   *
084   * @param pool Used for executing procedures.
085   */
086  public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
087      long timeoutMillis, long wakeTimeMillis) {
088    this.timeoutMillis = timeoutMillis;
089    this.wakeTimeMillis = wakeTimeMillis;
090    this.rpcs = rpcs;
091    this.pool = pool;
092    this.rpcs.start(this);
093  }
094
095  /**
096   * Default thread pool for the procedure
097   *
098   * @param coordName
099   * @param opThreads the maximum number of threads to allow in the pool
100   */
101  public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
102    return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
103  }
104
105  /**
106   * Default thread pool for the procedure
107   *
108   * @param coordName
109   * @param opThreads the maximum number of threads to allow in the pool
110   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
111   */
112  public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
113      long keepAliveMillis) {
114    return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
115        new SynchronousQueue<>(),
116        new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
117  }
118
119  /**
120   * Shutdown the thread pools and release rpc resources
121   * @throws IOException
122   */
123  public void close() throws IOException {
124    // have to use shutdown now to break any latch waiting
125    pool.shutdownNow();
126    rpcs.close();
127  }
128
129  /**
130   * Submit an procedure to kick off its dependent subprocedures.
131   * @param proc Procedure to execute
132   * @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the
133   *         procedure or any subprocedures could not be started.  Failure could be due to
134   *         submitting a procedure multiple times (or one with the same name), or some sort
135   *         of IO problem.  On errors, the procedure's monitor holds a reference to the exception
136   *         that caused the failure.
137   */
138  boolean submitProcedure(Procedure proc) {
139    // if the submitted procedure was null, then we don't want to run it
140    if (proc == null) {
141      return false;
142    }
143    String procName = proc.getName();
144
145    // make sure we aren't already running a procedure of that name
146    Procedure oldProc = procedures.get(procName);
147    if (oldProc != null) {
148      // procedures are always eventually completed on both successful and failed execution
149      try {
150        if (!oldProc.isCompleted()) {
151          LOG.warn("Procedure " + procName + " currently running.  Rejecting new request");
152          return false;
153        } else {
154          LOG.debug("Procedure " + procName
155              + " was in running list but was completed.  Accepting new attempt.");
156          if (!procedures.remove(procName, oldProc)) {
157            LOG.warn("Procedure " + procName
158                + " has been resubmitted by another thread. Rejecting this request.");
159            return false;
160          }
161        }
162      } catch (ForeignException e) {
163        LOG.debug("Procedure " + procName
164            + " was in running list but has exception.  Accepting new attempt.");
165        if (!procedures.remove(procName, oldProc)) {
166          LOG.warn("Procedure " + procName
167              + " has been resubmitted by another thread. Rejecting this request.");
168          return false;
169        }
170      }
171    }
172
173    // kick off the procedure's execution in a separate thread
174    try {
175      if (this.procedures.putIfAbsent(procName, proc) == null) {
176        LOG.debug("Submitting procedure " + procName);
177        this.pool.submit(proc);
178        return true;
179      } else {
180        LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt.");
181        return false;
182      }
183    } catch (RejectedExecutionException e) {
184      LOG.warn("Procedure " + procName + " rejected by execution pool.  Propagating error.", e);
185      // Remove the procedure from the list since is not started
186      this.procedures.remove(procName, proc);
187      // the thread pool is full and we can't run the procedure
188      proc.receive(new ForeignException(procName, e));
189    }
190    return false;
191  }
192
193  /**
194   * The connection to the rest of the procedure group (members and coordinator) has been
195   * broken/lost/failed. This should fail any interested procedures, but not attempt to notify other
196   * members since we cannot reach them anymore.
197   * @param message description of the error
198   * @param cause the actual cause of the failure
199   */
200  void rpcConnectionFailure(final String message, final IOException cause) {
201    Collection<Procedure> toNotify = procedures.values();
202
203    boolean isTraceEnabled = LOG.isTraceEnabled();
204    LOG.debug("received connection failure: " + message, cause);
205    for (Procedure proc : toNotify) {
206      if (proc == null) {
207        continue;
208      }
209      // notify the elements, if they aren't null
210      if (isTraceEnabled) {
211        LOG.trace("connection failure - notify procedure: " + proc.getName());
212      }
213      proc.receive(new ForeignException(proc.getName(), cause));
214    }
215  }
216
217  /**
218   * Abort the procedure with the given name
219   * @param procName name of the procedure to abort
220   * @param reason serialized information about the abort
221   */
222  public void abortProcedure(String procName, ForeignException reason) {
223    LOG.debug("abort procedure " + procName, reason);
224    // if we know about the Procedure, notify it
225    Procedure proc = procedures.get(procName);
226    if (proc == null) {
227      return;
228    }
229    proc.receive(reason);
230  }
231
232  /**
233   * Exposed for hooking with unit tests.
234   * @param procName
235   * @param procArgs
236   * @param expectedMembers
237   * @return the newly created procedure
238   */
239  Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
240      List<String> expectedMembers) {
241    // build the procedure
242    return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
243        procName, procArgs, expectedMembers);
244  }
245
246  /**
247   * Kick off the named procedure
248   * Currently only one procedure with the same type and name is allowed to run at a time.
249   * @param procName name of the procedure to start
250   * @param procArgs arguments for the procedure
251   * @param expectedMembers expected members to start
252   * @return handle to the running procedure, if it was started correctly,
253   *         <tt>null</tt> otherwise.
254   *         Null could be due to submitting a procedure multiple times
255   *         (or one with the same name), or runtime exception.
256   *         Check the procedure's monitor that holds a reference to the exception
257   *         that caused the failure.
258   */
259  public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
260      List<String> expectedMembers) {
261    Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
262    if (!this.submitProcedure(proc)) {
263      LOG.error("Failed to submit procedure '" + procName + "'");
264      return null;
265    }
266    return proc;
267  }
268
269  /**
270   * Notification that the procedure had the specified member acquired its part of the barrier
271   * via {@link Subprocedure#acquireBarrier()}.
272   * @param procName name of the procedure that acquired
273   * @param member name of the member that acquired
274   */
275  void memberAcquiredBarrier(String procName, final String member) {
276    Procedure proc = procedures.get(procName);
277    if (proc == null) {
278      LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'");
279      return;
280    }
281    if (LOG.isTraceEnabled()) {
282      LOG.trace("Member '"+ member +"' acquired procedure '"+ procName +"'");
283    }
284    proc.barrierAcquiredByMember(member);
285  }
286
287  /**
288   * Notification that the procedure had another member finished executing its in-barrier subproc
289   * via {@link Subprocedure#insideBarrier()}.
290   * @param procName name of the subprocedure that finished
291   * @param member name of the member that executed and released its barrier
292   * @param dataFromMember the data that the member returned along with the notification
293   */
294  void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) {
295    Procedure proc = procedures.get(procName);
296    if (proc == null) {
297      LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'");
298      return;
299    }
300    if (LOG.isTraceEnabled()) {
301      LOG.trace("Member '"+ member +"' released procedure '"+ procName +"'");
302    }
303    proc.barrierReleasedByMember(member, dataFromMember);
304  }
305
306  /**
307   * @return the rpcs implementation for all current procedures
308   */
309  ProcedureCoordinatorRpcs getRpcs() {
310    return rpcs;
311  }
312
313  /**
314   * Returns the procedure.  This Procedure is a live instance so should not be modified but can
315   * be inspected.
316   * @param name Name of the procedure
317   * @return Procedure or null if not present any more
318   */
319  public Procedure getProcedure(String name) {
320    return procedures.get(name);
321  }
322
323  /**
324   * @return Return set of all procedure names.
325   */
326  public Set<String> getProcedureNames() {
327    return new HashSet<>(procedures.keySet());
328  }
329}