001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.RejectedExecutionException;
028import java.util.concurrent.SynchronousQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import org.apache.hadoop.hbase.DaemonThreadFactory;
036import org.apache.hadoop.hbase.errorhandling.ForeignException;
037import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
038
039import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
040
041/**
042 * This is the master side of a distributed complex procedure execution.
043 * <p>
044 * The {@link Procedure} is generic and subclassing or customization shouldn't be
045 * necessary -- any customization should happen just in {@link Subprocedure}s.
046 */
047@InterfaceAudience.Private
048public class ProcedureCoordinator {
049  private static final Logger LOG = LoggerFactory.getLogger(ProcedureCoordinator.class);
050
051  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
052  final static long TIMEOUT_MILLIS_DEFAULT = 60000;
053  final static long WAKE_MILLIS_DEFAULT = 500;
054
055  private final ProcedureCoordinatorRpcs rpcs;
056  private final ExecutorService pool;
057  private final long wakeTimeMillis;
058  private final long timeoutMillis;
059
060  // Running procedure table.  Maps procedure name to running procedure reference
061  private final ConcurrentMap<String, Procedure> procedures =
062      new MapMaker().concurrencyLevel(4).weakValues().makeMap();
063
064  /**
065   * Create and start a ProcedureCoordinator.
066   *
067   * The rpc object registers the ProcedureCoordinator and starts any threads in this
068   * constructor.
069   *
070   * @param rpcs
071   * @param pool Used for executing procedures.
072   */
073  public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
074    this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
075  }
076
077  /**
078   * Create and start a ProcedureCoordinator.
079   *
080   * The rpc object registers the ProcedureCoordinator and starts any threads in
081   * this constructor.
082   *
083   * @param rpcs
084   * @param pool Used for executing procedures.
085   * @param timeoutMillis
086   */
087  public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
088      long timeoutMillis, long wakeTimeMillis) {
089    this.timeoutMillis = timeoutMillis;
090    this.wakeTimeMillis = wakeTimeMillis;
091    this.rpcs = rpcs;
092    this.pool = pool;
093    this.rpcs.start(this);
094  }
095
096  /**
097   * Default thread pool for the procedure
098   *
099   * @param coordName
100   * @param opThreads the maximum number of threads to allow in the pool
101   */
102  public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
103    return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
104  }
105
106  /**
107   * Default thread pool for the procedure
108   *
109   * @param coordName
110   * @param opThreads the maximum number of threads to allow in the pool
111   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
112   */
113  public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
114      long keepAliveMillis) {
115    return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
116        new SynchronousQueue<>(),
117        new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
118  }
119
120  /**
121   * Shutdown the thread pools and release rpc resources
122   * @throws IOException
123   */
124  public void close() throws IOException {
125    // have to use shutdown now to break any latch waiting
126    pool.shutdownNow();
127    rpcs.close();
128  }
129
130  /**
131   * Submit an procedure to kick off its dependent subprocedures.
132   * @param proc Procedure to execute
133   * @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the
134   *         procedure or any subprocedures could not be started.  Failure could be due to
135   *         submitting a procedure multiple times (or one with the same name), or some sort
136   *         of IO problem.  On errors, the procedure's monitor holds a reference to the exception
137   *         that caused the failure.
138   */
139  boolean submitProcedure(Procedure proc) {
140    // if the submitted procedure was null, then we don't want to run it
141    if (proc == null) {
142      return false;
143    }
144    String procName = proc.getName();
145
146    // make sure we aren't already running a procedure of that name
147    Procedure oldProc = procedures.get(procName);
148    if (oldProc != null) {
149      // procedures are always eventually completed on both successful and failed execution
150      try {
151        if (!oldProc.isCompleted()) {
152          LOG.warn("Procedure " + procName + " currently running.  Rejecting new request");
153          return false;
154        } else {
155          LOG.debug("Procedure " + procName
156              + " was in running list but was completed.  Accepting new attempt.");
157          if (!procedures.remove(procName, oldProc)) {
158            LOG.warn("Procedure " + procName
159                + " has been resubmitted by another thread. Rejecting this request.");
160            return false;
161          }
162        }
163      } catch (ForeignException e) {
164        LOG.debug("Procedure " + procName
165            + " was in running list but has exception.  Accepting new attempt.");
166        if (!procedures.remove(procName, oldProc)) {
167          LOG.warn("Procedure " + procName
168              + " has been resubmitted by another thread. Rejecting this request.");
169          return false;
170        }
171      }
172    }
173
174    // kick off the procedure's execution in a separate thread
175    try {
176      if (this.procedures.putIfAbsent(procName, proc) == null) {
177        LOG.debug("Submitting procedure " + procName);
178        this.pool.submit(proc);
179        return true;
180      } else {
181        LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt.");
182        return false;
183      }
184    } catch (RejectedExecutionException e) {
185      LOG.warn("Procedure " + procName + " rejected by execution pool.  Propagating error.", e);
186      // Remove the procedure from the list since is not started
187      this.procedures.remove(procName, proc);
188      // the thread pool is full and we can't run the procedure
189      proc.receive(new ForeignException(procName, e));
190    }
191    return false;
192  }
193
194  /**
195   * The connection to the rest of the procedure group (members and coordinator) has been
196   * broken/lost/failed. This should fail any interested procedures, but not attempt to notify other
197   * members since we cannot reach them anymore.
198   * @param message description of the error
199   * @param cause the actual cause of the failure
200   */
201  void rpcConnectionFailure(final String message, final IOException cause) {
202    Collection<Procedure> toNotify = procedures.values();
203
204    boolean isTraceEnabled = LOG.isTraceEnabled();
205    LOG.debug("received connection failure: " + message, cause);
206    for (Procedure proc : toNotify) {
207      if (proc == null) {
208        continue;
209      }
210      // notify the elements, if they aren't null
211      if (isTraceEnabled) {
212        LOG.trace("connection failure - notify procedure: " + proc.getName());
213      }
214      proc.receive(new ForeignException(proc.getName(), cause));
215    }
216  }
217
218  /**
219   * Abort the procedure with the given name
220   * @param procName name of the procedure to abort
221   * @param reason serialized information about the abort
222   */
223  public void abortProcedure(String procName, ForeignException reason) {
224    LOG.debug("abort procedure " + procName, reason);
225    // if we know about the Procedure, notify it
226    Procedure proc = procedures.get(procName);
227    if (proc == null) {
228      return;
229    }
230    proc.receive(reason);
231  }
232
233  /**
234   * Exposed for hooking with unit tests.
235   * @param procName
236   * @param procArgs
237   * @param expectedMembers
238   * @return the newly created procedure
239   */
240  Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
241      List<String> expectedMembers) {
242    // build the procedure
243    return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
244        procName, procArgs, expectedMembers);
245  }
246
247  /**
248   * Kick off the named procedure
249   * Currently only one procedure with the same type and name is allowed to run at a time.
250   * @param procName name of the procedure to start
251   * @param procArgs arguments for the procedure
252   * @param expectedMembers expected members to start
253   * @return handle to the running procedure, if it was started correctly,
254   *         <tt>null</tt> otherwise.
255   *         Null could be due to submitting a procedure multiple times
256   *         (or one with the same name), or runtime exception.
257   *         Check the procedure's monitor that holds a reference to the exception
258   *         that caused the failure.
259   */
260  public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
261      List<String> expectedMembers) {
262    Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
263    if (!this.submitProcedure(proc)) {
264      LOG.error("Failed to submit procedure '" + procName + "'");
265      return null;
266    }
267    return proc;
268  }
269
270  /**
271   * Notification that the procedure had the specified member acquired its part of the barrier
272   * via {@link Subprocedure#acquireBarrier()}.
273   * @param procName name of the procedure that acquired
274   * @param member name of the member that acquired
275   */
276  void memberAcquiredBarrier(String procName, final String member) {
277    Procedure proc = procedures.get(procName);
278    if (proc == null) {
279      LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'");
280      return;
281    }
282    if (LOG.isTraceEnabled()) {
283      LOG.trace("Member '"+ member +"' acquired procedure '"+ procName +"'");
284    }
285    proc.barrierAcquiredByMember(member);
286  }
287
288  /**
289   * Notification that the procedure had another member finished executing its in-barrier subproc
290   * via {@link Subprocedure#insideBarrier()}.
291   * @param procName name of the subprocedure that finished
292   * @param member name of the member that executed and released its barrier
293   * @param dataFromMember the data that the member returned along with the notification
294   */
295  void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) {
296    Procedure proc = procedures.get(procName);
297    if (proc == null) {
298      LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'");
299      return;
300    }
301    if (LOG.isTraceEnabled()) {
302      LOG.trace("Member '"+ member +"' released procedure '"+ procName +"'");
303    }
304    proc.barrierReleasedByMember(member, dataFromMember);
305  }
306
307  /**
308   * @return the rpcs implementation for all current procedures
309   */
310  ProcedureCoordinatorRpcs getRpcs() {
311    return rpcs;
312  }
313
314  /**
315   * Returns the procedure.  This Procedure is a live instance so should not be modified but can
316   * be inspected.
317   * @param name Name of the procedure
318   * @return Procedure or null if not present any more
319   */
320  public Procedure getProcedure(String name) {
321    return procedures.get(name);
322  }
323
324  /**
325   * @return Return set of all procedure names.
326   */
327  public Set<String> getProcedureNames() {
328    return new HashSet<>(procedures.keySet());
329  }
330}