001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
021import java.io.IOException;
022import java.lang.Thread.UncaughtExceptionHandler;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Set;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.DelayQueue;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
033import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
034import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
035import org.apache.hadoop.hbase.procedure2.util.StringUtils;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
043
044/**
045 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
046 * count threshold. Creates its own threadpool to run RPCs with timeout.
047 * <ul>
048 * <li>Each server queue has a dispatch buffer</li>
049 * <li>Once the dispatch buffer reaches a threshold-size/time we send<li>
050 * </ul>
051 * <p>Call {@link #start()} and then {@link #submitTask(Runnable)}. When done,
052 * call {@link #stop()}.
053 */
054@InterfaceAudience.Private
055public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
056  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);
057
058  public static final String THREAD_POOL_SIZE_CONF_KEY =
059      "hbase.procedure.remote.dispatcher.threadpool.size";
060  private static final int DEFAULT_THREAD_POOL_SIZE = 128;
061
062  public static final String DISPATCH_DELAY_CONF_KEY =
063      "hbase.procedure.remote.dispatcher.delay.msec";
064  private static final int DEFAULT_DISPATCH_DELAY = 150;
065
066  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
067      "hbase.procedure.remote.dispatcher.max.queue.size";
068  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
069
070  private final AtomicBoolean running = new AtomicBoolean(false);
071  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
072      new ConcurrentHashMap<TRemote, BufferNode>();
073
074  private final int operationDelay;
075  private final int queueMaxSize;
076  private final int corePoolSize;
077
078  private TimeoutExecutorThread timeoutExecutor;
079  private ThreadPoolExecutor threadPool;
080
081  protected RemoteProcedureDispatcher(Configuration conf) {
082    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
083    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
084    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
085  }
086
087  public boolean start() {
088    if (running.getAndSet(true)) {
089      LOG.warn("Already running");
090      return false;
091    }
092
093    LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, " +
094        "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay);
095
096    // Create the timeout executor
097    timeoutExecutor = new TimeoutExecutorThread();
098    timeoutExecutor.start();
099
100    // Create the thread pool that will execute RPCs
101    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
102      Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
103          getUncaughtExceptionHandler()));
104    return true;
105  }
106
107  public boolean stop() {
108    if (!running.getAndSet(false)) {
109      return false;
110    }
111
112    LOG.info("Stopping procedure remote dispatcher");
113
114    // send stop signals
115    timeoutExecutor.sendStopSignal();
116    threadPool.shutdownNow();
117    return true;
118  }
119
120  public void join() {
121    assert !running.get() : "expected not running";
122
123    // wait the timeout executor
124    timeoutExecutor.awaitTermination();
125    timeoutExecutor = null;
126
127    // wait for the thread pool to terminate
128    threadPool.shutdownNow();
129    try {
130      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
131        LOG.warn("Waiting for thread-pool to terminate");
132      }
133    } catch (InterruptedException e) {
134      LOG.warn("Interrupted while waiting for thread-pool termination", e);
135    }
136  }
137
138  protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler();
139
140  // ============================================================================================
141  //  Node Helpers
142  // ============================================================================================
143  /**
144   * Add a node that will be able to execute remote procedures
145   * @param key the node identifier
146   */
147  public void addNode(final TRemote key) {
148    assert key != null: "Tried to add a node with a null key";
149    final BufferNode newNode = new BufferNode(key);
150    nodeMap.putIfAbsent(key, newNode);
151  }
152
153  /**
154   * Add a remote rpc.
155   * @param key the node identifier
156   */
157  public void addOperationToNode(final TRemote key, RemoteProcedure rp)
158  throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException {
159    if (key == null) {
160      throw new NullTargetServerDispatchException(rp.toString());
161    }
162    BufferNode node = nodeMap.get(key);
163    if (node == null) {
164      // If null here, it means node has been removed because it crashed. This happens when server
165      // is expired in ServerManager. ServerCrashProcedure may or may not have run.
166      throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
167    }
168    node.add(rp);
169    // Check our node still in the map; could have been removed by #removeNode.
170    if (!nodeMap.containsValue(node)) {
171      throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
172    }
173  }
174
175  public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
176    BufferNode node = nodeMap.get(key);
177    if (node == null) {
178      LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
179        key);
180      return;
181    }
182    node.operationCompleted(rp);
183  }
184
185  /**
186   * Remove a remote node
187   * @param key the node identifier
188   */
189  public boolean removeNode(final TRemote key) {
190    final BufferNode node = nodeMap.remove(key);
191    if (node == null) return false;
192    node.abortOperationsInQueue();
193    return true;
194  }
195
196  // ============================================================================================
197  //  Task Helpers
198  // ============================================================================================
199  protected final void submitTask(Runnable task) {
200    threadPool.execute(task);
201  }
202
203  protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
204    timeoutExecutor.add(new DelayedTask(task, delay, unit));
205  }
206
207  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);
208  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
209
210  /**
211   * Data structure with reference to remote operation.
212   */
213  public static abstract class RemoteOperation {
214    private final RemoteProcedure remoteProcedure;
215
216    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
217      this.remoteProcedure = remoteProcedure;
218    }
219
220    public RemoteProcedure getRemoteProcedure() {
221      return remoteProcedure;
222    }
223  }
224
225  /**
226   * Remote procedure reference.
227   */
228  public interface RemoteProcedure<TEnv, TRemote> {
229    /**
230     * For building the remote operation.
231     */
232    RemoteOperation remoteCallBuild(TEnv env, TRemote remote);
233
234    /**
235     * Called when the executeProcedure call is failed.
236     */
237    boolean remoteCallFailed(TEnv env, TRemote remote, IOException exception);
238
239    /**
240     * Called when RS tells the remote procedure is succeeded through the
241     * {@code reportProcedureDone} method.
242     */
243    void remoteOperationCompleted(TEnv env);
244
245    /**
246     * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone}
247     * method.
248     */
249    void remoteOperationFailed(TEnv env, RemoteProcedureException error);
250
251    /**
252     * Whether store this remote procedure in dispatched queue
253     * only OpenRegionProcedure and CloseRegionProcedure return false since they are
254     * not fully controlled by dispatcher
255     */
256    default boolean storeInDispatchedQueue() {
257      return true;
258    }
259
260  }
261
262  /**
263   * Account of what procedures are running on remote node.
264   */
265  public interface RemoteNode<TEnv, TRemote> {
266    TRemote getKey();
267
268    void add(RemoteProcedure<TEnv, TRemote> operation);
269
270    void dispatch();
271  }
272
273  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
274      final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
275    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
276    for (RemoteProcedure proc : remoteProcedures) {
277      RemoteOperation operation = proc.remoteCallBuild(env, remote);
278      requestByType.put(operation.getClass(), operation);
279    }
280    return requestByType;
281  }
282
283  protected <T extends RemoteOperation> List<T> fetchType(
284      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
285    return (List<T>)requestByType.removeAll(type);
286  }
287
288  // ============================================================================================
289  //  Timeout Helpers
290  // ============================================================================================
291  private final class TimeoutExecutorThread extends Thread {
292    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
293
294    public TimeoutExecutorThread() {
295      super("ProcedureDispatcherTimeoutThread");
296    }
297
298    @Override
299    public void run() {
300      while (running.get()) {
301        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
302        if (task == null || task == DelayedUtil.DELAYED_POISON) {
303          // the executor may be shutting down, and the task is just the shutdown request
304          continue;
305        }
306        if (task instanceof DelayedTask) {
307          threadPool.execute(((DelayedTask) task).getObject());
308        } else {
309          ((BufferNode) task).dispatch();
310        }
311      }
312    }
313
314    public void add(final DelayedWithTimeout delayed) {
315      queue.add(delayed);
316    }
317
318    public void remove(final DelayedWithTimeout delayed) {
319      queue.remove(delayed);
320    }
321
322    public void sendStopSignal() {
323      queue.add(DelayedUtil.DELAYED_POISON);
324    }
325
326    public void awaitTermination() {
327      try {
328        final long startTime = EnvironmentEdgeManager.currentTime();
329        for (int i = 0; isAlive(); ++i) {
330          sendStopSignal();
331          join(250);
332          if (i > 0 && (i % 8) == 0) {
333            LOG.warn("Waiting termination of thread " + getName() + ", " +
334              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
335          }
336        }
337      } catch (InterruptedException e) {
338        LOG.warn(getName() + " join wait got interrupted", e);
339      }
340    }
341  }
342
343  // ============================================================================================
344  //  Internals Helpers
345  // ============================================================================================
346
347  /**
348   * Node that contains a set of RemoteProcedures
349   */
350  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
351      implements RemoteNode<TEnv, TRemote> {
352    private Set<RemoteProcedure> operations;
353    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
354
355    protected BufferNode(final TRemote key) {
356      super(key, 0);
357    }
358
359    @Override
360    public TRemote getKey() {
361      return getObject();
362    }
363
364    @Override
365    public synchronized void add(final RemoteProcedure operation) {
366      if (this.operations == null) {
367        this.operations = new HashSet<>();
368        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
369        timeoutExecutor.add(this);
370      }
371      this.operations.add(operation);
372      if (this.operations.size() > queueMaxSize) {
373        timeoutExecutor.remove(this);
374        dispatch();
375      }
376    }
377
378    @Override
379    public synchronized void dispatch() {
380      if (operations != null) {
381        remoteDispatch(getKey(), operations);
382        operations.stream().filter(operation -> operation.storeInDispatchedQueue())
383            .forEach(operation -> dispatchedOperations.add(operation));
384        this.operations = null;
385      }
386    }
387
388    public synchronized void abortOperationsInQueue() {
389      if (operations != null) {
390        abortPendingOperations(getKey(), operations);
391        this.operations = null;
392      }
393      abortPendingOperations(getKey(), dispatchedOperations);
394      this.dispatchedOperations.clear();
395    }
396
397    public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){
398      this.dispatchedOperations.remove(remoteProcedure);
399    }
400
401    @Override
402    public String toString() {
403      return super.toString() + ", operations=" + this.operations;
404    }
405  }
406
407  /**
408   * Delayed object that holds a FutureTask.
409   * <p/>
410   * used to submit something later to the thread-pool.
411   */
412  private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> {
413    public DelayedTask(Runnable task, long delay, TimeUnit unit) {
414      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
415    }
416  };
417}