001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.concurrent.ConcurrentMap;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.RejectedExecutionException;
025import java.util.concurrent.SynchronousQueue;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.hadoop.hbase.errorhandling.ForeignException;
030import org.apache.hadoop.hbase.util.Threads;
031import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
037
038/**
039 * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
040 * specialized part of a {@link Procedure} that actually does procedure type-specific work
041 * and reports back to the coordinator as it completes each phase.
042 */
043@InterfaceAudience.Private
044public class ProcedureMember implements Closeable {
045  private static final Logger LOG = LoggerFactory.getLogger(ProcedureMember.class);
046
047  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
048
049  private final SubprocedureFactory builder;
050  private final ProcedureMemberRpcs rpcs;
051
052  private final ConcurrentMap<String,Subprocedure> subprocs =
053      new MapMaker().concurrencyLevel(4).weakValues().makeMap();
054  private final ExecutorService pool;
055
056  /**
057   * Instantiate a new ProcedureMember.  This is a slave that executes subprocedures.
058   *
059   * @param rpcs controller used to send notifications to the procedure coordinator
060   * @param pool thread pool to submit subprocedures
061   * @param factory class that creates instances of a subprocedure.
062   */
063  public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
064      SubprocedureFactory factory) {
065    this.pool = pool;
066    this.rpcs = rpcs;
067    this.builder = factory;
068  }
069
070  /**
071   * Default thread pool for the procedure
072   *
073   * @param memberName
074   * @param procThreads the maximum number of threads to allow in the pool
075   */
076  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
077    return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
078  }
079
080  /**
081   * Default thread pool for the procedure
082   *
083   * @param memberName
084   * @param procThreads the maximum number of threads to allow in the pool
085   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
086   */
087  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
088      long keepAliveMillis) {
089    return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
090      new SynchronousQueue<>(),
091      new ThreadFactoryBuilder().setNameFormat("member: '" + memberName + "' subprocedure-pool-%d")
092        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
093  }
094
095  /**
096   * Package exposed.  Not for public use.
097   *
098   * @return reference to the Procedure member's rpcs object
099   */
100  ProcedureMemberRpcs getRpcs() {
101    return rpcs;
102  }
103
104
105  /**
106   * This is separated from execution so that we can detect and handle the case where the
107   * subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
108   * sent here)
109   * @param opName
110   * @param data
111   * @return subprocedure
112   */
113  public Subprocedure createSubprocedure(String opName, byte[] data) {
114    return builder.buildSubprocedure(opName, data);
115  }
116
117  /**
118   * Submit an subprocedure for execution.  This starts the local acquire phase.
119   * @param subproc the subprocedure to execute.
120   * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
121   *         could not be started. In the latter case, the subprocedure holds a reference to
122   *         the exception that caused the failure.
123   */
124  @SuppressWarnings("FutureReturnValueIgnored")
125  public boolean submitSubprocedure(Subprocedure subproc) {
126     // if the submitted subprocedure was null, bail.
127    if (subproc == null) {
128      LOG.warn("Submitted null subprocedure, nothing to run here.");
129      return false;
130    }
131
132    String procName = subproc.getName();
133    if (procName == null || procName.length() == 0) {
134      LOG.error("Subproc name cannot be null or the empty string");
135      return false;
136    }
137
138    // make sure we aren't already running an subprocedure of that name
139    Subprocedure rsub = subprocs.get(procName);
140    if (rsub != null) {
141      if (!rsub.isComplete()) {
142        LOG.error("Subproc '" + procName + "' is already running. Bailing out");
143        return false;
144      }
145      LOG.warn("A completed old subproc "  +  procName + " is still present, removing");
146      if (!subprocs.remove(procName, rsub)) {
147        LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
148        return false;
149      }
150    }
151
152    LOG.debug("Submitting new Subprocedure:" + procName);
153
154    // kick off the subprocedure
155    try {
156      if (subprocs.putIfAbsent(procName, subproc) == null) {
157        this.pool.submit(subproc);
158        return true;
159      } else {
160        LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
161        return false;
162      }
163    } catch (RejectedExecutionException e) {
164      subprocs.remove(procName, subproc);
165
166      // the thread pool is full and we can't run the subprocedure
167      String msg = "Subprocedure pool is full!";
168      subproc.cancel(msg, e.getCause());
169    }
170
171    LOG.error("Failed to start subprocedure '" + procName + "'");
172    return false;
173  }
174
175   /**
176    * Notification that procedure coordinator has reached the global barrier
177    * @param procName name of the subprocedure that should start running the in-barrier phase
178    */
179   public void receivedReachedGlobalBarrier(String procName) {
180     Subprocedure subproc = subprocs.get(procName);
181     if (subproc == null) {
182       LOG.warn("Unexpected reached globa barrier message for Sub-Procedure '" + procName + "'");
183       return;
184     }
185     if (LOG.isTraceEnabled()) {
186      LOG.trace("reached global barrier message for Sub-Procedure '" + procName + "'");
187     }
188     subproc.receiveReachedGlobalBarrier();
189   }
190
191  /**
192   * Best effort attempt to close the threadpool via Thread.interrupt.
193   */
194  @Override
195  public void close() throws IOException {
196    // have to use shutdown now to break any latch waiting
197    pool.shutdownNow();
198  }
199
200  /**
201   * Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
202   * @param timeoutMs timeout limit in millis
203   * @return true if successfully, false if bailed due to timeout.
204   * @throws InterruptedException
205   */
206  boolean closeAndWait(long timeoutMs) throws InterruptedException {
207    pool.shutdown();
208    return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
209  }
210
211  /**
212   * The connection to the rest of the procedure group (member and coordinator) has been
213   * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
214   * other members since we cannot reach them anymore.
215   * @param message description of the error
216   * @param cause the actual cause of the failure
217   * @param procName the name of the procedure we'd cancel due to the error.
218   */
219  public void controllerConnectionFailure(final String message, final Throwable cause,
220      final String procName) {
221    LOG.error(message, cause);
222    if (procName == null) {
223      return;
224    }
225    Subprocedure toNotify = subprocs.get(procName);
226    if (toNotify != null) {
227      toNotify.cancel(message, cause);
228    }
229  }
230
231  /**
232   * Send abort to the specified procedure
233   * @param procName name of the procedure to about
234   * @param ee exception information about the abort
235   */
236  public void receiveAbortProcedure(String procName, ForeignException ee) {
237    LOG.debug("Request received to abort procedure " + procName, ee);
238    // if we know about the procedure, notify it
239    Subprocedure sub = subprocs.get(procName);
240    if (sub == null) {
241      LOG.info("Received abort on procedure with no local subprocedure " + procName +
242          ", ignoring it.", ee);
243      return; // Procedure has already completed
244    }
245    String msg = "Propagating foreign exception to subprocedure " + sub.getName();
246    LOG.error(msg, ee);
247    sub.cancel(msg, ee);
248  }
249}