/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;

/**
 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
 * count threshold. Creates its own threadpool to run RPCs with timeout.
 * <ul>
 * <li>Each server queue has a dispatch buffer</li>
 * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
 * </ul>
 * <p>Call {@link #start()} and then {@link #submitTask(Runnable)}. When done,
 * call {@link #stop()}.
 *
 * @param <TEnv> the environment type handed to remote procedures when building calls
 * @param <TRemote> the identifier type of a remote node
 */
@InterfaceAudience.Private
public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);

  public static final String THREAD_POOL_SIZE_CONF_KEY =
      "hbase.procedure.remote.dispatcher.threadpool.size";
  private static final int DEFAULT_THREAD_POOL_SIZE = 128;

  public static final String DISPATCH_DELAY_CONF_KEY =
      "hbase.procedure.remote.dispatcher.delay.msec";
  private static final int DEFAULT_DISPATCH_DELAY = 150;

  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
      "hbase.procedure.remote.dispatcher.max.queue.size";
  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;

  /** True between a successful {@link #start()} and the matching {@link #stop()}. */
  private final AtomicBoolean running = new AtomicBoolean(false);
  /** One dispatch buffer per registered remote node. */
  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap = new ConcurrentHashMap<>();

  /** Delay added to the current time when a node's buffer gets its first operation. */
  private final int operationDelay;
  /** A node's buffer is dispatched right away once it holds more than this many operations. */
  private final int queueMaxSize;
  private final int corePoolSize;

  private TimeoutExecutorThread timeoutExecutor;
  private ThreadPoolExecutor threadPool;

  /**
   * Reads the pool size, dispatch delay and max queue size from the given configuration,
   * falling back to the defaults declared in this class.
   * @param conf the configuration to read the dispatcher settings from
   */
  protected RemoteProcedureDispatcher(Configuration conf) {
    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
  }

  /**
   * Starts the timeout executor thread and creates the thread pool used to run RPCs.
   * @return {@code true} if the dispatcher was started by this call, {@code false} if it was
   *         already running
   */
  public boolean start() {
    if (running.getAndSet(true)) {
      LOG.warn("Already running");
      return false;
    }

    LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, " +
        "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay);

    // Create the timeout executor
    timeoutExecutor = new TimeoutExecutorThread();
    timeoutExecutor.start();

    // Create the thread pool that will execute RPCs
    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
      Threads.newDaemonThreadFactory(this.getClass().getSimpleName(),
        getUncaughtExceptionHandler()));
    return true;
  }

  /**
   * Signals the timeout executor to stop and shuts the thread pool down. Does not wait for
   * termination; use {@link #join()} for that.
   * @return {@code true} if the dispatcher was running and has been asked to stop,
   *         {@code false} if it was not running
   */
  public boolean stop() {
    if (!running.getAndSet(false)) {
      return false;
    }

    LOG.info("Stopping procedure remote dispatcher");

    // send stop signals
    timeoutExecutor.sendStopSignal();
    threadPool.shutdownNow();
    return true;
  }

  /**
   * Waits for the timeout executor thread and the thread pool to terminate. Expected to be
   * called after {@link #stop()}. If the calling thread is interrupted while waiting for the
   * thread pool, the wait is abandoned and the thread's interrupt status is restored.
   */
  public void join() {
    assert !running.get() : "expected not running";

    // wait the timeout executor
    timeoutExecutor.awaitTermination();
    timeoutExecutor = null;

    // wait for the thread pool to terminate
    threadPool.shutdownNow();
    try {
      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
        LOG.warn("Waiting for thread-pool to terminate");
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting for thread-pool termination", e);
      // Do not swallow the interruption: let the caller observe it.
      Thread.currentThread().interrupt();
    }
  }

  /**
   * @return the handler installed on the threads of the RPC thread pool
   */
  protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler();

  // ============================================================================================
  //  Node Helpers
  // ============================================================================================
  /**
   * Add a node that will be able to execute remote procedures. Has no effect if a node is
   * already registered for the key.
   * @param key the node identifier
   */
  public void addNode(final TRemote key) {
    assert key != null : "Tried to add a node with a null key";
    final BufferNode newNode = new BufferNode(key);
    nodeMap.putIfAbsent(key, newNode);
  }

  /**
   * Add a remote rpc.
   * @param key the node identifier
   * @param rp the remote procedure to buffer for the node
   * @throws NullTargetServerDispatchException if {@code key} is null
   * @throws NoServerDispatchException if no node is registered for {@code key}
   * @throws NoNodeDispatchException if the node is no longer in the node map after the
   *         procedure was added to it
   */
  public void addOperationToNode(final TRemote key, RemoteProcedure rp)
      throws NullTargetServerDispatchException, NoServerDispatchException,
      NoNodeDispatchException {
    if (key == null) {
      throw new NullTargetServerDispatchException(rp.toString());
    }
    BufferNode node = nodeMap.get(key);
    if (node == null) {
      // If null here, it means node has been removed because it crashed. This happens when server
      // is expired in ServerManager. ServerCrashProcedure may or may not have run.
      throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
    }
    node.add(rp);
    // Check our node still in the map; could have been removed by #removeNode.
    if (!nodeMap.containsValue(node)) {
      throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
    }
  }

  /**
   * Removes a finished remote procedure from the dispatched set of its node. Logs a warning
   * and does nothing if no node is registered for the key.
   * @param key the node identifier
   * @param rp the remote procedure that completed
   */
  public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
    BufferNode node = nodeMap.get(key);
    if (node == null) {
      LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
        key);
      return;
    }
    node.operationCompleted(rp);
  }

  /**
   * Remove a remote node and abort its buffered and dispatched operations.
   * @param key the node identifier
   * @return {@code true} if a node was registered for the key and has been removed
   */
  public boolean removeNode(final TRemote key) {
    final BufferNode node = nodeMap.remove(key);
    if (node == null) {
      return false;
    }

    node.abortOperationsInQueue();
    return true;
  }

  // ============================================================================================
  //  Task Helpers
  // ============================================================================================
  /**
   * Hands the task to the RPC thread pool for execution.
   * @param task the task to run
   */
  protected final void submitTask(Runnable task) {
    threadPool.execute(task);
  }

  /**
   * Queues the task on the timeout executor, which passes it to the RPC thread pool once the
   * delay has elapsed.
   * @param task the task to run
   * @param delay how long to wait before submitting the task
   * @param unit the unit of {@code delay}
   */
  protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
    timeoutExecutor.add(new DelayedTask(task, delay, unit));
  }

  /**
   * Called by a node's buffer to send its accumulated operations.
   * @param key the node identifier
   * @param operations the buffered remote procedures of that node
   */
  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);

  /**
   * Called when a node is removed, for its not-yet-dispatched operations and for its
   * dispatched ones.
   * @param key the node identifier
   * @param operations the remote procedures to abort
   */
  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);

  /**
   * Data structure with reference to remote operation.
   */
  public abstract static class RemoteOperation {
    private final RemoteProcedure remoteProcedure;

    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
      this.remoteProcedure = remoteProcedure;
    }

    public RemoteProcedure getRemoteProcedure() {
      return remoteProcedure;
    }
  }

  /**
   * Remote procedure reference.
   */
  public interface RemoteProcedure<TEnv, TRemote> {
    /**
     * For building the remote operation.
     * May be empty if no need to send remote call. Usually, this means the RemoteProcedure has been
     * finished already. This is possible, as we may have already sent the procedure to RS but then
     * the rpc connection is broken so the executeProcedures call fails, but the RS does receive the
     * procedure and execute it and then report back, before we retry again.
     */
    Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote);

    /**
     * Called when the executeProcedure call is failed.
     */
    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);

    /**
     * Called when RS tells the remote procedure is succeeded through the
     * {@code reportProcedureDone} method.
     */
    void remoteOperationCompleted(TEnv env);

    /**
     * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone}
     * method.
     */
    void remoteOperationFailed(TEnv env, RemoteProcedureException error);

    /**
     * Whether store this remote procedure in dispatched queue
     * only OpenRegionProcedure and CloseRegionProcedure return false since they are
     * not fully controlled by dispatcher
     */
    default boolean storeInDispatchedQueue() {
      return true;
    }
  }

  /**
   * Account of what procedures are running on remote node.
   */
  public interface RemoteNode<TEnv, TRemote> {
    TRemote getKey();

    void add(RemoteProcedure<TEnv, TRemote> operation);

    void dispatch();
  }

  /**
   * Builds the remote operation of every given procedure and groups the results by the
   * concrete class of the operation. Procedures whose
   * {@link RemoteProcedure#remoteCallBuild(Object, Object)} returns an empty optional are
   * skipped.
   * @param env the environment passed to each procedure
   * @param remote the target node passed to each procedure
   * @param remoteProcedures the procedures to build operations for
   * @return the built operations, keyed by their runtime class
   */
  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
      final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
    for (RemoteProcedure proc : remoteProcedures) {
      Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote);
      operation.ifPresent(op -> requestByType.put(op.getClass(), op));
    }
    return requestByType;
  }

  /**
   * Removes and returns all operations stored under the given class.
   * @param requestByType the grouped operations, as produced by
   *          {@link #buildAndGroupRequestByType(Object, Comparable, Set)}
   * @param type the operation class to fetch
   * @return the operations that were stored under {@code type}
   */
  // The cast is safe for maps built by buildAndGroupRequestByType: every value is stored under
  // its own runtime class, so the values under 'type' are instances of T.
  @SuppressWarnings("unchecked")
  protected <T extends RemoteOperation> List<T> fetchType(
      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
    return (List<T>) requestByType.removeAll(type);
  }

  // ============================================================================================
  //  Timeout Helpers
  // ============================================================================================
  /**
   * Thread that takes expired entries off a delay queue: delayed tasks are handed to the RPC
   * thread pool, buffer nodes are dispatched.
   */
  private final class TimeoutExecutorThread extends Thread {
    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();

    public TimeoutExecutorThread() {
      super("ProcedureDispatcherTimeoutThread");
    }

    @Override
    public void run() {
      while (running.get()) {
        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
        if (task == null || task == DelayedUtil.DELAYED_POISON) {
          // the executor may be shutting down, and the task is just the shutdown request
          continue;
        }
        if (task instanceof DelayedTask) {
          threadPool.execute(((DelayedTask) task).getObject());
        } else {
          ((BufferNode) task).dispatch();
        }
      }
    }

    public void add(final DelayedWithTimeout delayed) {
      queue.add(delayed);
    }

    public void remove(final DelayedWithTimeout delayed) {
      queue.remove(delayed);
    }

    /** Enqueues the poison entry so that a blocked {@link #run()} loop re-checks the flag. */
    public void sendStopSignal() {
      queue.add(DelayedUtil.DELAYED_POISON);
    }

    /**
     * Repeatedly sends the stop signal and waits for this thread to die, logging a warning
     * every eighth iteration. Gives up if the waiting thread is interrupted.
     */
    public void awaitTermination() {
      try {
        final long startTime = EnvironmentEdgeManager.currentTime();
        for (int i = 0; isAlive(); ++i) {
          sendStopSignal();
          join(250);
          if (i > 0 && (i % 8) == 0) {
            LOG.warn("Waiting termination of thread {}, {}", getName(),
              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
          }
        }
      } catch (InterruptedException e) {
        // Interrupt status is not restored here: RemoteProcedureDispatcher.join() goes on to
        // wait for the thread pool, and a set flag would make that wait fail immediately.
        LOG.warn("{} join wait got interrupted", getName(), e);
      }
    }
  }

  // ============================================================================================
  //  Internals Helpers
  // ============================================================================================

  /**
   * Node that contains a set of RemoteProcedures
   */
  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
      implements RemoteNode<TEnv, TRemote> {
    /** Operations buffered since the last dispatch; {@code null} when the buffer is empty. */
    private Set<RemoteProcedure> operations;
    /** Operations already dispatched and not yet reported as completed. */
    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();

    protected BufferNode(final TRemote key) {
      super(key, 0);
    }

    @Override
    public TRemote getKey() {
      return getObject();
    }

    /**
     * Buffers the operation. The first operation of a batch arms the dispatch timeout; once
     * the buffer holds more than {@code queueMaxSize} operations it is dispatched immediately.
     */
    @Override
    public synchronized void add(final RemoteProcedure operation) {
      if (this.operations == null) {
        this.operations = new HashSet<>();
        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
        timeoutExecutor.add(this);
      }
      this.operations.add(operation);
      if (this.operations.size() > queueMaxSize) {
        timeoutExecutor.remove(this);
        dispatch();
      }
    }

    /**
     * Sends the buffered operations, if any, remembers those that ask to be stored in the
     * dispatched queue, and empties the buffer.
     */
    @Override
    public synchronized void dispatch() {
      if (operations == null) {
        return;
      }
      remoteDispatch(getKey(), operations);
      for (RemoteProcedure operation : operations) {
        if (operation.storeInDispatchedQueue()) {
          dispatchedOperations.add(operation);
        }
      }
      this.operations = null;
    }

    /**
     * Aborts the operations still waiting in the buffer as well as the ones already
     * dispatched, then forgets both.
     */
    public synchronized void abortOperationsInQueue() {
      if (operations != null) {
        abortPendingOperations(getKey(), operations);
        this.operations = null;
      }
      abortPendingOperations(getKey(), dispatchedOperations);
      this.dispatchedOperations.clear();
    }

    /** Drops the given procedure from the set of dispatched operations. */
    public synchronized void operationCompleted(final RemoteProcedure remoteProcedure) {
      this.dispatchedOperations.remove(remoteProcedure);
    }

    @Override
    public String toString() {
      return super.toString() + ", operations=" + this.operations;
    }
  }

  /**
   * Delayed object that holds a FutureTask.
   * <p>
   * used to submit something later to the thread-pool.
   */
  private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> {
    public DelayedTask(Runnable task, long delay, TimeUnit unit) {
      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
    }
  }
}