001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.concurrent.ConcurrentMap;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.RejectedExecutionException;
025import java.util.concurrent.SynchronousQueue;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028import org.apache.hadoop.hbase.errorhandling.ForeignException;
029import org.apache.hadoop.hbase.util.Threads;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
035import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
036
037/**
038 * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
039 * specialized part of a {@link Procedure} that actually does procedure type-specific work and
040 * reports back to the coordinator as it completes each phase.
041 */
042@InterfaceAudience.Private
043public class ProcedureMember implements Closeable {
044  private static final Logger LOG = LoggerFactory.getLogger(ProcedureMember.class);
045
046  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
047
048  private final SubprocedureFactory builder;
049  private final ProcedureMemberRpcs rpcs;
050
051  private final ConcurrentMap<String, Subprocedure> subprocs =
052    new MapMaker().concurrencyLevel(4).weakValues().makeMap();
053  private final ExecutorService pool;
054
055  /**
056   * Instantiate a new ProcedureMember. This is a slave that executes subprocedures.
057   * @param rpcs    controller used to send notifications to the procedure coordinator
058   * @param pool    thread pool to submit subprocedures
059   * @param factory class that creates instances of a subprocedure.
060   */
061  public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
062    SubprocedureFactory factory) {
063    this.pool = pool;
064    this.rpcs = rpcs;
065    this.builder = factory;
066  }
067
068  /**
069   * Default thread pool for the procedure
070   * @param procThreads the maximum number of threads to allow in the pool
071   */
072  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
073    return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
074  }
075
076  /**
077   * Default thread pool for the procedure
078   * @param procThreads     the maximum number of threads to allow in the pool
079   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
080   */
081  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
082    long keepAliveMillis) {
083    return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
084      new SynchronousQueue<>(),
085      new ThreadFactoryBuilder().setNameFormat("member: '" + memberName + "' subprocedure-pool-%d")
086        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
087  }
088
089  /**
090   * Package exposed. Not for public use.
091   * @return reference to the Procedure member's rpcs object
092   */
093  ProcedureMemberRpcs getRpcs() {
094    return rpcs;
095  }
096
097  /**
098   * This is separated from execution so that we can detect and handle the case where the
099   * subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
100   * sent here)
101   */
102  public Subprocedure createSubprocedure(String opName, byte[] data) {
103    return builder.buildSubprocedure(opName, data);
104  }
105
106  /**
107   * Submit an subprocedure for execution. This starts the local acquire phase.
108   * @param subproc the subprocedure to execute.
109   * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it could not
110   *         be started. In the latter case, the subprocedure holds a reference to the exception
111   *         that caused the failure.
112   */
113  @SuppressWarnings("FutureReturnValueIgnored")
114  public boolean submitSubprocedure(Subprocedure subproc) {
115    // if the submitted subprocedure was null, bail.
116    if (subproc == null) {
117      LOG.warn("Submitted null subprocedure, nothing to run here.");
118      return false;
119    }
120
121    String procName = subproc.getName();
122    if (procName == null || procName.length() == 0) {
123      LOG.error("Subproc name cannot be null or the empty string");
124      return false;
125    }
126
127    // make sure we aren't already running an subprocedure of that name
128    Subprocedure rsub = subprocs.get(procName);
129    if (rsub != null) {
130      if (!rsub.isComplete()) {
131        LOG.error("Subproc '" + procName + "' is already running. Bailing out");
132        return false;
133      }
134      LOG.warn("A completed old subproc " + procName + " is still present, removing");
135      if (!subprocs.remove(procName, rsub)) {
136        LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
137        return false;
138      }
139    }
140
141    LOG.debug("Submitting new Subprocedure:" + procName);
142
143    // kick off the subprocedure
144    try {
145      if (subprocs.putIfAbsent(procName, subproc) == null) {
146        this.pool.submit(subproc);
147        return true;
148      } else {
149        LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
150        return false;
151      }
152    } catch (RejectedExecutionException e) {
153      subprocs.remove(procName, subproc);
154
155      // the thread pool is full and we can't run the subprocedure
156      String msg = "Subprocedure pool is full!";
157      subproc.cancel(msg, e.getCause());
158    }
159
160    LOG.error("Failed to start subprocedure '" + procName + "'");
161    return false;
162  }
163
164  /**
165   * Notification that procedure coordinator has reached the global barrier
166   * @param procName name of the subprocedure that should start running the in-barrier phase
167   */
168  public void receivedReachedGlobalBarrier(String procName) {
169    Subprocedure subproc = subprocs.get(procName);
170    if (subproc == null) {
171      LOG.warn("Unexpected reached globa barrier message for Sub-Procedure '" + procName + "'");
172      return;
173    }
174    if (LOG.isTraceEnabled()) {
175      LOG.trace("reached global barrier message for Sub-Procedure '" + procName + "'");
176    }
177    subproc.receiveReachedGlobalBarrier();
178  }
179
180  /**
181   * Best effort attempt to close the threadpool via Thread.interrupt.
182   */
183  @Override
184  public void close() throws IOException {
185    // have to use shutdown now to break any latch waiting
186    pool.shutdownNow();
187  }
188
189  /**
190   * Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
191   * @param timeoutMs timeout limit in millis
192   * @return true if successfully, false if bailed due to timeout.
193   */
194  boolean closeAndWait(long timeoutMs) throws InterruptedException {
195    pool.shutdown();
196    return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
197  }
198
199  /**
200   * The connection to the rest of the procedure group (member and coordinator) has been
201   * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
202   * other members since we cannot reach them anymore.
203   * @param message  description of the error
204   * @param cause    the actual cause of the failure
205   * @param procName the name of the procedure we'd cancel due to the error.
206   */
207  public void controllerConnectionFailure(final String message, final Throwable cause,
208    final String procName) {
209    LOG.error(message, cause);
210    if (procName == null) {
211      return;
212    }
213    Subprocedure toNotify = subprocs.get(procName);
214    if (toNotify != null) {
215      toNotify.cancel(message, cause);
216    }
217  }
218
219  /**
220   * Send abort to the specified procedure
221   * @param procName name of the procedure to about
222   * @param ee       exception information about the abort
223   */
224  public void receiveAbortProcedure(String procName, ForeignException ee) {
225    LOG.debug("Request received to abort procedure " + procName, ee);
226    // if we know about the procedure, notify it
227    Subprocedure sub = subprocs.get(procName);
228    if (sub == null) {
229      LOG.info(
230        "Received abort on procedure with no local subprocedure " + procName + ", ignoring it.",
231        ee);
232      return; // Procedure has already completed
233    }
234    String msg = "Propagating foreign exception to subprocedure " + sub.getName();
235    LOG.error(msg, ee);
236    sub.cancel(msg, ee);
237  }
238}