001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025import java.util.concurrent.ConcurrentMap;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.RejectedExecutionException;
028import java.util.concurrent.SynchronousQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031import org.apache.hadoop.hbase.errorhandling.ForeignException;
032import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
033import org.apache.hadoop.hbase.util.Threads;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
040
041/**
042 * This is the master side of a distributed complex procedure execution.
043 * <p>
044 * The {@link Procedure} is generic and subclassing or customization shouldn't be necessary -- any
045 * customization should happen just in {@link Subprocedure}s.
046 */
047@InterfaceAudience.Private
048public class ProcedureCoordinator {
049  private static final Logger LOG = LoggerFactory.getLogger(ProcedureCoordinator.class);
050
051  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
052  final static long TIMEOUT_MILLIS_DEFAULT = 60000;
053  final static long WAKE_MILLIS_DEFAULT = 500;
054
055  private final ProcedureCoordinatorRpcs rpcs;
056  private final ExecutorService pool;
057  private final long wakeTimeMillis;
058  private final long timeoutMillis;
059
060  // Running procedure table. Maps procedure name to running procedure reference
061  private final ConcurrentMap<String, Procedure> procedures =
062    new MapMaker().concurrencyLevel(4).weakValues().makeMap();
063
064  /**
065   * Create and start a ProcedureCoordinator. The rpc object registers the ProcedureCoordinator and
066   * starts any threads in this constructor.
067   * @param pool Used for executing procedures.
068   */
069  // Only used in tests. SimpleMasterProcedureManager is a test class.
070  public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
071    this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
072  }
073
074  /**
075   * Create and start a ProcedureCoordinator. The rpc object registers the ProcedureCoordinator and
076   * starts any threads in this constructor.
077   * @param pool Used for executing procedures.
078   */
079  public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
080    long timeoutMillis, long wakeTimeMillis) {
081    this.timeoutMillis = timeoutMillis;
082    this.wakeTimeMillis = wakeTimeMillis;
083    this.rpcs = rpcs;
084    this.pool = pool;
085    this.rpcs.start(this);
086  }
087
088  /**
089   * Default thread pool for the procedure
090   * @param opThreads the maximum number of threads to allow in the pool
091   */
092  public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
093    return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
094  }
095
096  /**
097   * Default thread pool for the procedure
098   * @param opThreads       the maximum number of threads to allow in the pool
099   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
100   */
101  public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
102    long keepAliveMillis) {
103    return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
104      new SynchronousQueue<>(),
105      new ThreadFactoryBuilder().setNameFormat("(" + coordName + ")-proc-coordinator-pool-%d")
106        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
107  }
108
109  /**
110   * Shutdown the thread pools and release rpc resources
111   */
112  public void close() throws IOException {
113    // have to use shutdown now to break any latch waiting
114    pool.shutdownNow();
115    rpcs.close();
116  }
117
118  /**
119   * Submit an procedure to kick off its dependent subprocedures.
120   * @param proc Procedure to execute
121   * @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the procedure
122   *         or any subprocedures could not be started. Failure could be due to submitting a
123   *         procedure multiple times (or one with the same name), or some sort of IO problem. On
124   *         errors, the procedure's monitor holds a reference to the exception that caused the
125   *         failure.
126   */
127  @SuppressWarnings("FutureReturnValueIgnored")
128  boolean submitProcedure(Procedure proc) {
129    // if the submitted procedure was null, then we don't want to run it
130    if (proc == null) {
131      return false;
132    }
133    String procName = proc.getName();
134
135    // make sure we aren't already running a procedure of that name
136    Procedure oldProc = procedures.get(procName);
137    if (oldProc != null) {
138      // procedures are always eventually completed on both successful and failed execution
139      try {
140        if (!oldProc.isCompleted()) {
141          LOG.warn("Procedure " + procName + " currently running.  Rejecting new request");
142          return false;
143        } else {
144          LOG.debug("Procedure " + procName
145            + " was in running list but was completed.  Accepting new attempt.");
146          if (!procedures.remove(procName, oldProc)) {
147            LOG.warn("Procedure " + procName
148              + " has been resubmitted by another thread. Rejecting this request.");
149            return false;
150          }
151        }
152      } catch (ForeignException e) {
153        LOG.debug("Procedure " + procName
154          + " was in running list but has exception.  Accepting new attempt.");
155        if (!procedures.remove(procName, oldProc)) {
156          LOG.warn("Procedure " + procName
157            + " has been resubmitted by another thread. Rejecting this request.");
158          return false;
159        }
160      }
161    }
162
163    // kick off the procedure's execution in a separate thread
164    try {
165      if (this.procedures.putIfAbsent(procName, proc) == null) {
166        LOG.debug("Submitting procedure " + procName);
167        this.pool.submit(proc);
168        return true;
169      } else {
170        LOG.error(
171          "Another thread has submitted procedure '" + procName + "'. Ignoring this attempt.");
172        return false;
173      }
174    } catch (RejectedExecutionException e) {
175      LOG.warn("Procedure " + procName + " rejected by execution pool.  Propagating error.", e);
176      // Remove the procedure from the list since is not started
177      this.procedures.remove(procName, proc);
178      // the thread pool is full and we can't run the procedure
179      proc.receive(new ForeignException(procName, e));
180    }
181    return false;
182  }
183
184  /**
185   * The connection to the rest of the procedure group (members and coordinator) has been
186   * broken/lost/failed. This should fail any interested procedures, but not attempt to notify other
187   * members since we cannot reach them anymore.
188   * @param message description of the error
189   * @param cause   the actual cause of the failure
190   */
191  void rpcConnectionFailure(final String message, final IOException cause) {
192    Collection<Procedure> toNotify = procedures.values();
193
194    boolean isTraceEnabled = LOG.isTraceEnabled();
195    LOG.debug("received connection failure: " + message, cause);
196    for (Procedure proc : toNotify) {
197      if (proc == null) {
198        continue;
199      }
200      // notify the elements, if they aren't null
201      if (isTraceEnabled) {
202        LOG.trace("connection failure - notify procedure: " + proc.getName());
203      }
204      proc.receive(new ForeignException(proc.getName(), cause));
205    }
206  }
207
208  /**
209   * Abort the procedure with the given name
210   * @param procName name of the procedure to abort
211   * @param reason   serialized information about the abort
212   */
213  public void abortProcedure(String procName, ForeignException reason) {
214    LOG.debug("abort procedure " + procName, reason);
215    // if we know about the Procedure, notify it
216    Procedure proc = procedures.get(procName);
217    if (proc == null) {
218      return;
219    }
220    proc.receive(reason);
221  }
222
223  /**
224   * Exposed for hooking with unit tests.
225   * @return the newly created procedure
226   */
227  Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
228    List<String> expectedMembers) {
229    // build the procedure
230    return new Procedure(this, fed, wakeTimeMillis, timeoutMillis, procName, procArgs,
231      expectedMembers);
232  }
233
234  /**
235   * Kick off the named procedure Currently only one procedure with the same type and name is
236   * allowed to run at a time.
237   * @param procName        name of the procedure to start
238   * @param procArgs        arguments for the procedure
239   * @param expectedMembers expected members to start
240   * @return handle to the running procedure, if it was started correctly, <tt>null</tt> otherwise.
241   *         Null could be due to submitting a procedure multiple times (or one with the same name),
242   *         or runtime exception. Check the procedure's monitor that holds a reference to the
243   *         exception that caused the failure.
244   */
245  public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
246    List<String> expectedMembers) {
247    Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers);
248    if (!this.submitProcedure(proc)) {
249      LOG.error("Failed to submit procedure '" + procName + "'");
250      return null;
251    }
252    return proc;
253  }
254
255  /**
256   * Notification that the procedure had the specified member acquired its part of the barrier via
257   * {@link Subprocedure#acquireBarrier()}.
258   * @param procName name of the procedure that acquired
259   * @param member   name of the member that acquired
260   */
261  void memberAcquiredBarrier(String procName, final String member) {
262    Procedure proc = procedures.get(procName);
263    if (proc == null) {
264      LOG.warn(
265        "Member '" + member + "' is trying to acquire an unknown procedure '" + procName + "'");
266      return;
267    }
268    if (LOG.isTraceEnabled()) {
269      LOG.trace("Member '" + member + "' acquired procedure '" + procName + "'");
270    }
271    proc.barrierAcquiredByMember(member);
272  }
273
274  /**
275   * Notification that the procedure had another member finished executing its in-barrier subproc
276   * via {@link Subprocedure#insideBarrier()}.
277   * @param procName       name of the subprocedure that finished
278   * @param member         name of the member that executed and released its barrier
279   * @param dataFromMember the data that the member returned along with the notification
280   */
281  void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) {
282    Procedure proc = procedures.get(procName);
283    if (proc == null) {
284      LOG.warn(
285        "Member '" + member + "' is trying to release an unknown procedure '" + procName + "'");
286      return;
287    }
288    if (LOG.isTraceEnabled()) {
289      LOG.trace("Member '" + member + "' released procedure '" + procName + "'");
290    }
291    proc.barrierReleasedByMember(member, dataFromMember);
292  }
293
294  /** Returns the rpcs implementation for all current procedures */
295  ProcedureCoordinatorRpcs getRpcs() {
296    return rpcs;
297  }
298
299  /**
300   * Returns the procedure. This Procedure is a live instance so should not be modified but can be
301   * inspected.
302   * @param name Name of the procedure
303   * @return Procedure or null if not present any more
304   */
305  public Procedure getProcedure(String name) {
306    return procedures.get(name);
307  }
308
309  /** Returns Return set of all procedure names. */
310  public Set<String> getProcedureNames() {
311    return new HashSet<>(procedures.keySet());
312  }
313}