001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure;
019
020import java.io.Closeable;
021import java.io.IOException;
022import java.util.concurrent.ConcurrentMap;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.RejectedExecutionException;
025import java.util.concurrent.SynchronousQueue;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028import org.apache.hadoop.hbase.errorhandling.ForeignException;
029import org.apache.hadoop.hbase.util.Threads;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker;
035import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
036
037/**
038 * Process to kick off and manage a running {@link Subprocedure} on a member. This is the
039 * specialized part of a {@link Procedure} that actually does procedure type-specific work
040 * and reports back to the coordinator as it completes each phase.
041 */
042@InterfaceAudience.Private
043public class ProcedureMember implements Closeable {
044  private static final Logger LOG = LoggerFactory.getLogger(ProcedureMember.class);
045
046  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
047
048  private final SubprocedureFactory builder;
049  private final ProcedureMemberRpcs rpcs;
050
051  private final ConcurrentMap<String,Subprocedure> subprocs =
052      new MapMaker().concurrencyLevel(4).weakValues().makeMap();
053  private final ExecutorService pool;
054
055  /**
056   * Instantiate a new ProcedureMember.  This is a slave that executes subprocedures.
057   *
058   * @param rpcs controller used to send notifications to the procedure coordinator
059   * @param pool thread pool to submit subprocedures
060   * @param factory class that creates instances of a subprocedure.
061   */
062  public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool,
063      SubprocedureFactory factory) {
064    this.pool = pool;
065    this.rpcs = rpcs;
066    this.builder = factory;
067  }
068
069  /**
070   * Default thread pool for the procedure
071   *
072   * @param memberName
073   * @param procThreads the maximum number of threads to allow in the pool
074   */
075  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
076    return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
077  }
078
079  /**
080   * Default thread pool for the procedure
081   *
082   * @param memberName
083   * @param procThreads the maximum number of threads to allow in the pool
084   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
085   */
086  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
087      long keepAliveMillis) {
088    return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
089      new SynchronousQueue<>(),
090      new ThreadFactoryBuilder().setNameFormat("member: '" + memberName + "' subprocedure-pool-%d")
091        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
092  }
093
094  /**
095   * Package exposed.  Not for public use.
096   *
097   * @return reference to the Procedure member's rpcs object
098   */
099  ProcedureMemberRpcs getRpcs() {
100    return rpcs;
101  }
102
103
104  /**
105   * This is separated from execution so that we can detect and handle the case where the
106   * subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being
107   * sent here)
108   * @param opName
109   * @param data
110   * @return subprocedure
111   */
112  public Subprocedure createSubprocedure(String opName, byte[] data) {
113    return builder.buildSubprocedure(opName, data);
114  }
115
116  /**
117   * Submit an subprocedure for execution.  This starts the local acquire phase.
118   * @param subproc the subprocedure to execute.
119   * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it
120   *         could not be started. In the latter case, the subprocedure holds a reference to
121   *         the exception that caused the failure.
122   */
123  public boolean submitSubprocedure(Subprocedure subproc) {
124     // if the submitted subprocedure was null, bail.
125    if (subproc == null) {
126      LOG.warn("Submitted null subprocedure, nothing to run here.");
127      return false;
128    }
129
130    String procName = subproc.getName();
131    if (procName == null || procName.length() == 0) {
132      LOG.error("Subproc name cannot be null or the empty string");
133      return false;
134    }
135
136    // make sure we aren't already running an subprocedure of that name
137    Subprocedure rsub = subprocs.get(procName);
138    if (rsub != null) {
139      if (!rsub.isComplete()) {
140        LOG.error("Subproc '" + procName + "' is already running. Bailing out");
141        return false;
142      }
143      LOG.warn("A completed old subproc "  +  procName + " is still present, removing");
144      if (!subprocs.remove(procName, rsub)) {
145        LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
146        return false;
147      }
148    }
149
150    LOG.debug("Submitting new Subprocedure:" + procName);
151
152    // kick off the subprocedure
153    try {
154      if (subprocs.putIfAbsent(procName, subproc) == null) {
155        this.pool.submit(subproc);
156        return true;
157      } else {
158        LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
159        return false;
160      }
161    } catch (RejectedExecutionException e) {
162      subprocs.remove(procName, subproc);
163
164      // the thread pool is full and we can't run the subprocedure
165      String msg = "Subprocedure pool is full!";
166      subproc.cancel(msg, e.getCause());
167    }
168
169    LOG.error("Failed to start subprocedure '" + procName + "'");
170    return false;
171  }
172
173   /**
174    * Notification that procedure coordinator has reached the global barrier
175    * @param procName name of the subprocedure that should start running the in-barrier phase
176    */
177   public void receivedReachedGlobalBarrier(String procName) {
178     Subprocedure subproc = subprocs.get(procName);
179     if (subproc == null) {
180       LOG.warn("Unexpected reached globa barrier message for Sub-Procedure '" + procName + "'");
181       return;
182     }
183     if (LOG.isTraceEnabled()) {
184      LOG.trace("reached global barrier message for Sub-Procedure '" + procName + "'");
185     }
186     subproc.receiveReachedGlobalBarrier();
187   }
188
189  /**
190   * Best effort attempt to close the threadpool via Thread.interrupt.
191   */
192  @Override
193  public void close() throws IOException {
194    // have to use shutdown now to break any latch waiting
195    pool.shutdownNow();
196  }
197
198  /**
199   * Shutdown the threadpool, and wait for upto timeoutMs millis before bailing
200   * @param timeoutMs timeout limit in millis
201   * @return true if successfully, false if bailed due to timeout.
202   * @throws InterruptedException
203   */
204  boolean closeAndWait(long timeoutMs) throws InterruptedException {
205    pool.shutdown();
206    return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
207  }
208
209  /**
210   * The connection to the rest of the procedure group (member and coordinator) has been
211   * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify
212   * other members since we cannot reach them anymore.
213   * @param message description of the error
214   * @param cause the actual cause of the failure
215   * @param procName the name of the procedure we'd cancel due to the error.
216   */
217  public void controllerConnectionFailure(final String message, final Throwable cause,
218      final String procName) {
219    LOG.error(message, cause);
220    if (procName == null) {
221      return;
222    }
223    Subprocedure toNotify = subprocs.get(procName);
224    if (toNotify != null) {
225      toNotify.cancel(message, cause);
226    }
227  }
228
229  /**
230   * Send abort to the specified procedure
231   * @param procName name of the procedure to about
232   * @param ee exception information about the abort
233   */
234  public void receiveAbortProcedure(String procName, ForeignException ee) {
235    LOG.debug("Request received to abort procedure " + procName, ee);
236    // if we know about the procedure, notify it
237    Subprocedure sub = subprocs.get(procName);
238    if (sub == null) {
239      LOG.info("Received abort on procedure with no local subprocedure " + procName +
240          ", ignoring it.", ee);
241      return; // Procedure has already completed
242    }
243    String msg = "Propagating foreign exception to subprocedure " + sub.getName();
244    LOG.error(msg, ee);
245    sub.cancel(msg, ee);
246  }
247}