001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
021import java.io.IOException;
022import java.lang.Thread.UncaughtExceptionHandler;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Optional;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.DelayQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
034import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
035import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
036import org.apache.hadoop.hbase.procedure2.util.StringUtils;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
044
045/**
046 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
047 * count threshold. Creates its own threadpool to run RPCs with timeout.
048 * <ul>
049 * <li>Each server queue has a dispatch buffer</li>
 * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
051 * </ul>
052 * <p>Call {@link #start()} and then {@link #submitTask(Runnable)}. When done,
053 * call {@link #stop()}.
054 */
055@InterfaceAudience.Private
056public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);

  /** Configuration key for the size handed to the thread pool created in {@link #start()}. */
  public static final String THREAD_POOL_SIZE_CONF_KEY =
      "hbase.procedure.remote.dispatcher.threadpool.size";
  /** Value used when {@link #THREAD_POOL_SIZE_CONF_KEY} is not set. */
  private static final int DEFAULT_THREAD_POOL_SIZE = 128;

  /**
   * Configuration key for the delay added to the current time when a node starts buffering a new
   * batch of operations (the key name indicates milliseconds).
   */
  public static final String DISPATCH_DELAY_CONF_KEY =
      "hbase.procedure.remote.dispatcher.delay.msec";
  /** Value used when {@link #DISPATCH_DELAY_CONF_KEY} is not set. */
  private static final int DEFAULT_DISPATCH_DELAY = 150;

  /**
   * Configuration key for the buffered-operation count above which a node dispatches immediately
   * instead of waiting for its delay to elapse.
   */
  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
      "hbase.procedure.remote.dispatcher.max.queue.size";
  /** Value used when {@link #DISPATCH_MAX_QUEUE_SIZE_CONF_KEY} is not set. */
  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;

  // Set by start(), cleared by stop(); also the loop condition of the timeout executor thread.
  private final AtomicBoolean running = new AtomicBoolean(false);
  // One buffer node per registered remote key; see addNode() and removeNode().
  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
      new ConcurrentHashMap<TRemote, BufferNode>();

  // Tuning values read once from the Configuration in the constructor.
  private final int operationDelay;
  private final int queueMaxSize;
  private final int corePoolSize;

  // Both are created in start(); timeoutExecutor is reset to null in join().
  private TimeoutExecutorThread timeoutExecutor;
  private ThreadPoolExecutor threadPool;
081
082  protected RemoteProcedureDispatcher(Configuration conf) {
083    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
084    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
085    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
086  }
087
088  public boolean start() {
089    if (running.getAndSet(true)) {
090      LOG.warn("Already running");
091      return false;
092    }
093
094    LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, " +
095        "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay);
096
097    // Create the timeout executor
098    timeoutExecutor = new TimeoutExecutorThread();
099    timeoutExecutor.start();
100
101    // Create the thread pool that will execute RPCs
102    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
103      Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
104          getUncaughtExceptionHandler()));
105    return true;
106  }
107
108  public boolean stop() {
109    if (!running.getAndSet(false)) {
110      return false;
111    }
112
113    LOG.info("Stopping procedure remote dispatcher");
114
115    // send stop signals
116    timeoutExecutor.sendStopSignal();
117    threadPool.shutdownNow();
118    return true;
119  }
120
121  public void join() {
122    assert !running.get() : "expected not running";
123
124    // wait the timeout executor
125    timeoutExecutor.awaitTermination();
126    timeoutExecutor = null;
127
128    // wait for the thread pool to terminate
129    threadPool.shutdownNow();
130    try {
131      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
132        LOG.warn("Waiting for thread-pool to terminate");
133      }
134    } catch (InterruptedException e) {
135      LOG.warn("Interrupted while waiting for thread-pool termination", e);
136    }
137  }
138
  /**
   * Supplies the handler that {@link #start()} passes to the thread factory of the dispatcher's
   * thread pool.
   * @return the uncaught exception handler to hand to the thread factory
   */
  protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler();
140
141  // ============================================================================================
142  //  Node Helpers
143  // ============================================================================================
144  /**
145   * Add a node that will be able to execute remote procedures
146   * @param key the node identifier
147   */
148  public void addNode(final TRemote key) {
149    assert key != null: "Tried to add a node with a null key";
150    nodeMap.computeIfAbsent(key, k -> new BufferNode(k));
151  }
152
153  /**
154   * Add a remote rpc.
155   * @param key the node identifier
156   */
157  public void addOperationToNode(final TRemote key, RemoteProcedure rp)
158          throws NullTargetServerDispatchException, NoServerDispatchException,
159          NoNodeDispatchException {
160    if (key == null) {
161      throw new NullTargetServerDispatchException(rp.toString());
162    }
163    BufferNode node = nodeMap.get(key);
164    if (node == null) {
165      // If null here, it means node has been removed because it crashed. This happens when server
166      // is expired in ServerManager. ServerCrashProcedure may or may not have run.
167      throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
168    }
169    node.add(rp);
170    // Check our node still in the map; could have been removed by #removeNode.
171    if (!nodeMap.containsValue(node)) {
172      throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
173    }
174  }
175
176  public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
177    BufferNode node = nodeMap.get(key);
178    if (node == null) {
179      LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
180        key);
181      return;
182    }
183    node.operationCompleted(rp);
184  }
185
186  /**
187   * Remove a remote node
188   * @param key the node identifier
189   */
190  public boolean removeNode(final TRemote key) {
191    final BufferNode node = nodeMap.remove(key);
192    if (node == null) {
193      return false;
194    }
195
196    node.abortOperationsInQueue();
197    return true;
198  }
199
200  // ============================================================================================
201  //  Task Helpers
202  // ============================================================================================
203  protected final void submitTask(Runnable task) {
204    threadPool.execute(task);
205  }
206
207  protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
208    timeoutExecutor.add(new DelayedTask(task, delay, unit));
209  }
210
  /**
   * Sends a batch of buffered operations to a remote node. Invoked by a buffer node when it
   * dispatches, while that node's monitor is held.
   * @param key the identifier of the node the operations are destined for
   * @param operations the operations buffered on that node
   */
  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);

  /**
   * Aborts operations held by a node that is being removed. Invoked from
   * {@link #removeNode(Comparable)} via the node: once for the not-yet-dispatched buffer (when
   * there is one) and once for the set of operations recorded as dispatched.
   * @param key the identifier of the node being removed
   * @param operations the operations to abort
   */
  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
213
214  /**
215   * Data structure with reference to remote operation.
216   */
217  public static abstract class RemoteOperation {
218    private final RemoteProcedure remoteProcedure;
219
220    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
221      this.remoteProcedure = remoteProcedure;
222    }
223
224    public RemoteProcedure getRemoteProcedure() {
225      return remoteProcedure;
226    }
227  }
228
  /**
   * Remote procedure reference: the callbacks the dispatcher side uses to build the remote call
   * for a procedure and to report its outcome.
   * @param <TEnv> the environment type passed to the callbacks
   * @param <TRemote> the type identifying the remote node
   */
  public interface RemoteProcedure<TEnv, TRemote> {
    /**
     * For building the remote operation.
     * May be empty if there is no need to send a remote call. Usually, this means the
     * RemoteProcedure has already finished. This is possible, as we may have already sent the
     * procedure to the RS but then the rpc connection broke so the executeProcedures call failed,
     * yet the RS did receive the procedure, executed it and reported back before we retried.
     * @param env the environment
     * @param remote the remote node the operation is being built for
     * @return the operation to send, or empty if nothing needs to be sent
     */
    Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote);

    /**
     * Called when the executeProcedure call has failed.
     * @param env the environment
     * @param remote the remote node the call was sent to
     * @param exception the failure
     */
    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);

    /**
     * Called when the RS reports, through the {@code reportProcedureDone} method, that the remote
     * procedure has succeeded.
     * @param env the environment
     */
    void remoteOperationCompleted(TEnv env);

    /**
     * Called when the RS reports, through the {@code reportProcedureDone} method, that the remote
     * procedure has failed.
     * @param env the environment
     * @param error the reported failure
     */
    void remoteOperationFailed(TEnv env, RemoteProcedureException error);

    /**
     * Whether to store this remote procedure in the dispatched queue. A buffer node records a
     * procedure as dispatched only when this returns {@code true}.
     * Only OpenRegionProcedure and CloseRegionProcedure return false since they are
     * not fully controlled by the dispatcher.
     * @return {@code true} by default
     */
    default boolean storeInDispatchedQueue() {
      return true;
    }
  }
268
  /**
   * Account of what procedures are running on a remote node.
   * @param <TEnv> the environment type of the procedures
   * @param <TRemote> the type identifying the remote node
   */
  public interface RemoteNode<TEnv, TRemote> {
    /**
     * @return the identifier of the remote node
     */
    TRemote getKey();

    /**
     * Adds an operation to this node.
     * @param operation the remote procedure to add
     */
    void add(RemoteProcedure<TEnv, TRemote> operation);

    /**
     * Dispatches the operations held by this node.
     */
    void dispatch();
  }
279
280  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
281      final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
282    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
283    for (RemoteProcedure proc : remoteProcedures) {
284      Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote);
285      operation.ifPresent(op -> requestByType.put(op.getClass(), op));
286    }
287    return requestByType;
288  }
289
290  protected <T extends RemoteOperation> List<T> fetchType(
291      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
292    return (List<T>)requestByType.removeAll(type);
293  }
294
295  // ============================================================================================
296  //  Timeout Helpers
297  // ============================================================================================
298  private final class TimeoutExecutorThread extends Thread {
299    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
300
301    public TimeoutExecutorThread() {
302      super("ProcedureDispatcherTimeoutThread");
303    }
304
305    @Override
306    public void run() {
307      while (running.get()) {
308        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
309        if (task == null || task == DelayedUtil.DELAYED_POISON) {
310          // the executor may be shutting down, and the task is just the shutdown request
311          continue;
312        }
313        if (task instanceof DelayedTask) {
314          threadPool.execute(((DelayedTask) task).getObject());
315        } else {
316          ((BufferNode) task).dispatch();
317        }
318      }
319    }
320
321    public void add(final DelayedWithTimeout delayed) {
322      queue.add(delayed);
323    }
324
325    public void remove(final DelayedWithTimeout delayed) {
326      queue.remove(delayed);
327    }
328
329    public void sendStopSignal() {
330      queue.add(DelayedUtil.DELAYED_POISON);
331    }
332
333    public void awaitTermination() {
334      try {
335        final long startTime = EnvironmentEdgeManager.currentTime();
336        for (int i = 0; isAlive(); ++i) {
337          sendStopSignal();
338          join(250);
339          if (i > 0 && (i % 8) == 0) {
340            LOG.warn("Waiting termination of thread " + getName() + ", " +
341              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
342          }
343        }
344      } catch (InterruptedException e) {
345        LOG.warn(getName() + " join wait got interrupted", e);
346      }
347    }
348  }
349
350  // ============================================================================================
351  //  Internals Helpers
352  // ============================================================================================
353
354  /**
355   * Node that contains a set of RemoteProcedures
356   */
357  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
358      implements RemoteNode<TEnv, TRemote> {
359    private Set<RemoteProcedure> operations;
360    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
361
362    protected BufferNode(final TRemote key) {
363      super(key, 0);
364    }
365
366    @Override
367    public TRemote getKey() {
368      return getObject();
369    }
370
371    @Override
372    public synchronized void add(final RemoteProcedure operation) {
373      if (this.operations == null) {
374        this.operations = new HashSet<>();
375        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
376        timeoutExecutor.add(this);
377      }
378      this.operations.add(operation);
379      if (this.operations.size() > queueMaxSize) {
380        timeoutExecutor.remove(this);
381        dispatch();
382      }
383    }
384
385    @Override
386    public synchronized void dispatch() {
387      if (operations != null) {
388        remoteDispatch(getKey(), operations);
389        operations.stream().filter(operation -> operation.storeInDispatchedQueue())
390            .forEach(operation -> dispatchedOperations.add(operation));
391        this.operations = null;
392      }
393    }
394
395    public synchronized void abortOperationsInQueue() {
396      if (operations != null) {
397        abortPendingOperations(getKey(), operations);
398        this.operations = null;
399      }
400      abortPendingOperations(getKey(), dispatchedOperations);
401      this.dispatchedOperations.clear();
402    }
403
404    public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){
405      this.dispatchedOperations.remove(remoteProcedure);
406    }
407
408    @Override
409    public String toString() {
410      return super.toString() + ", operations=" + this.operations;
411    }
412  }
413
414  /**
415   * Delayed object that holds a FutureTask.
416   * <p/>
417   * used to submit something later to the thread-pool.
418   */
419  private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> {
420    public DelayedTask(Runnable task, long delay, TimeUnit unit) {
421      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
422    }
423  }
424}