/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit count
 * threshold. Creates its own threadpool to run RPCs with timeout.
 * <ul>
 * <li>Each server queue has a dispatch buffer</li>
 * <li>Once the dispatch buffer reaches a threshold-size/time we send</li>
 * </ul>
 * <p>
 * Call {@link #start()} and then {@link #submitTask(Runnable)}. When done, call {@link #stop()}
 * followed by {@link #join()}.
 */
@InterfaceAudience.Private
public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
  private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);

  /** Configuration key for the size handed to the bounded cached thread pool. */
  public static final String THREAD_POOL_SIZE_CONF_KEY =
    "hbase.procedure.remote.dispatcher.threadpool.size";
  private static final int DEFAULT_THREAD_POOL_SIZE = 128;

  /** Configuration key for how long (msec) a node buffers operations before dispatching. */
  public static final String DISPATCH_DELAY_CONF_KEY =
    "hbase.procedure.remote.dispatcher.delay.msec";
  private static final int DEFAULT_DISPATCH_DELAY = 150;

  /** Configuration key for the buffer size above which a node dispatches immediately. */
  public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY =
    "hbase.procedure.remote.dispatcher.max.queue.size";
  private static final int DEFAULT_MAX_QUEUE_SIZE = 32;

  private final AtomicBoolean running = new AtomicBoolean(false);
  private final ConcurrentHashMap<TRemote, BufferNode> nodeMap = new ConcurrentHashMap<>();

  private final int operationDelay;
  private final int queueMaxSize;
  private final int corePoolSize;

  // Both are created in start(); they are null until then.
  private TimeoutExecutorThread timeoutExecutor;
  private ThreadPoolExecutor threadPool;

  /**
   * Reads the pool size, dispatch delay and max queue size from the given configuration, falling
   * back to the defaults declared in this class. No threads are created until {@link #start()}.
   * @param conf configuration to read the dispatcher settings from
   */
  protected RemoteProcedureDispatcher(Configuration conf) {
    this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE);
    this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY);
    this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE);
  }

  /**
   * Starts the timeout executor thread and creates the thread pool used to run tasks.
   * @return {@code true} if the dispatcher was started by this call, {@code false} if it was
   *         already running
   */
  public boolean start() {
    if (running.getAndSet(true)) {
      LOG.warn("Already running");
      return false;
    }

    LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, "
      + "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay);

    // Create the timeout executor
    timeoutExecutor = new TimeoutExecutorThread();
    timeoutExecutor.start();

    // Create the thread pool that will execute RPCs
    threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS,
      new ThreadFactoryBuilder().setNameFormat(this.getClass().getSimpleName() + "-pool-%d")
        .setDaemon(true).setUncaughtExceptionHandler(getUncaughtExceptionHandler()).build());
    return true;
  }

  /**
   * Installs an uncaught exception handler on the timeout executor thread. The timeout executor
   * is only created by {@link #start()}, so this must be called after it.
   * @param eh the handler to install
   */
  protected void setTimeoutExecutorUncaughtExceptionHandler(UncaughtExceptionHandler eh) {
    timeoutExecutor.setUncaughtExceptionHandler(eh);
  }

  /**
   * Signals the timeout executor to stop and shuts the thread pool down. Does not wait for
   * termination; use {@link #join()} for that.
   * @return {@code true} if the dispatcher was running and has been asked to stop, {@code false}
   *         if it was not running
   */
  public boolean stop() {
    if (!running.getAndSet(false)) {
      return false;
    }

    LOG.info("Stopping procedure remote dispatcher");

    // send stop signals
    timeoutExecutor.sendStopSignal();
    threadPool.shutdownNow();
    return true;
  }

  /**
   * Waits for the timeout executor thread and the thread pool to terminate. Expected to be called
   * after {@link #stop()}. It is a no-op for whatever part was never started (or was already
   * joined), instead of failing with a {@link NullPointerException}.
   * <p>
   * If the calling thread is interrupted while waiting, the wait in progress is abandoned, a
   * warning is logged, and the thread's interrupt status is restored before returning so that
   * callers can still observe the interruption.
   */
  public void join() {
    assert !running.get() : "expected not running";

    boolean interrupted = false;

    // wait the timeout executor
    if (timeoutExecutor != null) {
      interrupted = timeoutExecutor.awaitTermination();
      timeoutExecutor = null;
    }

    // wait for the thread pool to terminate
    if (threadPool != null) {
      threadPool.shutdownNow();
      try {
        while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
          LOG.warn("Waiting for thread-pool to terminate");
        }
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting for thread-pool termination", e);
        interrupted = true;
      }
    }

    // Restore the flag only once both waits are done, so an interrupt seen while joining the
    // timeout executor does not cut the thread-pool wait short.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Returns the handler installed on every thread of the pool created by {@link #start()}.
   */
  protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler();

  // ============================================================================================
  // Node Helpers
  // ============================================================================================
  /**
   * Add a node that will be able to execute remote procedures. Does nothing if a node is already
   * registered for the key.
   * @param key the node identifier
   */
  public void addNode(final TRemote key) {
    assert key != null : "Tried to add a node with a null key";
    nodeMap.computeIfAbsent(key, k -> new BufferNode(k));
  }

  /**
   * Add a remote rpc.
   * @param key the node identifier
   * @param rp  the remote procedure to buffer for the node
   * @throws NullTargetServerDispatchException if {@code key} is null
   * @throws NoServerDispatchException         if no node is registered for {@code key}
   * @throws NoNodeDispatchException           if the node is no longer in the map after the
   *                                           operation was added to it
   */
  public void addOperationToNode(final TRemote key, RemoteProcedure rp)
    throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException {
    if (key == null) {
      throw new NullTargetServerDispatchException(rp.toString());
    }
    BufferNode node = nodeMap.get(key);
    if (node == null) {
      // If null here, it means node has been removed because it crashed. This happens when server
      // is expired in ServerManager. ServerCrashProcedure may or may not have run.
      throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
    }
    node.add(rp);
    // Check our node still in the map; could have been removed by #removeNode.
    if (!nodeMap.containsValue(node)) {
      throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
    }
  }

  /**
   * Tells the node registered for {@code key} that the given remote procedure has finished, so it
   * is dropped from the node's set of dispatched operations. Logs a warning and returns if no
   * node is registered for the key.
   * @param key the node identifier
   * @param rp  the finished remote procedure
   */
  public void removeCompletedOperation(final TRemote key, RemoteProcedure rp) {
    BufferNode node = nodeMap.get(key);
    if (node == null) {
      LOG.warn("since no node for this key {}, we can't removed the finished remote procedure",
        key);
      return;
    }
    node.operationCompleted(rp);
  }

  /**
   * Remove a remote node and abort its buffered and dispatched operations.
   * @param key the node identifier
   * @return {@code true} if a node was registered for the key and has been removed
   */
  public boolean removeNode(final TRemote key) {
    final BufferNode node = nodeMap.remove(key);
    if (node == null) {
      return false;
    }

    node.abortOperationsInQueue();
    return true;
  }

  // ============================================================================================
  // Task Helpers
  // ============================================================================================
  /**
   * Hands the task to the thread pool for execution.
   * @param task the task to run
   */
  protected final void submitTask(Runnable task) {
    threadPool.execute(task);
  }

  /**
   * Queues the task on the timeout executor, which hands it to the thread pool once the delay
   * has expired.
   * @param task  the task to run
   * @param delay how long to wait before submitting the task
   * @param unit  the unit of {@code delay}
   */
  protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
    timeoutExecutor.add(new DelayedTask(task, delay, unit));
  }

  /**
   * Sends the buffered operations to the remote node. Called by a node while it holds its own
   * monitor (see {@code BufferNode#dispatch()}).
   * @param key        the node identifier
   * @param operations the operations buffered for the node
   */
  protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations);

  /**
   * Aborts operations of a node that is being removed. Called for the not-yet-dispatched buffer
   * (when there is one) and for the set of dispatched operations.
   * @param key        the node identifier
   * @param operations the operations to abort
   */
  protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations);

  /**
   * Data structure with reference to remote operation.
   */
  public static abstract class RemoteOperation {
    private final RemoteProcedure remoteProcedure;
    // active time of the master that sent this request, used for fencing
    private final long initiatingMasterActiveTime;

    protected RemoteOperation(final RemoteProcedure remoteProcedure,
      long initiatingMasterActiveTime) {
      this.remoteProcedure = remoteProcedure;
      this.initiatingMasterActiveTime = initiatingMasterActiveTime;
    }

    public RemoteProcedure getRemoteProcedure() {
      return remoteProcedure;
    }

    public long getInitiatingMasterActiveTime() {
      return initiatingMasterActiveTime;
    }
  }

  /**
   * Remote procedure reference.
   */
  public interface RemoteProcedure<TEnv, TRemote> {
    /**
     * For building the remote operation. May be empty if no need to send remote call. Usually, this
     * means the RemoteProcedure has been finished already. This is possible, as we may have already
     * sent the procedure to RS but then the rpc connection is broken so the executeProcedures call
     * fails, but the RS does receive the procedure and execute it and then report back, before we
     * retry again.
     */
    Optional<RemoteOperation> remoteCallBuild(TEnv env, TRemote remote);

    /**
     * Called when the executeProcedure call is failed.
     */
    void remoteCallFailed(TEnv env, TRemote remote, IOException exception);

    /**
     * Called when RS tells the remote procedure is succeeded through the
     * {@code reportProcedureDone} method.
     */
    void remoteOperationCompleted(TEnv env);

    /**
     * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone}
     * method.
     */
    void remoteOperationFailed(TEnv env, RemoteProcedureException error);

    /**
     * Whether to store this remote procedure in the dispatched queue. Only OpenRegionProcedure and
     * CloseRegionProcedure return false, since they are not fully controlled by the dispatcher.
     */
    default boolean storeInDispatchedQueue() {
      return true;
    }
  }

  /**
   * Account of what procedures are running on remote node.
   */
  public interface RemoteNode<TEnv, TRemote> {
    TRemote getKey();

    void add(RemoteProcedure<TEnv, TRemote> operation);

    void dispatch();
  }

  /**
   * Builds the remote operation of every given procedure and groups the results by the concrete
   * class of the operation. Procedures whose {@code remoteCallBuild} returns an empty
   * {@link Optional} are skipped.
   * @param env              the environment passed to {@code remoteCallBuild}
   * @param remote           the remote passed to {@code remoteCallBuild}
   * @param remoteProcedures the procedures to build operations for
   * @return a new multimap from operation class to the operations of that class
   */
  protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env,
    final TRemote remote, final Set<RemoteProcedure> remoteProcedures) {
    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create();
    for (RemoteProcedure proc : remoteProcedures) {
      Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote);
      operation.ifPresent(op -> requestByType.put(op.getClass(), op));
    }
    return requestByType;
  }

  /**
   * Removes from the multimap, and returns, all operations stored under the given type.
   * @param requestByType operations grouped by class; the entries for {@code type} are removed
   * @param type          the operation class to fetch
   * @return the operations that were stored under {@code type}
   */
  protected <T extends RemoteOperation> List<T> fetchType(
    final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) {
    // Unchecked: relies on the map having been keyed by each operation's own class, as
    // buildAndGroupRequestByType does.
    return (List<T>) requestByType.removeAll(type);
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  public boolean hasNode(TRemote key) {
    return nodeMap.containsKey(key);
  }

  // ============================================================================================
  // Timeout Helpers
  // ============================================================================================
  /**
   * Thread that takes expired entries off a {@link DelayQueue}: delayed tasks are handed to the
   * thread pool, buffer nodes are dispatched.
   */
  private final class TimeoutExecutorThread extends Thread {
    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();

    public TimeoutExecutorThread() {
      super("ProcedureDispatcherTimeoutThread");
    }

    @Override
    public void run() {
      while (running.get()) {
        final DelayedWithTimeout task =
          DelayedUtil.takeWithoutInterrupt(queue, 20, TimeUnit.SECONDS);
        if (task == null || task == DelayedUtil.DELAYED_POISON) {
          if (task == null && queue.size() > 0) {
            LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty when timed waiting"
              + " elapsed. If this is repeated consistently, it means no element is getting expired"
              + " from the queue and it might freeze the system. Queue: {}", queue);
          }
          // the executor may be shutting down, and the task is just the shutdown request
          continue;
        }
        if (task instanceof DelayedTask) {
          threadPool.execute(((DelayedTask) task).getObject());
        } else {
          ((BufferNode) task).dispatch();
        }
      }
    }

    public void add(final DelayedWithTimeout delayed) {
      queue.add(delayed);
    }

    public void remove(final DelayedWithTimeout delayed) {
      queue.remove(delayed);
    }

    /** Queues the poison entry so that a blocked {@link #run()} loop wakes up. */
    public void sendStopSignal() {
      queue.add(DelayedUtil.DELAYED_POISON);
    }

    /**
     * Waits for this thread to exit, re-sending the stop signal and joining for 250ms per
     * iteration, and logging a warning every 8th iteration. Stops waiting if the calling thread
     * is interrupted.
     * @return {@code true} if the calling thread was interrupted while waiting. The interrupt
     *         status has been cleared in that case, so the caller is responsible for restoring it.
     */
    public boolean awaitTermination() {
      try {
        final long startTime = EnvironmentEdgeManager.currentTime();
        for (int i = 0; isAlive(); ++i) {
          sendStopSignal();
          join(250);
          if (i > 0 && (i % 8) == 0) {
            LOG.warn("Waiting termination of thread {}, {}", getName(),
              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
          }
        }
        return false;
      } catch (InterruptedException e) {
        LOG.warn("{} join wait got interrupted", getName(), e);
        return true;
      }
    }
  }

  // ============================================================================================
  // Internals Helpers
  // ============================================================================================

  /**
   * Node that contains a set of RemoteProcedures
   */
  protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote>
    implements RemoteNode<TEnv, TRemote> {
    // Operations buffered but not yet dispatched; null while nothing is buffered.
    private Set<RemoteProcedure> operations;
    // Operations already dispatched and not yet reported as completed.
    private final Set<RemoteProcedure> dispatchedOperations = new HashSet<>();

    protected BufferNode(final TRemote key) {
      super(key, 0);
    }

    @Override
    public TRemote getKey() {
      return getObject();
    }

    /**
     * Buffers the operation. The first operation of a batch arms the timeout
     * ({@code now + operationDelay}) and registers this node with the timeout executor; once the
     * buffer holds more than {@code queueMaxSize} operations the node is taken off the timeout
     * executor and dispatched right away.
     */
    @Override
    public synchronized void add(final RemoteProcedure operation) {
      if (this.operations == null) {
        this.operations = new HashSet<>();
        setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay);
        timeoutExecutor.add(this);
      }
      this.operations.add(operation);
      if (this.operations.size() > queueMaxSize) {
        timeoutExecutor.remove(this);
        dispatch();
      }
    }

    /**
     * Sends the buffered operations through {@code remoteDispatch}, remembers those that ask to
     * be stored in the dispatched queue, and resets the buffer. Does nothing if nothing is
     * buffered.
     */
    @Override
    public synchronized void dispatch() {
      if (operations != null) {
        remoteDispatch(getKey(), operations);
        for (RemoteProcedure operation : operations) {
          if (operation.storeInDispatchedQueue()) {
            dispatchedOperations.add(operation);
          }
        }
        this.operations = null;
      }
    }

    /**
     * Aborts the buffered operations (if any) and then the dispatched ones, clearing both.
     */
    public synchronized void abortOperationsInQueue() {
      if (operations != null) {
        abortPendingOperations(getKey(), operations);
        this.operations = null;
      }
      abortPendingOperations(getKey(), dispatchedOperations);
      this.dispatchedOperations.clear();
    }

    /** Drops the given procedure from the set of dispatched operations. */
    public synchronized void operationCompleted(final RemoteProcedure remoteProcedure) {
      this.dispatchedOperations.remove(remoteProcedure);
    }

    @Override
    public String toString() {
      return super.toString() + ", operations=" + this.operations;
    }
  }

  /**
   * Delayed object that holds a {@link Runnable}.
   * <p>
   * Used to submit something later to the thread-pool.
   */
  private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> {
    public DelayedTask(Runnable task, long delay, TimeUnit unit) {
      super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
    }
  }
}