001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.lang.Thread.UncaughtExceptionHandler; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Optional; 025import java.util.Set; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.DelayQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicBoolean; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; 033import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp; 034import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; 035import org.apache.hadoop.hbase.procedure2.util.StringUtils; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.hadoop.hbase.util.Threads; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 043import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 044 045/** 046 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit count 047 * threshold. Creates its own threadpool to run RPCs with timeout. 048 * <ul> 049 * <li>Each server queue has a dispatch buffer</li> 050 * <li>Once the dispatch buffer reaches a threshold-size/time we send 051 * <li> 052 * </ul> 053 * <p> 054 * Call {@link #start()} and then {@link #submitTask(Runnable)}. When done, call {@link #stop()}. 055 */ 056@InterfaceAudience.Private 057public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> { 058 private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class); 059 060 public static final String THREAD_POOL_SIZE_CONF_KEY = 061 "hbase.procedure.remote.dispatcher.threadpool.size"; 062 private static final int DEFAULT_THREAD_POOL_SIZE = 128; 063 064 public static final String DISPATCH_DELAY_CONF_KEY = 065 "hbase.procedure.remote.dispatcher.delay.msec"; 066 private static final int DEFAULT_DISPATCH_DELAY = 150; 067 068 public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY = 069 "hbase.procedure.remote.dispatcher.max.queue.size"; 070 private static final int DEFAULT_MAX_QUEUE_SIZE = 32; 071 072 private final AtomicBoolean running = new AtomicBoolean(false); 073 private final ConcurrentHashMap<TRemote, BufferNode> nodeMap = 074 new ConcurrentHashMap<TRemote, BufferNode>(); 075 076 private final int operationDelay; 077 private final int queueMaxSize; 078 private final int corePoolSize; 079 080 private TimeoutExecutorThread timeoutExecutor; 081 private ThreadPoolExecutor threadPool; 082 083 protected RemoteProcedureDispatcher(Configuration conf) { 084 this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE); 085 this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY); 086 this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE); 087 } 088 089 public boolean start() { 090 if (running.getAndSet(true)) { 091 LOG.warn("Already running"); 092 return false; 093 } 094 095 LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, " 096 + "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay); 097 098 // Create the timeout executor 099 timeoutExecutor = new TimeoutExecutorThread(); 100 timeoutExecutor.start(); 101 102 // Create the thread pool that will execute RPCs 103 threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS, 104 new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d") 105 .setDaemon(true).setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build()); 106 return true; 107 } 108 109 protected void setTimeoutExecutorUncaughtExceptionHandler(UncaughtExceptionHandler eh) { 110 timeoutExecutor.setUncaughtExceptionHandler(eh); 111 } 112 113 public boolean stop() { 114 if (!running.getAndSet(false)) { 115 return false; 116 } 117 118 LOG.info("Stopping procedure remote dispatcher"); 119 120 // send stop signals 121 timeoutExecutor.sendStopSignal(); 122 threadPool.shutdownNow(); 123 return true; 124 } 125 126 public void join() { 127 assert !running.get() : "expected not running"; 128 129 // wait the timeout executor 130 timeoutExecutor.awaitTermination(); 131 timeoutExecutor = null; 132 133 // wait for the thread pool to terminate 134 threadPool.shutdownNow(); 135 try { 136 while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { 137 LOG.warn("Waiting for thread-pool to terminate"); 138 } 139 } catch (InterruptedException e) { 140 LOG.warn("Interrupted while waiting for thread-pool termination", e); 141 } 142 } 143 144 protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler(); 145 146 // ============================================================================================ 147 // Node Helpers 148 // ============================================================================================ 149 /** 150 * Add a node that will be able to execute remote procedures 151 * @param key the node identifier 152 */ 153 public void addNode(final TRemote key) { 154 assert key != null : "Tried to add a node with a null key"; 155 nodeMap.computeIfAbsent(key, k -> new BufferNode(k)); 156 } 157 158 /** 159 * Add a remote rpc. 160 * @param key the node identifier 161 */ 162 public void addOperationToNode(final TRemote key, RemoteProcedure rp) 163 throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException { 164 if (key == null) { 165 throw new NullTargetServerDispatchException(rp.toString()); 166 } 167 BufferNode node = nodeMap.get(key); 168 if (node == null) { 169 // If null here, it means node has been removed because it crashed. This happens when server 170 // is expired in ServerManager. ServerCrashProcedure may or may not have run. 171 throw new NoServerDispatchException(key.toString() + "; " + rp.toString()); 172 } 173 node.add(rp); 174 // Check our node still in the map; could have been removed by #removeNode. 175 if (!nodeMap.containsValue(node)) { 176 throw new NoNodeDispatchException(key.toString() + "; " + rp.toString()); 177 } 178 } 179 180 public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) { 181 BufferNode node = nodeMap.get(key); 182 if (node == null) { 183 LOG.warn("since no node for this key {}, we can't removed the finished remote procedure", 184 key); 185 return; 186 } 187 node.operationCompleted(rp); 188 } 189 190 /** 191 * Remove a remote node 192 * @param key the node identifier 193 */ 194 public boolean removeNode(final TRemote key) { 195 final BufferNode node = nodeMap.remove(key); 196 if (node == null) { 197 return false; 198 } 199 200 node.abortOperationsInQueue(); 201 return true; 202 } 203 204 // ============================================================================================ 205 // Task Helpers 206 // ============================================================================================ 207 protected final void submitTask(Runnable task) { 208 threadPool.execute(task); 209 } 210 211 protected final void submitTask(Runnable task, long delay, TimeUnit unit) { 212 timeoutExecutor.add(new DelayedTask(task, delay, unit)); 213 } 214 215 protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations); 216 217 protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations); 218 219 /** 220 * Data structure with reference to remote operation. 221 */ 222 public static abstract class RemoteOperation { 223 private final RemoteProcedure remoteProcedure; 224 225 protected RemoteOperation(final RemoteProcedure remoteProcedure) { 226 this.remoteProcedure = remoteProcedure; 227 } 228 229 public RemoteProcedure getRemoteProcedure() { 230 return remoteProcedure; 231 } 232 } 233 234 /** 235 * Remote procedure reference. 236 */ 237 public interface RemoteProcedure<TEnv, TRemote> { 238 /** 239 * For building the remote operation. May be empty if no need to send remote call. Usually, this 240 * means the RemoteProcedure has been finished already. This is possible, as we may have already 241 * sent the procedure to RS but then the rpc connection is broken so the executeProcedures call 242 * fails, but the RS does receive the procedure and execute it and then report back, before we 243 * retry again. 244 */ 245 Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote); 246 247 /** 248 * Called when the executeProcedure call is failed. 249 */ 250 void remoteCallFailed(TEnv env, TRemote remote, IOException exception); 251 252 /** 253 * Called when RS tells the remote procedure is succeeded through the 254 * {@code reportProcedureDone} method. 255 */ 256 void remoteOperationCompleted(TEnv env); 257 258 /** 259 * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone} 260 * method. 261 */ 262 void remoteOperationFailed(TEnv env, RemoteProcedureException error); 263 264 /** 265 * Whether store this remote procedure in dispatched queue only OpenRegionProcedure and 266 * CloseRegionProcedure return false since they are not fully controlled by dispatcher 267 */ 268 default boolean storeInDispatchedQueue() { 269 return true; 270 } 271 } 272 273 /** 274 * Account of what procedures are running on remote node. 275 */ 276 public interface RemoteNode<TEnv, TRemote> { 277 TRemote getKey(); 278 279 void add(RemoteProcedure<TEnv, TRemote> operation); 280 281 void dispatch(); 282 } 283 284 protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env, 285 final TRemote remote, final Set<RemoteProcedure> remoteProcedures) { 286 final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create(); 287 for (RemoteProcedure proc : remoteProcedures) { 288 Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote); 289 operation.ifPresent(op -> requestByType.put(op.getClass(), op)); 290 } 291 return requestByType; 292 } 293 294 protected <T extends RemoteOperation> List<T> fetchType( 295 final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) { 296 return (List<T>) requestByType.removeAll(type); 297 } 298 299 // ============================================================================================ 300 // Timeout Helpers 301 // ============================================================================================ 302 private final class TimeoutExecutorThread extends Thread { 303 private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>(); 304 305 public TimeoutExecutorThread() { 306 super("ProcedureDispatcherTimeoutThread"); 307 } 308 309 @Override 310 public void run() { 311 while (running.get()) { 312 final DelayedWithTimeout task = 313 DelayedUtil.takeWithoutInterrupt(queue, 20, TimeUnit.SECONDS); 314 if (task == null || task == DelayedUtil.DELAYED_POISON) { 315 if (task == null && queue.size() > 0) { 316 LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty when timed waiting" 317 + " elapsed. If this is repeated consistently, it means no element is getting expired" 318 + " from the queue and it might freeze the system. Queue: {}", queue); 319 } 320 // the executor may be shutting down, and the task is just the shutdown request 321 continue; 322 } 323 if (task instanceof DelayedTask) { 324 threadPool.execute(((DelayedTask) task).getObject()); 325 } else { 326 ((BufferNode) task).dispatch(); 327 } 328 } 329 } 330 331 public void add(final DelayedWithTimeout delayed) { 332 queue.add(delayed); 333 } 334 335 public void remove(final DelayedWithTimeout delayed) { 336 queue.remove(delayed); 337 } 338 339 public void sendStopSignal() { 340 queue.add(DelayedUtil.DELAYED_POISON); 341 } 342 343 public void awaitTermination() { 344 try { 345 final long startTime = EnvironmentEdgeManager.currentTime(); 346 for (int i = 0; isAlive(); ++i) { 347 sendStopSignal(); 348 join(250); 349 if (i > 0 && (i % 8) == 0) { 350 LOG.warn("Waiting termination of thread " + getName() + ", " 351 + StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); 352 } 353 } 354 } catch (InterruptedException e) { 355 LOG.warn(getName() + " join wait got interrupted", e); 356 } 357 } 358 } 359 360 // ============================================================================================ 361 // Internals Helpers 362 // ============================================================================================ 363 364 /** 365 * Node that contains a set of RemoteProcedures 366 */ 367 protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote> 368 implements RemoteNode<TEnv, TRemote> { 369 private Set<RemoteProcedure> operations; 370 private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>(); 371 372 protected BufferNode(final TRemote key) { 373 super(key, 0); 374 } 375 376 @Override 377 public TRemote getKey() { 378 return getObject(); 379 } 380 381 @Override 382 public synchronized void add(final RemoteProcedure operation) { 383 if (this.operations == null) { 384 this.operations = new HashSet<>(); 385 setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay); 386 timeoutExecutor.add(this); 387 } 388 this.operations.add(operation); 389 if (this.operations.size() > queueMaxSize) { 390 timeoutExecutor.remove(this); 391 dispatch(); 392 } 393 } 394 395 @Override 396 public synchronized void dispatch() { 397 if (operations != null) { 398 remoteDispatch(getKey(), operations); 399 operations.stream().filter(operation -> operation.storeInDispatchedQueue()) 400 .forEach(operation -> dispatchedOperations.add(operation)); 401 this.operations = null; 402 } 403 } 404 405 public synchronized void abortOperationsInQueue() { 406 if (operations != null) { 407 abortPendingOperations(getKey(), operations); 408 this.operations = null; 409 } 410 abortPendingOperations(getKey(), dispatchedOperations); 411 this.dispatchedOperations.clear(); 412 } 413 414 public synchronized void operationCompleted(final RemoteProcedure remoteProcedure) { 415 this.dispatchedOperations.remove(remoteProcedure); 416 } 417 418 @Override 419 public String toString() { 420 return super.toString() + ", operations=" + this.operations; 421 } 422 } 423 424 /** 425 * Delayed object that holds a FutureTask. 426 * <p/> 427 * used to submit something later to the thread-pool. 428 */ 429 private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> { 430 public DelayedTask(Runnable task, long delay, TimeUnit unit) { 431 super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay)); 432 } 433 } 434}