001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.lang.Thread.UncaughtExceptionHandler;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Optional;
025import java.util.Set;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.DelayQueue;
028import java.util.concurrent.ThreadPoolExecutor;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
033import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
034import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
035import org.apache.hadoop.hbase.procedure2.util.StringUtils;
036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
037import org.apache.hadoop.hbase.util.Threads;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
044
/**
 * A procedure dispatcher that buffers operations per remote node and sends them either after a
 * delay has elapsed or once a count threshold is hit. Creates its own thread pool to run RPCs
 * with timeout.
 * <ul>
 * <li>Each server queue has a dispatch buffer</li>
 * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
 * </ul>
 * <p>
 * Call {@link #start()} and then {@link #submitTask(Runnable)}. When done, call {@link #stop()}.
 */
056@InterfaceAudience.Private
057public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
058  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);
059
060  public static final String THREAD_POOL_SIZE_CONF_KEY =
061    "hbase.procedure.remote.dispatcher.threadpool.size";
062  private static final int DEFAULT_THREAD_POOL_SIZE = 128;
063
064  public static final String DISPATCH_DELAY_CONF_KEY =
065    "hbase.procedure.remote.dispatcher.delay.msec";
066  private static final int DEFAULT_DISPATCH_DELAY = 150;
067
068  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
069    "hbase.procedure.remote.dispatcher.max.queue.size";
070  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
071
072  private final AtomicBoolean running = new AtomicBoolean(false);
073  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
074    new ConcurrentHashMap<TRemote, BufferNode>();
075
076  private final int operationDelay;
077  private final int queueMaxSize;
078  private final int corePoolSize;
079
080  private TimeoutExecutorThread timeoutExecutor;
081  private ThreadPoolExecutor threadPool;
082
083  protected RemoteProcedureDispatcher(Configuration conf) {
084    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
085    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
086    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
087  }
088
089  public boolean start() {
090    if (running.getAndSet(true)) {
091      LOG.warn("Already running");
092      return false;
093    }
094
095    LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, "
096      + "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay);
097
098    // Create the timeout executor
099    timeoutExecutor = new TimeoutExecutorThread();
100    timeoutExecutor.start();
101
102    // Create the thread pool that will execute RPCs
103    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
104      new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d")
105        .setDaemon(true).setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
106    return true;
107  }
108
109  protected void setTimeoutExecutorUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
110    timeoutExecutor.setUncaughtExceptionHandler(eh);
111  }
112
113  public boolean stop() {
114    if (!running.getAndSet(false)) {
115      return false;
116    }
117
118    LOG.info("Stopping procedure remote dispatcher");
119
120    // send stop signals
121    timeoutExecutor.sendStopSignal();
122    threadPool.shutdownNow();
123    return true;
124  }
125
126  public void join() {
127    assert !running.get() : "expected not running";
128
129    // wait the timeout executor
130    timeoutExecutor.awaitTermination();
131    timeoutExecutor = null;
132
133    // wait for the thread pool to terminate
134    threadPool.shutdownNow();
135    try {
136      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
137        LOG.warn("Waiting for thread-pool to terminate");
138      }
139    } catch (InterruptedException e) {
140      LOG.warn("Interrupted while waiting for thread-pool termination", e);
141    }
142  }
143
144  protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler();
145
146  // ============================================================================================
147  // Node Helpers
148  // ============================================================================================
149  /**
150   * Add a node that will be able to execute remote procedures
151   * @param key the node identifier
152   */
153  public void addNode(final TRemote key) {
154    assert key != null : "Tried to add a node with a null key";
155    nodeMap.computeIfAbsent(key, k -> new BufferNode(k));
156  }
157
158  /**
159   * Add a remote rpc.
160   * @param key the node identifier
161   */
162  public void addOperationToNode(final TRemote key, RemoteProcedure rp)
163    throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException {
164    if (key == null) {
165      throw new NullTargetServerDispatchException(rp.toString());
166    }
167    BufferNode node = nodeMap.get(key);
168    if (node == null) {
169      // If null here, it means node has been removed because it crashed. This happens when server
170      // is expired in ServerManager. ServerCrashProcedure may or may not have run.
171      throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
172    }
173    node.add(rp);
174    // Check our node still in the map; could have been removed by #removeNode.
175    if (!nodeMap.containsValue(node)) {
176      throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
177    }
178  }
179
180  public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
181    BufferNode node = nodeMap.get(key);
182    if (node == null) {
183      LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
184        key);
185      return;
186    }
187    node.operationCompleted(rp);
188  }
189
190  /**
191   * Remove a remote node
192   * @param key the node identifier
193   */
194  public boolean removeNode(final TRemote key) {
195    final BufferNode node = nodeMap.remove(key);
196    if (node == null) {
197      return false;
198    }
199
200    node.abortOperationsInQueue();
201    return true;
202  }
203
204  // ============================================================================================
205  // Task Helpers
206  // ============================================================================================
207  protected final void submitTask(Runnable task) {
208    threadPool.execute(task);
209  }
210
211  protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
212    timeoutExecutor.add(new DelayedTask(task, delay, unit));
213  }
214
215  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);
216
217  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
218
219  /**
220   * Data structure with reference to remote operation.
221   */
222  public static abstract class RemoteOperation {
223    private final RemoteProcedure remoteProcedure;
224
225    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
226      this.remoteProcedure = remoteProcedure;
227    }
228
229    public RemoteProcedure getRemoteProcedure() {
230      return remoteProcedure;
231    }
232  }
233
234  /**
235   * Remote procedure reference.
236   */
237  public interface RemoteProcedure<TEnv, TRemote> {
238    /**
239     * For building the remote operation. May be empty if no need to send remote call. Usually, this
240     * means the RemoteProcedure has been finished already. This is possible, as we may have already
241     * sent the procedure to RS but then the rpc connection is broken so the executeProcedures call
242     * fails, but the RS does receive the procedure and execute it and then report back, before we
243     * retry again.
244     */
245    Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote);
246
247    /**
248     * Called when the executeProcedure call is failed.
249     */
250    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);
251
252    /**
253     * Called when RS tells the remote procedure is succeeded through the
254     * {@code reportProcedureDone} method.
255     */
256    void remoteOperationCompleted(TEnv env);
257
258    /**
259     * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone}
260     * method.
261     */
262    void remoteOperationFailed(TEnv env, RemoteProcedureException error);
263
264    /**
265     * Whether store this remote procedure in dispatched queue only OpenRegionProcedure and
266     * CloseRegionProcedure return false since they are not fully controlled by dispatcher
267     */
268    default boolean storeInDispatchedQueue() {
269      return true;
270    }
271  }
272
273  /**
274   * Account of what procedures are running on remote node.
275   */
276  public interface RemoteNode<TEnv, TRemote> {
277    TRemote getKey();
278
279    void add(RemoteProcedure<TEnv, TRemote> operation);
280
281    void dispatch();
282  }
283
284  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
285    final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
286    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
287    for (RemoteProcedure proc : remoteProcedures) {
288      Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote);
289      operation.ifPresent(op -> requestByType.put(op.getClass(), op));
290    }
291    return requestByType;
292  }
293
294  protected <T extends RemoteOperation> List<T> fetchType(
295    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
296    return (List<T>) requestByType.removeAll(type);
297  }
298
299  // ============================================================================================
300  // Timeout Helpers
301  // ============================================================================================
302  private final class TimeoutExecutorThread extends Thread {
303    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
304
305    public TimeoutExecutorThread() {
306      super("ProcedureDispatcherTimeoutThread");
307    }
308
309    @Override
310    public void run() {
311      while (running.get()) {
312        final DelayedWithTimeout task =
313          DelayedUtil.takeWithoutInterrupt(queue, 20, TimeUnit.SECONDS);
314        if (task == null || task == DelayedUtil.DELAYED_POISON) {
315          if (task == null && queue.size() > 0) {
316            LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty when timed waiting"
317              + " elapsed. If this is repeated consistently, it means no element is getting expired"
318              + " from the queue and it might freeze the system. Queue: {}", queue);
319          }
320          // the executor may be shutting down, and the task is just the shutdown request
321          continue;
322        }
323        if (task instanceof DelayedTask) {
324          threadPool.execute(((DelayedTask) task).getObject());
325        } else {
326          ((BufferNode) task).dispatch();
327        }
328      }
329    }
330
331    public void add(final DelayedWithTimeout delayed) {
332      queue.add(delayed);
333    }
334
335    public void remove(final DelayedWithTimeout delayed) {
336      queue.remove(delayed);
337    }
338
339    public void sendStopSignal() {
340      queue.add(DelayedUtil.DELAYED_POISON);
341    }
342
343    public void awaitTermination() {
344      try {
345        final long startTime = EnvironmentEdgeManager.currentTime();
346        for (int i = 0; isAlive(); ++i) {
347          sendStopSignal();
348          join(250);
349          if (i > 0 && (i % 8) == 0) {
350            LOG.warn("Waiting termination of thread " + getName() + ", "
351              + StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
352          }
353        }
354      } catch (InterruptedException e) {
355        LOG.warn(getName() + " join wait got interrupted", e);
356      }
357    }
358  }
359
360  // ============================================================================================
361  // Internals Helpers
362  // ============================================================================================
363
364  /**
365   * Node that contains a set of RemoteProcedures
366   */
367  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
368    implements RemoteNode<TEnv, TRemote> {
369    private Set<RemoteProcedure> operations;
370    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
371
372    protected BufferNode(final TRemote key) {
373      super(key, 0);
374    }
375
376    @Override
377    public TRemote getKey() {
378      return getObject();
379    }
380
381    @Override
382    public synchronized void add(final RemoteProcedure operation) {
383      if (this.operations == null) {
384        this.operations = new HashSet<>();
385        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
386        timeoutExecutor.add(this);
387      }
388      this.operations.add(operation);
389      if (this.operations.size() > queueMaxSize) {
390        timeoutExecutor.remove(this);
391        dispatch();
392      }
393    }
394
395    @Override
396    public synchronized void dispatch() {
397      if (operations != null) {
398        remoteDispatch(getKey(), operations);
399        operations.stream().filter(operation -> operation.storeInDispatchedQueue())
400          .forEach(operation -> dispatchedOperations.add(operation));
401        this.operations = null;
402      }
403    }
404
405    public synchronized void abortOperationsInQueue() {
406      if (operations != null) {
407        abortPendingOperations(getKey(), operations);
408        this.operations = null;
409      }
410      abortPendingOperations(getKey(), dispatchedOperations);
411      this.dispatchedOperations.clear();
412    }
413
414    public synchronized void operationCompleted(final RemoteProcedure remoteProcedure) {
415      this.dispatchedOperations.remove(remoteProcedure);
416    }
417
418    @Override
419    public String toString() {
420      return super.toString() + ", operations=" + this.operations;
421    }
422  }
423
424  /**
425   * Delayed object that holds a FutureTask.
426   * <p/>
427   * used to submit something later to the thread-pool.
428   */
429  private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> {
430    public DelayedTask(Runnable task, long delay, TimeUnit unit) {
431      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
432    }
433  }
434}