001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import java.io.IOException; 022import java.lang.Thread.UncaughtExceptionHandler; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Optional; 026import java.util.Set; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.DelayQueue; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.atomic.AtomicBoolean; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; 034import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp; 035import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; 036import org.apache.hadoop.hbase.procedure2.util.StringUtils; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.util.Threads; 039import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import 
org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;

/**
 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit
 * count threshold. Creates its own threadpool to run RPCs with timeout.
 * <ul>
 * <li>Each server queue has a dispatch buffer</li>
 * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
 * </ul>
 * <p>Call {@link #start()} and then {@link #submitTask(Runnable)}. When done,
 * call {@link #stop()}.
 */
@InterfaceAudience.Private
public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);

  public static final String THREAD_POOL_SIZE_CONF_KEY =
      "hbase.procedure.remote.dispatcher.threadpool.size";
  private static final int DEFAULT_THREAD_POOL_SIZE = 128;

  public static final String DISPATCH_DELAY_CONF_KEY =
      "hbase.procedure.remote.dispatcher.delay.msec";
  private static final int DEFAULT_DISPATCH_DELAY = 150;

  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
      "hbase.procedure.remote.dispatcher.max.queue.size";
  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;

  private final AtomicBoolean running = new AtomicBoolean(false);
  // One buffer per remote node; each BufferNode is stored under its own key.
  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap =
      new ConcurrentHashMap<TRemote, BufferNode>();

  /** Delay added to the current time when a node's buffer receives its first operation. */
  private final int operationDelay;
  /** Buffered-operation count above which a node is dispatched immediately. */
  private final int queueMaxSize;
  /** Core size of the RPC thread pool created in {@link #start()}. */
  private final int corePoolSize;

  private TimeoutExecutorThread timeoutExecutor;
  private ThreadPoolExecutor threadPool;

  /**
   * Reads the dispatcher tuning values; no threads are created until {@link #start()}.
   * @param conf configuration supplying {@link #THREAD_POOL_SIZE_CONF_KEY},
   *   {@link #DISPATCH_DELAY_CONF_KEY} and {@link #DISPATCH_MAX_QUEUE_SIZE_CONF_KEY}
   */
  protected RemoteProcedureDispatcher(Configuration conf) {
    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
  }

  /**
   * Starts the timeout thread and creates the RPC thread pool.
   * @return {@code true} if this call started the dispatcher, {@code false} if it was
   *   already running
   */
  public boolean start() {
    if (running.getAndSet(true)) {
      LOG.warn("Already running");
      return false;
    }

    LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, " +
        "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay);

    // Create the timeout executor
    timeoutExecutor = new TimeoutExecutorThread();
    timeoutExecutor.start();

    // Create the thread pool that will execute RPCs
    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d")
        .setDaemon(true).setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
    return true;
  }

  /**
   * Signals the timeout thread to stop and calls {@code shutdownNow()} on the RPC pool.
   * Does not wait; use {@link #join()} for that.
   * @return {@code true} if this call stopped the dispatcher, {@code false} if it was not
   *   running
   */
  public boolean stop() {
    if (!running.getAndSet(false)) {
      return false;
    }

    LOG.info("Stopping procedure remote dispatcher");

    // send stop signals
    timeoutExecutor.sendStopSignal();
    threadPool.shutdownNow();
    return true;
  }

  /**
   * Waits for the timeout thread and the RPC thread pool to terminate. Intended to be called
   * after {@link #stop()}. If the calling thread is interrupted while waiting, the wait is
   * abandoned and the thread's interrupt status is restored.
   */
  public void join() {
    assert !running.get() : "expected not running";

    // wait the timeout executor; it is null if start() never ran or join() already ran
    if (timeoutExecutor != null) {
      timeoutExecutor.awaitTermination();
      timeoutExecutor = null;
    }

    if (threadPool == null) {
      // start() never ran, so there is no pool to wait for
      return;
    }

    // wait for the thread pool to terminate
    threadPool.shutdownNow();
    try {
      while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
        LOG.warn("Waiting for thread-pool to terminate");
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while waiting for thread-pool termination", e);
      // preserve the interrupt for callers further up the stack
      Thread.currentThread().interrupt();
    }
  }

  /**
   * @return the handler installed on every RPC pool thread created in {@link #start()}
   */
  protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler();

  // ============================================================================================
  //  Node Helpers
  // ============================================================================================
  /**
   * Add a node that will be able to execute remote procedures. Does nothing if a node is
   * already registered for {@code key}.
   * @param key the node identifier
   */
  public void addNode(final TRemote key) {
    assert key != null: "Tried to add a node with a null key";
    final BufferNode newNode = new BufferNode(key);
    nodeMap.putIfAbsent(key, newNode);
  }

  /**
   * Add a remote rpc to the buffer of the node identified by {@code key}.
   * @param key the node identifier
   * @param rp the remote procedure to buffer for dispatch
   * @throws NullTargetServerDispatchException if {@code key} is null
   * @throws NoServerDispatchException if no node is registered for {@code key}
   * @throws NoNodeDispatchException if the node was removed while the operation was being added
   */
  public void addOperationToNode(final TRemote key, RemoteProcedure rp)
      throws NullTargetServerDispatchException, NoServerDispatchException,
      NoNodeDispatchException {
    if (key == null) {
      throw new NullTargetServerDispatchException(rp.toString());
    }
    BufferNode node = nodeMap.get(key);
    if (node == null) {
      // If null here, it means node has been removed because it crashed. This happens when server
      // is expired in ServerManager. ServerCrashProcedure may or may not have run.
      throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
    }
    node.add(rp);
    // Check our node instance is still the one mapped for this key; it could have been removed
    // by #removeNode concurrently. A keyed identity lookup is O(1), unlike a scan of all values.
    if (nodeMap.get(key) != node) {
      throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
    }
  }

  /**
   * Removes a finished procedure from the dispatched set of the node for {@code key}.
   * Logs a warning and returns if no node is registered for {@code key}.
   * @param key the node identifier
   * @param rp the remote procedure that completed
   */
  public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
    BufferNode node = nodeMap.get(key);
    if (node == null) {
      LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
        key);
      return;
    }
    node.operationCompleted(rp);
  }

  /**
   * Remove a remote node and abort both its buffered and its dispatched operations.
   * @param key the node identifier
   * @return {@code true} if a node was removed, {@code false} if none was registered
   */
  public boolean removeNode(final TRemote key) {
    final BufferNode node = nodeMap.remove(key);
    if (node == null) {
      return false;
    }

    node.abortOperationsInQueue();
    return true;
  }

  // ============================================================================================
  //  Task Helpers
  // ============================================================================================
  /**
   * Runs {@code task} on the RPC thread pool.
   */
  protected final void submitTask(Runnable task) {
    threadPool.execute(task);
  }

  /**
   * Hands {@code task} to the RPC thread pool once {@code delay} has elapsed, via the timeout
   * thread.
   */
  protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
    timeoutExecutor.add(new DelayedTask(task, delay, unit));
  }

  /** Sends the buffered {@code operations} to the node identified by {@code key}. */
  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);

  /** Aborts {@code operations} that can no longer be delivered to the node {@code key}. */
  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);

  /**
   * Data structure with reference to remote operation.
   */
  public static abstract class RemoteOperation {
    private final RemoteProcedure remoteProcedure;

    protected RemoteOperation(final RemoteProcedure remoteProcedure) {
      this.remoteProcedure = remoteProcedure;
    }

    public RemoteProcedure getRemoteProcedure() {
      return remoteProcedure;
    }
  }

  /**
   * Remote procedure reference.
   */
  public interface RemoteProcedure<TEnv, TRemote> {
    /**
     * For building the remote operation.
     * May be empty if no need to send remote call. Usually, this means the RemoteProcedure has been
     * finished already. This is possible, as we may have already sent the procedure to RS but then
     * the rpc connection is broken so the executeProcedures call fails, but the RS does receive the
     * procedure and execute it and then report back, before we retry again.
     */
    Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote);

    /**
     * Called when the executeProcedure call is failed.
     */
    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);

    /**
     * Called when RS tells the remote procedure is succeeded through the
     * {@code reportProcedureDone} method.
     */
    void remoteOperationCompleted(TEnv env);

    /**
     * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone}
     * method.
     */
    void remoteOperationFailed(TEnv env, RemoteProcedureException error);

    /**
     * Whether store this remote procedure in dispatched queue
     * only OpenRegionProcedure and CloseRegionProcedure return false since they are
     * not fully controlled by dispatcher
     */
    default boolean storeInDispatchedQueue() {
      return true;
    }
  }

  /**
   * Account of what procedures are running on remote node.
   */
  public interface RemoteNode<TEnv, TRemote> {
    TRemote getKey();

    void add(RemoteProcedure<TEnv, TRemote> operation);

    void dispatch();
  }

  /**
   * Builds the remote operation for each procedure and groups the results by their concrete
   * class. Procedures whose {@code remoteCallBuild} returns empty are skipped.
   */
  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
      final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
    for (RemoteProcedure proc : remoteProcedures) {
      Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote);
      operation.ifPresent(op -> requestByType.put(op.getClass(), op));
    }
    return requestByType;
  }

  /**
   * Removes and returns every operation grouped under {@code type}.
   * The cast is safe when {@code type} is the exact runtime class used as the grouping key in
   * {@link #buildAndGroupRequestByType}, since every value under that key is of that class.
   */
  @SuppressWarnings("unchecked")
  protected <T extends RemoteOperation> List<T> fetchType(
      final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
    return (List<T>)requestByType.removeAll(type);
  }

  // ============================================================================================
  //  Timeout Helpers
  // ============================================================================================
  /**
   * Thread that takes expired entries from a delay queue and either submits them to the RPC
   * pool ({@link DelayedTask}) or dispatches them ({@link BufferNode}).
   */
  private final class TimeoutExecutorThread extends Thread {
    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>();

    public TimeoutExecutorThread() {
      super("ProcedureDispatcherTimeoutThread");
    }

    @Override
    public void run() {
      while (running.get()) {
        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
        if (task == null || task == DelayedUtil.DELAYED_POISON) {
          // the executor may be shutting down, and the task is just the shutdown request
          continue;
        }
        if (task instanceof DelayedTask) {
          threadPool.execute(((DelayedTask) task).getObject());
        } else {
          ((BufferNode) task).dispatch();
        }
      }
    }

    public void add(final DelayedWithTimeout delayed) {
      queue.add(delayed);
    }

    public void remove(final DelayedWithTimeout delayed) {
      queue.remove(delayed);
    }

    public void sendStopSignal() {
      queue.add(DelayedUtil.DELAYED_POISON);
    }

    /**
     * Repeatedly posts the stop signal and joins this thread in 250 ms slices until it exits,
     * warning on every 8th slice. If the calling thread is interrupted, the wait is abandoned
     * and the caller's interrupt status is restored.
     */
    public void awaitTermination() {
      try {
        final long startTime = EnvironmentEdgeManager.currentTime();
        for (int i = 0; isAlive(); ++i) {
          sendStopSignal();
          join(250);
          if (i > 0 && (i % 8) == 0) {
            LOG.warn("Waiting termination of thread {}, {}", getName(),
              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
          }
        }
      } catch (InterruptedException e) {
        LOG.warn("{} join wait got interrupted", getName(), e);
        // interrupt the *calling* thread (not this timeout thread) to preserve its status
        Thread.currentThread().interrupt();
      }
    }
  }

  // ============================================================================================
  //  Internals Helpers
  // ============================================================================================

  /**
   * Node that contains a set of RemoteProcedures
   */
  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
      implements RemoteNode<TEnv, TRemote> {
    // Operations waiting to be dispatched; null when nothing is buffered.
    private Set<RemoteProcedure> operations;
    // Operations already passed to remoteDispatch whose storeInDispatchedQueue() returned true;
    // removed by operationCompleted, aborted by abortOperationsInQueue.
    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();

    protected BufferNode(final TRemote key) {
      super(key, 0);
    }

    @Override
    public TRemote getKey() {
      return getObject();
    }

    /**
     * Buffers {@code operation}. The first operation in an empty buffer schedules this node on
     * the timeout thread at now + operationDelay; exceeding queueMaxSize dispatches at once.
     */
    @Override
    public synchronized void add(final RemoteProcedure operation) {
      if (this.operations == null) {
        this.operations = new HashSet<>();
        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
        timeoutExecutor.add(this);
      }
      this.operations.add(operation);
      if (this.operations.size() > queueMaxSize) {
        timeoutExecutor.remove(this);
        dispatch();
      }
    }

    /** Sends the buffered operations, remembers those to track, and empties the buffer. */
    @Override
    public synchronized void dispatch() {
      if (operations != null) {
        remoteDispatch(getKey(), operations);
        operations.stream().filter(operation -> operation.storeInDispatchedQueue())
            .forEach(operation -> dispatchedOperations.add(operation));
        this.operations = null;
      }
    }

    /** Aborts both buffered and dispatched operations, then clears them. */
    public synchronized void abortOperationsInQueue() {
      if (operations != null) {
        abortPendingOperations(getKey(), operations);
        this.operations = null;
      }
      abortPendingOperations(getKey(), dispatchedOperations);
      this.dispatchedOperations.clear();
    }

    /** Stops tracking {@code remoteProcedure} as dispatched. */
    public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){
      this.dispatchedOperations.remove(remoteProcedure);
    }

    @Override
    public String toString() {
      return super.toString() + ", operations=" + this.operations;
    }
  }

  /**
   * Delayed object that holds a {@link Runnable}.
   * <p/>
   * used to submit something later to the thread-pool.
   */
  private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> {
    public DelayedTask(Runnable task, long delay, TimeUnit unit) {
      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
    }
  }
}