001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
021import java.io.IOException;
022import java.lang.Thread.UncaughtExceptionHandler;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Optional;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.DelayQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
034import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
035import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
036import org.apache.hadoop.hbase.procedure2.util.StringUtils;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
044
045/**
046 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
047 * count threshold. Creates its own threadpool to run RPCs with timeout.
048 * <ul>
049 * <li>Each server queue has a dispatch buffer</li>
 * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
051 * </ul>
052 * <p>Call {@link #start()} and then {@link #submitTask(Runnable)}. When done,
053 * call {@link #stop()}.
054 */
055@InterfaceAudience.Private
056public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);

  /** Configuration key for the core size of the thread pool created in {@link #start()}. */
  public static final String THREAD_POOL_SIZE_CONF_KEY =
      "hbase.procedure.remote.dispatcher.threadpool.size";
  /** Pool size used when {@link #THREAD_POOL_SIZE_CONF_KEY} is not configured. */
  private static final int DEFAULT_THREAD_POOL_SIZE = 128;

  /**
   * Configuration key for the delay that is added to the current time when a node starts
   * buffering operations (the key name indicates milliseconds).
   */
  public static final String DISPATCH_DELAY_CONF_KEY =
      "hbase.procedure.remote.dispatcher.delay.msec";
  /** Dispatch delay used when {@link #DISPATCH_DELAY_CONF_KEY} is not configured. */
  private static final int DEFAULT_DISPATCH_DELAY = 150;

  /**
   * Configuration key for the per-node buffer size; a node dispatches immediately once its
   * buffer holds more operations than this value.
   */
  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
      "hbase.procedure.remote.dispatcher.max.queue.size";
  /** Buffer size used when {@link #DISPATCH_MAX_QUEUE_SIZE_CONF_KEY} is not configured. */
  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;

  // Set to true by start() and back to false by stop(); also drives the timeout thread's loop.
  private final AtomicBoolean running = new AtomicBoolean(false);
  // One buffer node per remote key registered through addNode().
  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
      new ConcurrentHashMap<TRemote, BufferNode>();

  // Values read from the configuration in the constructor.
  private final int operationDelay;
  private final int queueMaxSize;
  private final int corePoolSize;

  // Both are created in start(); timeoutExecutor is reset to null by join().
  private TimeoutExecutorThread timeoutExecutor;
  private ThreadPoolExecutor threadPool;
081
082  protected RemoteProcedureDispatcher(Configuration conf) {
083    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
084    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
085    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
086  }
087
088  public boolean start() {
089    if (running.getAndSet(true)) {
090      LOG.warn("Already running");
091      return false;
092    }
093
094    LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, " +
095        "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay);
096
097    // Create the timeout executor
098    timeoutExecutor = new TimeoutExecutorThread();
099    timeoutExecutor.start();
100
101    // Create the thread pool that will execute RPCs
102    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
103      Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
104          getUncaughtExceptionHandler()));
105    return true;
106  }
107
108  public boolean stop() {
109    if (!running.getAndSet(false)) {
110      return false;
111    }
112
113    LOG.info("Stopping procedure remote dispatcher");
114
115    // send stop signals
116    timeoutExecutor.sendStopSignal();
117    threadPool.shutdownNow();
118    return true;
119  }
120
121  public void join() {
122    assert !running.get() : "expected not running";
123
124    // wait the timeout executor
125    timeoutExecutor.awaitTermination();
126    timeoutExecutor = null;
127
128    // wait for the thread pool to terminate
129    threadPool.shutdownNow();
130    try {
131      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
132        LOG.warn("Waiting for thread-pool to terminate");
133      }
134    } catch (InterruptedException e) {
135      LOG.warn("Interrupted while waiting for thread-pool termination", e);
136    }
137  }
138
  /**
   * Supplies the handler that {@link #start()} passes to the thread factory of the RPC thread
   * pool.
   * @return the uncaught exception handler for the pool's threads
   */
  protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler();
140
141  // ============================================================================================
142  //  Node Helpers
143  // ============================================================================================
144  /**
145   * Add a node that will be able to execute remote procedures
146   * @param key the node identifier
147   */
148  public void addNode(final TRemote key) {
149    assert key != null: "Tried to add a node with a null key";
150    final BufferNode newNode = new BufferNode(key);
151    nodeMap.putIfAbsent(key, newNode);
152  }
153
154  /**
155   * Add a remote rpc.
156   * @param key the node identifier
157   */
158  public void addOperationToNode(final TRemote key, RemoteProcedure rp)
159          throws NullTargetServerDispatchException, NoServerDispatchException,
160          NoNodeDispatchException {
161    if (key == null) {
162      throw new NullTargetServerDispatchException(rp.toString());
163    }
164    BufferNode node = nodeMap.get(key);
165    if (node == null) {
166      // If null here, it means node has been removed because it crashed. This happens when server
167      // is expired in ServerManager. ServerCrashProcedure may or may not have run.
168      throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
169    }
170    node.add(rp);
171    // Check our node still in the map; could have been removed by #removeNode.
172    if (!nodeMap.containsValue(node)) {
173      throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
174    }
175  }
176
177  public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
178    BufferNode node = nodeMap.get(key);
179    if (node == null) {
180      LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
181        key);
182      return;
183    }
184    node.operationCompleted(rp);
185  }
186
187  /**
188   * Remove a remote node
189   * @param key the node identifier
190   */
191  public boolean removeNode(final TRemote key) {
192    final BufferNode node = nodeMap.remove(key);
193    if (node == null) {
194      return false;
195    }
196
197    node.abortOperationsInQueue();
198    return true;
199  }
200
201  // ============================================================================================
202  //  Task Helpers
203  // ============================================================================================
  /**
   * Hands the task to the dispatcher's thread pool for execution. The pool is created by
   * {@link #start()}.
   * @param task the work to execute
   */
  protected final void submitTask(Runnable task) {
    threadPool.execute(task);
  }
207
208  protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
209    timeoutExecutor.add(new DelayedTask(task, delay, unit));
210  }
211
  /**
   * Sends the buffered operations to the remote node. Invoked by a buffer node when it
   * dispatches.
   * @param key the node identifier
   * @param operations the operations that were buffered for the node
   */
  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);

  /**
   * Aborts operations of a node that is being removed. Invoked for the operations still buffered
   * on the node, if any, and for the operations tracked as already dispatched.
   * @param key the node identifier
   * @param operations the operations to abort
   */
  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
214
215  /**
216   * Data structure with reference to remote operation.
217   */
218  public static abstract class RemoteOperation {
219    private final RemoteProcedure remoteProcedure;
220
221    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
222      this.remoteProcedure = remoteProcedure;
223    }
224
225    public RemoteProcedure getRemoteProcedure() {
226      return remoteProcedure;
227    }
228  }
229
  /**
   * Remote procedure reference: the callbacks the dispatcher and its callers use to build the
   * remote call for a procedure and to report its outcome.
   */
  public interface RemoteProcedure<TEnv, TRemote> {
    /**
     * Builds the remote operation to send.
     * <p>
     * The result may be empty when there is no need to send a remote call. Usually this means the
     * RemoteProcedure has already finished. This is possible because the procedure may already
     * have been sent to the RS and then the rpc connection broke, so the executeProcedures call
     * failed, but the RS did receive the procedure, executed it and reported back before we
     * retried.
     * @param env the environment
     * @param remote the remote node the operation is built for
     * @return the operation to send, or empty if nothing needs to be sent
     */
    Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote);

    /**
     * Called when the executeProcedure call has failed.
     * @param env the environment
     * @param remote the remote node the call was sent to
     * @param exception the failure
     */
    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);

    /**
     * Called when the RS reports, through the {@code reportProcedureDone} method, that the remote
     * procedure succeeded.
     * @param env the environment
     */
    void remoteOperationCompleted(TEnv env);

    /**
     * Called when the RS reports, through the {@code reportProcedureDone} method, that the remote
     * procedure failed.
     * @param env the environment
     * @param error the reported failure
     */
    void remoteOperationFailed(TEnv env, RemoteProcedureException error);

    /**
     * Whether to store this remote procedure in the dispatched queue once it has been
     * dispatched. Only OpenRegionProcedure and CloseRegionProcedure return false, since they are
     * not fully controlled by the dispatcher.
     * @return {@code true} by default
     */
    default boolean storeInDispatchedQueue() {
      return true;
    }
  }
269
  /**
   * Account of what procedures are running on a remote node.
   */
  public interface RemoteNode<TEnv, TRemote> {
    /**
     * @return the identifier of the remote node
     */
    TRemote getKey();

    /**
     * Adds a remote procedure to this node.
     * @param operation the remote procedure to add
     */
    void add(RemoteProcedure<TEnv, TRemote> operation);

    /**
     * Dispatches the procedures added to this node.
     */
    void dispatch();
  }
280
281  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
282      final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
283    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
284    for (RemoteProcedure proc : remoteProcedures) {
285      Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote);
286      operation.ifPresent(op -> requestByType.put(op.getClass(), op));
287    }
288    return requestByType;
289  }
290
291  protected <T extends RemoteOperation> List<T> fetchType(
292      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
293    return (List<T>)requestByType.removeAll(type);
294  }
295
296  // ============================================================================================
297  //  Timeout Helpers
298  // ============================================================================================
299  private final class TimeoutExecutorThread extends Thread {
300    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
301
302    public TimeoutExecutorThread() {
303      super("ProcedureDispatcherTimeoutThread");
304    }
305
306    @Override
307    public void run() {
308      while (running.get()) {
309        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
310        if (task == null || task == DelayedUtil.DELAYED_POISON) {
311          // the executor may be shutting down, and the task is just the shutdown request
312          continue;
313        }
314        if (task instanceof DelayedTask) {
315          threadPool.execute(((DelayedTask) task).getObject());
316        } else {
317          ((BufferNode) task).dispatch();
318        }
319      }
320    }
321
322    public void add(final DelayedWithTimeout delayed) {
323      queue.add(delayed);
324    }
325
326    public void remove(final DelayedWithTimeout delayed) {
327      queue.remove(delayed);
328    }
329
330    public void sendStopSignal() {
331      queue.add(DelayedUtil.DELAYED_POISON);
332    }
333
334    public void awaitTermination() {
335      try {
336        final long startTime = EnvironmentEdgeManager.currentTime();
337        for (int i = 0; isAlive(); ++i) {
338          sendStopSignal();
339          join(250);
340          if (i > 0 && (i % 8) == 0) {
341            LOG.warn("Waiting termination of thread " + getName() + ", " +
342              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
343          }
344        }
345      } catch (InterruptedException e) {
346        LOG.warn(getName() + " join wait got interrupted", e);
347      }
348    }
349  }
350
351  // ============================================================================================
352  //  Internals Helpers
353  // ============================================================================================
354
355  /**
356   * Node that contains a set of RemoteProcedures
357   */
358  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
359      implements RemoteNode<TEnv, TRemote> {
360    private Set<RemoteProcedure> operations;
361    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
362
363    protected BufferNode(final TRemote key) {
364      super(key, 0);
365    }
366
367    @Override
368    public TRemote getKey() {
369      return getObject();
370    }
371
372    @Override
373    public synchronized void add(final RemoteProcedure operation) {
374      if (this.operations == null) {
375        this.operations = new HashSet<>();
376        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
377        timeoutExecutor.add(this);
378      }
379      this.operations.add(operation);
380      if (this.operations.size() > queueMaxSize) {
381        timeoutExecutor.remove(this);
382        dispatch();
383      }
384    }
385
386    @Override
387    public synchronized void dispatch() {
388      if (operations != null) {
389        remoteDispatch(getKey(), operations);
390        operations.stream().filter(operation -> operation.storeInDispatchedQueue())
391            .forEach(operation -> dispatchedOperations.add(operation));
392        this.operations = null;
393      }
394    }
395
396    public synchronized void abortOperationsInQueue() {
397      if (operations != null) {
398        abortPendingOperations(getKey(), operations);
399        this.operations = null;
400      }
401      abortPendingOperations(getKey(), dispatchedOperations);
402      this.dispatchedOperations.clear();
403    }
404
405    public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){
406      this.dispatchedOperations.remove(remoteProcedure);
407    }
408
409    @Override
410    public String toString() {
411      return super.toString() + ", operations=" + this.operations;
412    }
413  }
414
415  /**
416   * Delayed object that holds a FutureTask.
417   * <p/>
418   * used to submit something later to the thread-pool.
419   */
420  private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> {
421    public DelayedTask(Runnable task, long delay, TimeUnit unit) {
422      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
423    }
424  };
425}