001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import java.io.IOException; 022import java.lang.Thread.UncaughtExceptionHandler; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.DelayQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicBoolean; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; 033import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp; 034import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; 035import org.apache.hadoop.hbase.procedure2.util.StringUtils; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.hadoop.hbase.util.Threads; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 043 044/** 045 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit 046 * count threshold. Creates its own threadpool to run RPCs with timeout. 047 * <ul> 048 * <li>Each server queue has a dispatch buffer</li> 049 * <li>Once the dispatch buffer reaches a threshold-size/time we send<li> 050 * </ul> 051 * <p>Call {@link #start()} and then {@link #submitTask(Runnable)}. When done, 052 * call {@link #stop()}. 053 */ 054@InterfaceAudience.Private 055public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> { 056 private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class); 057 058 public static final String THREAD_POOL_SIZE_CONF_KEY = 059 "hbase.procedure.remote.dispatcher.threadpool.size"; 060 private static final int DEFAULT_THREAD_POOL_SIZE = 128; 061 062 public static final String DISPATCH_DELAY_CONF_KEY = 063 "hbase.procedure.remote.dispatcher.delay.msec"; 064 private static final int DEFAULT_DISPATCH_DELAY = 150; 065 066 public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY = 067 "hbase.procedure.remote.dispatcher.max.queue.size"; 068 private static final int DEFAULT_MAX_QUEUE_SIZE = 32; 069 070 private final AtomicBoolean running = new AtomicBoolean(false); 071 private final ConcurrentHashMap<TRemote, BufferNode> nodeMap = 072 new ConcurrentHashMap<TRemote, BufferNode>(); 073 074 private final int operationDelay; 075 private final int queueMaxSize; 076 private final int corePoolSize; 077 078 private TimeoutExecutorThread timeoutExecutor; 079 private ThreadPoolExecutor threadPool; 080 081 protected RemoteProcedureDispatcher(Configuration conf) { 082 this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE); 083 this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY); 084 this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE); 085 } 086 087 public boolean start() { 088 if (running.getAndSet(true)) { 089 LOG.warn("Already running"); 090 return false; 091 } 092 093 LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, " + 094 "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay); 095 096 // Create the timeout executor 097 timeoutExecutor = new TimeoutExecutorThread(); 098 timeoutExecutor.start(); 099 100 // Create the thread pool that will execute RPCs 101 threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS, 102 Threads.newDaemonThreadFactory(this.getClass().getSimpleName(), 103 getUncaughtExceptionHandler())); 104 return true; 105 } 106 107 public boolean stop() { 108 if (!running.getAndSet(false)) { 109 return false; 110 } 111 112 LOG.info("Stopping procedure remote dispatcher"); 113 114 // send stop signals 115 timeoutExecutor.sendStopSignal(); 116 threadPool.shutdownNow(); 117 return true; 118 } 119 120 public void join() { 121 assert !running.get() : "expected not running"; 122 123 // wait the timeout executor 124 timeoutExecutor.awaitTermination(); 125 timeoutExecutor = null; 126 127 // wait for the thread pool to terminate 128 threadPool.shutdownNow(); 129 try { 130 while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { 131 LOG.warn("Waiting for thread-pool to terminate"); 132 } 133 } catch (InterruptedException e) { 134 LOG.warn("Interrupted while waiting for thread-pool termination", e); 135 } 136 } 137 138 protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler(); 139 140 // ============================================================================================ 141 // Node Helpers 142 // ============================================================================================ 143 /** 144 * Add a node that will be able to execute remote procedures 145 * @param key the node identifier 146 */ 147 public void addNode(final TRemote key) { 148 assert key != null: "Tried to add a node with a null key"; 149 final BufferNode newNode = new BufferNode(key); 150 nodeMap.putIfAbsent(key, newNode); 151 } 152 153 /** 154 * Add a remote rpc. 155 * @param key the node identifier 156 */ 157 public void addOperationToNode(final TRemote key, RemoteProcedure rp) 158 throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException { 159 if (key == null) { 160 throw new NullTargetServerDispatchException(rp.toString()); 161 } 162 BufferNode node = nodeMap.get(key); 163 if (node == null) { 164 // If null here, it means node has been removed because it crashed. This happens when server 165 // is expired in ServerManager. ServerCrashProcedure may or may not have run. 166 throw new NoServerDispatchException(key.toString() + "; " + rp.toString()); 167 } 168 node.add(rp); 169 // Check our node still in the map; could have been removed by #removeNode. 170 if (!nodeMap.containsValue(node)) { 171 throw new NoNodeDispatchException(key.toString() + "; " + rp.toString()); 172 } 173 } 174 175 public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) { 176 BufferNode node = nodeMap.get(key); 177 if (node == null) { 178 LOG.warn("since no node for this key {}, we can't removed the finished remote procedure", 179 key); 180 return; 181 } 182 node.operationCompleted(rp); 183 } 184 185 /** 186 * Remove a remote node 187 * @param key the node identifier 188 */ 189 public boolean removeNode(final TRemote key) { 190 final BufferNode node = nodeMap.remove(key); 191 if (node == null) return false; 192 node.abortOperationsInQueue(); 193 return true; 194 } 195 196 // ============================================================================================ 197 // Task Helpers 198 // ============================================================================================ 199 protected final void submitTask(Runnable task) { 200 threadPool.execute(task); 201 } 202 203 protected final void submitTask(Runnable task, long delay, TimeUnit unit) { 204 timeoutExecutor.add(new DelayedTask(task, delay, unit)); 205 } 206 207 protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations); 208 protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations); 209 210 /** 211 * Data structure with reference to remote operation. 212 */ 213 public static abstract class RemoteOperation { 214 private final RemoteProcedure remoteProcedure; 215 216 protected RemoteOperation(final RemoteProcedure remoteProcedure) { 217 this.remoteProcedure = remoteProcedure; 218 } 219 220 public RemoteProcedure getRemoteProcedure() { 221 return remoteProcedure; 222 } 223 } 224 225 /** 226 * Remote procedure reference. 227 */ 228 public interface RemoteProcedure<TEnv, TRemote> { 229 /** 230 * For building the remote operation. 231 */ 232 RemoteOperation remoteCallBuild(TEnv env, TRemote remote); 233 234 /** 235 * Called when the executeProcedure call is failed. 236 */ 237 boolean remoteCallFailed(TEnv env, TRemote remote, IOException exception); 238 239 /** 240 * Called when RS tells the remote procedure is succeeded through the 241 * {@code reportProcedureDone} method. 242 */ 243 void remoteOperationCompleted(TEnv env); 244 245 /** 246 * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone} 247 * method. 248 */ 249 void remoteOperationFailed(TEnv env, RemoteProcedureException error); 250 251 /** 252 * Whether store this remote procedure in dispatched queue 253 * only OpenRegionProcedure and CloseRegionProcedure return false since they are 254 * not fully controlled by dispatcher 255 */ 256 default boolean storeInDispatchedQueue() { 257 return true; 258 } 259 260 } 261 262 /** 263 * Account of what procedures are running on remote node. 264 */ 265 public interface RemoteNode<TEnv, TRemote> { 266 TRemote getKey(); 267 268 void add(RemoteProcedure<TEnv, TRemote> operation); 269 270 void dispatch(); 271 } 272 273 protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env, 274 final TRemote remote, final Set<RemoteProcedure> remoteProcedures) { 275 final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create(); 276 for (RemoteProcedure proc : remoteProcedures) { 277 RemoteOperation operation = proc.remoteCallBuild(env, remote); 278 requestByType.put(operation.getClass(), operation); 279 } 280 return requestByType; 281 } 282 283 protected <T extends RemoteOperation> List<T> fetchType( 284 final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) { 285 return (List<T>)requestByType.removeAll(type); 286 } 287 288 // ============================================================================================ 289 // Timeout Helpers 290 // ============================================================================================ 291 private final class TimeoutExecutorThread extends Thread { 292 private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>(); 293 294 public TimeoutExecutorThread() { 295 super("ProcedureDispatcherTimeoutThread"); 296 } 297 298 @Override 299 public void run() { 300 while (running.get()) { 301 final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); 302 if (task == null || task == DelayedUtil.DELAYED_POISON) { 303 // the executor may be shutting down, and the task is just the shutdown request 304 continue; 305 } 306 if (task instanceof DelayedTask) { 307 threadPool.execute(((DelayedTask) task).getObject()); 308 } else { 309 ((BufferNode) task).dispatch(); 310 } 311 } 312 } 313 314 public void add(final DelayedWithTimeout delayed) { 315 queue.add(delayed); 316 } 317 318 public void remove(final DelayedWithTimeout delayed) { 319 queue.remove(delayed); 320 } 321 322 public void sendStopSignal() { 323 queue.add(DelayedUtil.DELAYED_POISON); 324 } 325 326 public void awaitTermination() { 327 try { 328 final long startTime = EnvironmentEdgeManager.currentTime(); 329 for (int i = 0; isAlive(); ++i) { 330 sendStopSignal(); 331 join(250); 332 if (i > 0 && (i % 8) == 0) { 333 LOG.warn("Waiting termination of thread " + getName() + ", " + 334 StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); 335 } 336 } 337 } catch (InterruptedException e) { 338 LOG.warn(getName() + " join wait got interrupted", e); 339 } 340 } 341 } 342 343 // ============================================================================================ 344 // Internals Helpers 345 // ============================================================================================ 346 347 /** 348 * Node that contains a set of RemoteProcedures 349 */ 350 protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote> 351 implements RemoteNode<TEnv, TRemote> { 352 private Set<RemoteProcedure> operations; 353 private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>(); 354 355 protected BufferNode(final TRemote key) { 356 super(key, 0); 357 } 358 359 @Override 360 public TRemote getKey() { 361 return getObject(); 362 } 363 364 @Override 365 public synchronized void add(final RemoteProcedure operation) { 366 if (this.operations == null) { 367 this.operations = new HashSet<>(); 368 setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay); 369 timeoutExecutor.add(this); 370 } 371 this.operations.add(operation); 372 if (this.operations.size() > queueMaxSize) { 373 timeoutExecutor.remove(this); 374 dispatch(); 375 } 376 } 377 378 @Override 379 public synchronized void dispatch() { 380 if (operations != null) { 381 remoteDispatch(getKey(), operations); 382 operations.stream().filter(operation -> operation.storeInDispatchedQueue()) 383 .forEach(operation -> dispatchedOperations.add(operation)); 384 this.operations = null; 385 } 386 } 387 388 public synchronized void abortOperationsInQueue() { 389 if (operations != null) { 390 abortPendingOperations(getKey(), operations); 391 this.operations = null; 392 } 393 abortPendingOperations(getKey(), dispatchedOperations); 394 this.dispatchedOperations.clear(); 395 } 396 397 public synchronized void operationCompleted(final RemoteProcedure remoteProcedure){ 398 this.dispatchedOperations.remove(remoteProcedure); 399 } 400 401 @Override 402 public String toString() { 403 return super.toString() + ", operations=" + this.operations; 404 } 405 } 406 407 /** 408 * Delayed object that holds a FutureTask. 409 * <p/> 410 * used to submit something later to the thread-pool. 411 */ 412 private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> { 413 public DelayedTask(Runnable task, long delay, TimeUnit unit) { 414 super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay)); 415 } 416 }; 417}