001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2; 020 021import java.io.IOException; 022import java.lang.Thread.UncaughtExceptionHandler; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.DelayQueue; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicBoolean; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; 033import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp; 034import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; 035import org.apache.hadoop.hbase.procedure2.util.StringUtils; 036import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 037import org.apache.hadoop.hbase.util.Threads; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 043 044/** 045 * A procedure dispatcher that aggregates and sends after elapsed time or after we hit 046 * count threshold. Creates its own threadpool to run RPCs with timeout. 047 * <ul> 048 * <li>Each server queue has a dispatch buffer</li> 049 * <li>Once the dispatch buffer reaches a threshold-size/time we send<li> 050 * </ul> 051 * <p>Call {@link #start()} and then {@link #submitTask(Runnable)}. When done, 052 * call {@link #stop()}. 053 */ 054@InterfaceAudience.Private 055public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> { 056 private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class); 057 058 public static final String THREAD_POOL_SIZE_CONF_KEY = 059 "hbase.procedure.remote.dispatcher.threadpool.size"; 060 private static final int DEFAULT_THREAD_POOL_SIZE = 128; 061 062 public static final String DISPATCH_DELAY_CONF_KEY = 063 "hbase.procedure.remote.dispatcher.delay.msec"; 064 private static final int DEFAULT_DISPATCH_DELAY = 150; 065 066 public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY = 067 "hbase.procedure.remote.dispatcher.max.queue.size"; 068 private static final int DEFAULT_MAX_QUEUE_SIZE = 32; 069 070 private final AtomicBoolean running = new AtomicBoolean(false); 071 private final ConcurrentHashMap<TRemote, BufferNode> nodeMap = 072 new ConcurrentHashMap<TRemote, BufferNode>(); 073 074 private final int operationDelay; 075 private final int queueMaxSize; 076 private final int corePoolSize; 077 078 private TimeoutExecutorThread timeoutExecutor; 079 private ThreadPoolExecutor threadPool; 080 081 protected RemoteProcedureDispatcher(Configuration conf) { 082 this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE); 083 this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY); 084 this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE); 085 } 086 087 public boolean start() { 088 if (running.getAndSet(true)) { 089 LOG.warn("Already running"); 090 return false; 091 } 092 093 LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, " + 094 "operationDelay={}", this.corePoolSize, this.queueMaxSize, this.operationDelay); 095 096 // Create the timeout executor 097 timeoutExecutor = new TimeoutExecutorThread(); 098 timeoutExecutor.start(); 099 100 // Create the thread pool that will execute RPCs 101 threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS, 102 Threads.newDaemonThreadFactory(this.getClass().getSimpleName(), 103 getUncaughtExceptionHandler())); 104 return true; 105 } 106 107 public boolean stop() { 108 if (!running.getAndSet(false)) { 109 return false; 110 } 111 112 LOG.info("Stopping procedure remote dispatcher"); 113 114 // send stop signals 115 timeoutExecutor.sendStopSignal(); 116 threadPool.shutdownNow(); 117 return true; 118 } 119 120 public void join() { 121 assert !running.get() : "expected not running"; 122 123 // wait the timeout executor 124 timeoutExecutor.awaitTermination(); 125 timeoutExecutor = null; 126 127 // wait for the thread pool to terminate 128 threadPool.shutdownNow(); 129 try { 130 while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { 131 LOG.warn("Waiting for thread-pool to terminate"); 132 } 133 } catch (InterruptedException e) { 134 LOG.warn("Interrupted while waiting for thread-pool termination", e); 135 } 136 } 137 138 protected abstract UncaughtExceptionHandler getUncaughtExceptionHandler(); 139 140 // ============================================================================================ 141 // Node Helpers 142 // ============================================================================================ 143 /** 144 * Add a node that will be able to execute remote procedures 145 * @param key the node identifier 146 */ 147 public void addNode(final TRemote key) { 148 assert key != null: "Tried to add a node with a null key"; 149 final BufferNode newNode = new BufferNode(key); 150 nodeMap.putIfAbsent(key, newNode); 151 } 152 153 /** 154 * Add a remote rpc. 155 * @param key the node identifier 156 */ 157 public void addOperationToNode(final TRemote key, RemoteProcedure rp) 158 throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException { 159 if (key == null) { 160 throw new NullTargetServerDispatchException(rp.toString()); 161 } 162 BufferNode node = nodeMap.get(key); 163 if (node == null) { 164 // If null here, it means node has been removed because it crashed. This happens when server 165 // is expired in ServerManager. ServerCrashProcedure may or may not have run. 166 throw new NoServerDispatchException(key.toString() + "; " + rp.toString()); 167 } 168 node.add(rp); 169 // Check our node still in the map; could have been removed by #removeNode. 170 if (!nodeMap.containsValue(node)) { 171 throw new NoNodeDispatchException(key.toString() + "; " + rp.toString()); 172 } 173 } 174 175 /** 176 * Remove a remote node 177 * @param key the node identifier 178 */ 179 public boolean removeNode(final TRemote key) { 180 final BufferNode node = nodeMap.remove(key); 181 if (node == null) return false; 182 node.abortOperationsInQueue(); 183 return true; 184 } 185 186 // ============================================================================================ 187 // Task Helpers 188 // ============================================================================================ 189 protected final void submitTask(Runnable task) { 190 threadPool.execute(task); 191 } 192 193 protected final void submitTask(Runnable task, long delay, TimeUnit unit) { 194 timeoutExecutor.add(new DelayedTask(task, delay, unit)); 195 } 196 197 protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations); 198 protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations); 199 200 /** 201 * Data structure with reference to remote operation. 202 */ 203 public static abstract class RemoteOperation { 204 private final RemoteProcedure remoteProcedure; 205 206 protected RemoteOperation(final RemoteProcedure remoteProcedure) { 207 this.remoteProcedure = remoteProcedure; 208 } 209 210 public RemoteProcedure getRemoteProcedure() { 211 return remoteProcedure; 212 } 213 } 214 215 /** 216 * Remote procedure reference. 217 * @param <TEnv> 218 * @param <TRemote> 219 */ 220 public interface RemoteProcedure<TEnv, TRemote> { 221 RemoteOperation remoteCallBuild(TEnv env, TRemote remote); 222 void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response); 223 boolean remoteCallFailed(TEnv env, TRemote remote, IOException exception); 224 } 225 226 /** 227 * Account of what procedures are running on remote node. 228 */ 229 public interface RemoteNode<TEnv, TRemote> { 230 TRemote getKey(); 231 232 void add(RemoteProcedure<TEnv, TRemote> operation); 233 234 void dispatch(); 235 } 236 237 protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env, 238 final TRemote remote, final Set<RemoteProcedure> remoteProcedures) { 239 final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create(); 240 for (RemoteProcedure proc : remoteProcedures) { 241 RemoteOperation operation = proc.remoteCallBuild(env, remote); 242 requestByType.put(operation.getClass(), operation); 243 } 244 return requestByType; 245 } 246 247 protected <T extends RemoteOperation> List<T> fetchType( 248 final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) { 249 return (List<T>)requestByType.removeAll(type); 250 } 251 252 // ============================================================================================ 253 // Timeout Helpers 254 // ============================================================================================ 255 private final class TimeoutExecutorThread extends Thread { 256 private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>(); 257 258 public TimeoutExecutorThread() { 259 super("ProcedureDispatcherTimeoutThread"); 260 } 261 262 @Override 263 public void run() { 264 while (running.get()) { 265 final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); 266 if (task == null || task == DelayedUtil.DELAYED_POISON) { 267 // the executor may be shutting down, and the task is just the shutdown request 268 continue; 269 } 270 if (task instanceof DelayedTask) { 271 threadPool.execute(((DelayedTask) task).getObject()); 272 } else { 273 ((BufferNode) task).dispatch(); 274 } 275 } 276 } 277 278 public void add(final DelayedWithTimeout delayed) { 279 queue.add(delayed); 280 } 281 282 public void remove(final DelayedWithTimeout delayed) { 283 queue.remove(delayed); 284 } 285 286 public void sendStopSignal() { 287 queue.add(DelayedUtil.DELAYED_POISON); 288 } 289 290 public void awaitTermination() { 291 try { 292 final long startTime = EnvironmentEdgeManager.currentTime(); 293 for (int i = 0; isAlive(); ++i) { 294 sendStopSignal(); 295 join(250); 296 if (i > 0 && (i % 8) == 0) { 297 LOG.warn("Waiting termination of thread " + getName() + ", " + 298 StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); 299 } 300 } 301 } catch (InterruptedException e) { 302 LOG.warn(getName() + " join wait got interrupted", e); 303 } 304 } 305 } 306 307 // ============================================================================================ 308 // Internals Helpers 309 // ============================================================================================ 310 311 /** 312 * Node that contains a set of RemoteProcedures 313 */ 314 protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote> 315 implements RemoteNode<TEnv, TRemote> { 316 private Set<RemoteProcedure> operations; 317 318 protected BufferNode(final TRemote key) { 319 super(key, 0); 320 } 321 322 @Override 323 public TRemote getKey() { 324 return getObject(); 325 } 326 327 @Override 328 public synchronized void add(final RemoteProcedure operation) { 329 if (this.operations == null) { 330 this.operations = new HashSet<>(); 331 setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay); 332 timeoutExecutor.add(this); 333 } 334 this.operations.add(operation); 335 if (this.operations.size() > queueMaxSize) { 336 timeoutExecutor.remove(this); 337 dispatch(); 338 } 339 } 340 341 @Override 342 public synchronized void dispatch() { 343 if (operations != null) { 344 remoteDispatch(getKey(), operations); 345 this.operations = null; 346 } 347 } 348 349 public synchronized void abortOperationsInQueue() { 350 if (operations != null) { 351 abortPendingOperations(getKey(), operations); 352 this.operations = null; 353 } 354 } 355 356 @Override 357 public String toString() { 358 return super.toString() + ", operations=" + this.operations; 359 } 360 } 361 362 /** 363 * Delayed object that holds a FutureTask. 364 * <p/> 365 * used to submit something later to the thread-pool. 366 */ 367 private static final class DelayedTask extends DelayedContainerWithTimestamp<Runnable> { 368 public DelayedTask(Runnable task, long delay, TimeUnit unit) { 369 super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay)); 370 } 371 }; 372}