001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2;
020
021import java.io.IOException;
022import java.lang.Thread.UncaughtExceptionHandler;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Optional;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.DelayQueue;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
034import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
035import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
036import org.apache.hadoop.hbase.procedure2.util.StringUtils;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
045
046/**
047 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
048 * count threshold. Creates its own threadpool to run RPCs with timeout.
049 * <ul>
050 * <li>Each server queue has a dispatch buffer</li>
051 * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
052 * </ul>
053 * <p>Call {@link #start()} and then {@link #submitTask(Runnable)}. When done,
054 * call {@link #stop()}.
055 */
056@InterfaceAudience.Private
057public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
058  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);
059
060  public static final String THREAD_POOL_SIZE_CONF_KEY =
061      "hbase.procedure.remote.dispatcher.threadpool.size";
062  private static final int DEFAULT_THREAD_POOL_SIZE = 128;
063
064  public static final String DISPATCH_DELAY_CONF_KEY =
065      "hbase.procedure.remote.dispatcher.delay.msec";
066  private static final int DEFAULT_DISPATCH_DELAY = 150;
067
068  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
069      "hbase.procedure.remote.dispatcher.max.queue.size";
070  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
071
072  private final AtomicBoolean running = new AtomicBoolean(false);
073  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
074      new ConcurrentHashMap<TRemote, BufferNode>();
075
076  private final int operationDelay;
077  private final int queueMaxSize;
078  private final int corePoolSize;
079
080  private TimeoutExecutorThread timeoutExecutor;
081  private ThreadPoolExecutor threadPool;
082
083  protected RemoteProcedureDispatcher(Configuration conf) {
084    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
085    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
086    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
087  }
088
089  public boolean start() {
090    if (running.getAndSet(true)) {
091      LOG.warn("Already running");
092      return false;
093    }
094
095    LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, " +
096        "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay);
097
098    // Create the timeout executor
099    timeoutExecutor = new TimeoutExecutorThread();
100    timeoutExecutor.start();
101
102    // Create the thread pool that will execute RPCs
103    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
104      new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d")
105        .setDaemon(true).setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
106    return true;
107  }
108
109  public boolean stop() {
110    if (!running.getAndSet(false)) {
111      return false;
112    }
113
114    LOG.info("Stopping procedure remote dispatcher");
115
116    // send stop signals
117    timeoutExecutor.sendStopSignal();
118    threadPool.shutdownNow();
119    return true;
120  }
121
122  public void join() {
123    assert !running.get() : "expected not running";
124
125    // wait the timeout executor
126    timeoutExecutor.awaitTermination();
127    timeoutExecutor = null;
128
129    // wait for the thread pool to terminate
130    threadPool.shutdownNow();
131    try {
132      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
133        LOG.warn("Waiting for thread-pool to terminate");
134      }
135    } catch (InterruptedException e) {
136      LOG.warn("Interrupted while waiting for thread-pool termination", e);
137    }
138  }
139
140  protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler();
141
142  // ============================================================================================
143  //  Node Helpers
144  // ============================================================================================
145  /**
146   * Add a node that will be able to execute remote procedures
147   * @param key the node identifier
148   */
149  public void addNode(final TRemote key) {
150    assert key != null: "Tried to add a node with a null key";
151    final BufferNode newNode = new BufferNode(key);
152    nodeMap.putIfAbsent(key, newNode);
153  }
154
155  /**
156   * Add a remote rpc.
157   * @param key the node identifier
158   */
159  public void addOperationToNode(final TRemote key, RemoteProcedure rp)
160          throws NullTargetServerDispatchException, NoServerDispatchException,
161          NoNodeDispatchException {
162    if (key == null) {
163      throw new NullTargetServerDispatchException(rp.toString());
164    }
165    BufferNode node = nodeMap.get(key);
166    if (node == null) {
167      // If null here, it means node has been removed because it crashed. This happens when server
168      // is expired in ServerManager. ServerCrashProcedure may or may not have run.
169      throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
170    }
171    node.add(rp);
172    // Check our node still in the map; could have been removed by #removeNode.
173    if (!nodeMap.containsValue(node)) {
174      throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
175    }
176  }
177
178  public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
179    BufferNode node = nodeMap.get(key);
180    if (node == null) {
181      LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
182        key);
183      return;
184    }
185    node.operationCompleted(rp);
186  }
187
188  /**
189   * Remove a remote node
190   * @param key the node identifier
191   */
192  public boolean removeNode(final TRemote key) {
193    final BufferNode node = nodeMap.remove(key);
194    if (node == null) {
195      return false;
196    }
197
198    node.abortOperationsInQueue();
199    return true;
200  }
201
202  // ============================================================================================
203  //  Task Helpers
204  // ============================================================================================
205  protected final void submitTask(Runnable task) {
206    threadPool.execute(task);
207  }
208
209  protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
210    timeoutExecutor.add(new DelayedTask(task, delay, unit));
211  }
212
213  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);
214  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);
215
216  /**
217   * Data structure with reference to remote operation.
218   */
219  public static abstract class RemoteOperation {
220    private final RemoteProcedure remoteProcedure;
221
222    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
223      this.remoteProcedure = remoteProcedure;
224    }
225
226    public RemoteProcedure getRemoteProcedure() {
227      return remoteProcedure;
228    }
229  }
230
231  /**
232   * Remote procedure reference.
233   */
234  public interface RemoteProcedure<TEnv, TRemote> {
235    /**
236     * For building the remote operation.
237     * May be empty if no need to send remote call. Usually, this means the RemoteProcedure has been
238     * finished already. This is possible, as we may have already sent the procedure to RS but then
239     * the rpc connection is broken so the executeProcedures call fails, but the RS does receive the
240     * procedure and execute it and then report back, before we retry again.
241     */
242    Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote);
243
244    /**
245     * Called when the executeProcedure call is failed.
246     */
247    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);
248
249    /**
250     * Called when RS tells the remote procedure is succeeded through the
251     * {@code reportProcedureDone} method.
252     */
253    void remoteOperationCompleted(TEnv env);
254
255    /**
256     * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone}
257     * method.
258     */
259    void remoteOperationFailed(TEnv env, RemoteProcedureException error);
260
261    /**
262     * Whether store this remote procedure in dispatched queue
263     * only OpenRegionProcedure and CloseRegionProcedure return false since they are
264     * not fully controlled by dispatcher
265     */
266    default boolean storeInDispatchedQueue() {
267      return true;
268    }
269  }
270
271  /**
272   * Account of what procedures are running on remote node.
273   */
274  public interface RemoteNode<TEnv, TRemote> {
275    TRemote getKey();
276
277    void add(RemoteProcedure<TEnv, TRemote> operation);
278
279    void dispatch();
280  }
281
282  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
283      final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
284    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
285    for (RemoteProcedure proc : remoteProcedures) {
286      Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote);
287      operation.ifPresent(op -> requestByType.put(op.getClass(), op));
288    }
289    return requestByType;
290  }
291
292  protected <T extends RemoteOperation> List<T> fetchType(
293      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
294    return (List<T>)requestByType.removeAll(type);
295  }
296
297  // ============================================================================================
298  //  Timeout Helpers
299  // ============================================================================================
300  private final class TimeoutExecutorThread extends Thread {
301    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();
302
303    public TimeoutExecutorThread() {
304      super("ProcedureDispatcherTimeoutThread");
305    }
306
307    @Override
308    public void run() {
309      while (running.get()) {
310        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
311        if (task == null || task == DelayedUtil.DELAYED_POISON) {
312          // the executor may be shutting down, and the task is just the shutdown request
313          continue;
314        }
315        if (task instanceof DelayedTask) {
316          threadPool.execute(((DelayedTask) task).getObject());
317        } else {
318          ((BufferNode) task).dispatch();
319        }
320      }
321    }
322
323    public void add(final DelayedWithTimeout delayed) {
324      queue.add(delayed);
325    }
326
327    public void remove(final DelayedWithTimeout delayed) {
328      queue.remove(delayed);
329    }
330
331    public void sendStopSignal() {
332      queue.add(DelayedUtil.DELAYED_POISON);
333    }
334
335    public void awaitTermination() {
336      try {
337        final long startTime = EnvironmentEdgeManager.currentTime();
338        for (int i = 0; isAlive(); ++i) {
339          sendStopSignal();
340          join(250);
341          if (i > 0 && (i % 8) == 0) {
342            LOG.warn("Waiting termination of thread " + getName() + ", " +
343              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
344          }
345        }
346      } catch (InterruptedException e) {
347        LOG.warn(getName() + " join wait got interrupted", e);
348      }
349    }
350  }
351
352  // ============================================================================================
353  //  Internals Helpers
354  // ============================================================================================
355
356  /**
357   * Node that contains a set of RemoteProcedures
358   */
359  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
360      implements RemoteNode<TEnv, TRemote> {
361    private Set<RemoteProcedure> operations;
362    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();
363
364    protected BufferNode(final TRemote key) {
365      super(key, 0);
366    }
367
368    @Override
369    public TRemote getKey() {
370      return getObject();
371    }
372
373    @Override
374    public synchronized void add(final RemoteProcedure operation) {
375      if (this.operations == null) {
376        this.operations = new HashSet<>();
377        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
378        timeoutExecutor.add(this);
379      }
380      this.operations.add(operation);
381      if (this.operations.size() > queueMaxSize) {
382        timeoutExecutor.remove(this);
383        dispatch();
384      }
385    }
386
387    @Override
388    public synchronized void dispatch() {
389      if (operations != null) {
390        remoteDispatch(getKey(), operations);
391        operations.stream().filter(operation -> operation.storeInDispatchedQueue())
392            .forEach(operation -> dispatchedOperations.add(operation));
393        this.operations = null;
394      }
395    }
396
397    public synchronized void abortOperationsInQueue() {
398      if (operations != null) {
399        abortPendingOperations(getKey(), operations);
400        this.operations = null;
401      }
402      abortPendingOperations(getKey(), dispatchedOperations);
403      this.dispatchedOperations.clear();
404    }
405
406    public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){
407      this.dispatchedOperations.remove(remoteProcedure);
408    }
409
410    @Override
411    public String toString() {
412      return super.toString() + ", operations=" + this.operations;
413    }
414  }
415
416  /**
417   * Delayed object that holds a FutureTask.
418   * <p/>
419   * used to submit something later to the thread-pool.
420   */
421  private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> {
422    public DelayedTask(Runnable task, long delay, TimeUnit unit) {
423      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
424    }
425  };
426}