001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.concurrent.ConcurrentMap;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.RejectedExecutionException;
025import java.util.concurrent.SynchronousQueue;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032import org.apache.hadoop.hbase.DaemonThreadFactory;
033import org.apache.hadoop.hbase.errorhandling.ForeignException;
034
035import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
036
037/**
038 * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
039 * specialized part of a {@link Procedure} that actually does procedure type-specific work
040 * and reports back to the coordinator as it completes each phase.
041 */
042@InterfaceAudience.Private
043public class ProcedureMember implements Closeable {
044  private static final Logger LOG = LoggerFactory.getLogger(ProcedureMember.class);
045
046  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
047
048  private final SubprocedureFactory builder;
049  private final ProcedureMemberRpcs rpcs;
050
051  private final ConcurrentMap<String,Subprocedure> subprocs =
052      new MapMaker().concurrencyLevel(4).weakValues().makeMap();
053  private final ExecutorService pool;
054
055  /**
056   * Instantiate a new ProcedureMember.  This is a slave that executes subprocedures.
057   *
058   * @param rpcs controller used to send notifications to the procedure coordinator
059   * @param pool thread pool to submit subprocedures
060   * @param factory class that creates instances of a subprocedure.
061   */
062  public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
063      SubprocedureFactory factory) {
064    this.pool = pool;
065    this.rpcs = rpcs;
066    this.builder = factory;
067  }
068
069  /**
070   * Default thread pool for the procedure
071   *
072   * @param memberName
073   * @param procThreads the maximum number of threads to allow in the pool
074   */
075  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
076    return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
077  }
078
079  /**
080   * Default thread pool for the procedure
081   *
082   * @param memberName
083   * @param procThreads the maximum number of threads to allow in the pool
084   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
085   */
086  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
087      long keepAliveMillis) {
088    return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
089        new SynchronousQueue<>(),
090        new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
091  }
092
093  /**
094   * Package exposed.  Not for public use.
095   *
096   * @return reference to the Procedure member's rpcs object
097   */
098  ProcedureMemberRpcs getRpcs() {
099    return rpcs;
100  }
101
102
103  /**
104   * This is separated from execution so that we can detect and handle the case where the
105   * subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
106   * sent here)
107   * @param opName
108   * @param data
109   * @return subprocedure
110   */
111  public Subprocedure createSubprocedure(String opName, byte[] data) {
112    return builder.buildSubprocedure(opName, data);
113  }
114
115  /**
116   * Submit an subprocedure for execution.  This starts the local acquire phase.
117   * @param subproc the subprocedure to execute.
118   * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
119   *         could not be started. In the latter case, the subprocedure holds a reference to
120   *         the exception that caused the failure.
121   */
122  public boolean submitSubprocedure(Subprocedure subproc) {
123     // if the submitted subprocedure was null, bail.
124    if (subproc == null) {
125      LOG.warn("Submitted null subprocedure, nothing to run here.");
126      return false;
127    }
128
129    String procName = subproc.getName();
130    if (procName == null || procName.length() == 0) {
131      LOG.error("Subproc name cannot be null or the empty string");
132      return false;
133    }
134
135    // make sure we aren't already running an subprocedure of that name
136    Subprocedure rsub = subprocs.get(procName);
137    if (rsub != null) {
138      if (!rsub.isComplete()) {
139        LOG.error("Subproc '" + procName + "' is already running. Bailing out");
140        return false;
141      }
142      LOG.warn("A completed old subproc "  +  procName + " is still present, removing");
143      if (!subprocs.remove(procName, rsub)) {
144        LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
145        return false;
146      }
147    }
148
149    LOG.debug("Submitting new Subprocedure:" + procName);
150
151    // kick off the subprocedure
152    try {
153      if (subprocs.putIfAbsent(procName, subproc) == null) {
154        this.pool.submit(subproc);
155        return true;
156      } else {
157        LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
158        return false;
159      }
160    } catch (RejectedExecutionException e) {
161      subprocs.remove(procName, subproc);
162
163      // the thread pool is full and we can't run the subprocedure
164      String msg = "Subprocedure pool is full!";
165      subproc.cancel(msg, e.getCause());
166    }
167
168    LOG.error("Failed to start subprocedure '" + procName + "'");
169    return false;
170  }
171
172   /**
173    * Notification that procedure coordinator has reached the global barrier
174    * @param procName name of the subprocedure that should start running the in-barrier phase
175    */
176   public void receivedReachedGlobalBarrier(String procName) {
177     Subprocedure subproc = subprocs.get(procName);
178     if (subproc == null) {
179       LOG.warn("Unexpected reached globa barrier message for Sub-Procedure '" + procName + "'");
180       return;
181     }
182     if (LOG.isTraceEnabled()) {
183      LOG.trace("reached global barrier message for Sub-Procedure '" + procName + "'");
184     }
185     subproc.receiveReachedGlobalBarrier();
186   }
187
188  /**
189   * Best effort attempt to close the threadpool via Thread.interrupt.
190   */
191  @Override
192  public void close() throws IOException {
193    // have to use shutdown now to break any latch waiting
194    pool.shutdownNow();
195  }
196
197  /**
198   * Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
199   * @param timeoutMs timeout limit in millis
200   * @return true if successfully, false if bailed due to timeout.
201   * @throws InterruptedException
202   */
203  boolean closeAndWait(long timeoutMs) throws InterruptedException {
204    pool.shutdown();
205    return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
206  }
207
208  /**
209   * The connection to the rest of the procedure group (member and coordinator) has been
210   * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
211   * other members since we cannot reach them anymore.
212   * @param message description of the error
213   * @param cause the actual cause of the failure
214   * @param procName the name of the procedure we'd cancel due to the error.
215   */
216  public void controllerConnectionFailure(final String message, final Throwable cause,
217      final String procName) {
218    LOG.error(message, cause);
219    if (procName == null) {
220      return;
221    }
222    Subprocedure toNotify = subprocs.get(procName);
223    if (toNotify != null) {
224      toNotify.cancel(message, cause);
225    }
226  }
227
228  /**
229   * Send abort to the specified procedure
230   * @param procName name of the procedure to about
231   * @param ee exception information about the abort
232   */
233  public void receiveAbortProcedure(String procName, ForeignException ee) {
234    LOG.debug("Request received to abort procedure " + procName, ee);
235    // if we know about the procedure, notify it
236    Subprocedure sub = subprocs.get(procName);
237    if (sub == null) {
238      LOG.info("Received abort on procedure with no local subprocedure " + procName +
239          ", ignoring it.", ee);
240      return; // Procedure has already completed
241    }
242    String msg = "Propagating foreign exception to subprocedure " + sub.getName();
243    LOG.error(msg, ee);
244    sub.cancel(msg, ee);
245  }
246}