001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.util.Collection; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Set; 025import java.util.concurrent.ConcurrentMap; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.RejectedExecutionException; 028import java.util.concurrent.SynchronousQueue; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035import org.apache.hadoop.hbase.DaemonThreadFactory; 036import org.apache.hadoop.hbase.errorhandling.ForeignException; 037import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 038 039import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker; 040 041/** 042 * This is the master side of a distributed complex procedure execution. 043 * <p> 044 * The {@link Procedure} is generic and subclassing or customization shouldn't be 045 * necessary -- any customization should happen just in {@link Subprocedure}s. 046 */ 047@InterfaceAudience.Private 048public class ProcedureCoordinator { 049 private static final Logger LOG = LoggerFactory.getLogger(ProcedureCoordinator.class); 050 051 final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000; 052 final static long TIMEOUT_MILLIS_DEFAULT = 60000; 053 final static long WAKE_MILLIS_DEFAULT = 500; 054 055 private final ProcedureCoordinatorRpcs rpcs; 056 private final ExecutorService pool; 057 private final long wakeTimeMillis; 058 private final long timeoutMillis; 059 060 // Running procedure table. Maps procedure name to running procedure reference 061 private final ConcurrentMap<String, Procedure> procedures = 062 new MapMaker().concurrencyLevel(4).weakValues().makeMap(); 063 064 /** 065 * Create and start a ProcedureCoordinator. 066 * 067 * The rpc object registers the ProcedureCoordinator and starts any threads in this 068 * constructor. 069 * 070 * @param rpcs 071 * @param pool Used for executing procedures. 072 */ 073 public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) { 074 this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT); 075 } 076 077 /** 078 * Create and start a ProcedureCoordinator. 079 * 080 * The rpc object registers the ProcedureCoordinator and starts any threads in 081 * this constructor. 082 * 083 * @param rpcs 084 * @param pool Used for executing procedures. 085 * @param timeoutMillis 086 */ 087 public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool, 088 long timeoutMillis, long wakeTimeMillis) { 089 this.timeoutMillis = timeoutMillis; 090 this.wakeTimeMillis = wakeTimeMillis; 091 this.rpcs = rpcs; 092 this.pool = pool; 093 this.rpcs.start(this); 094 } 095 096 /** 097 * Default thread pool for the procedure 098 * 099 * @param coordName 100 * @param opThreads the maximum number of threads to allow in the pool 101 */ 102 public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) { 103 return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT); 104 } 105 106 /** 107 * Default thread pool for the procedure 108 * 109 * @param coordName 110 * @param opThreads the maximum number of threads to allow in the pool 111 * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks 112 */ 113 public static ThreadPoolExecutor defaultPool(String coordName, int opThreads, 114 long keepAliveMillis) { 115 return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS, 116 new SynchronousQueue<>(), 117 new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool")); 118 } 119 120 /** 121 * Shutdown the thread pools and release rpc resources 122 * @throws IOException 123 */ 124 public void close() throws IOException { 125 // have to use shutdown now to break any latch waiting 126 pool.shutdownNow(); 127 rpcs.close(); 128 } 129 130 /** 131 * Submit an procedure to kick off its dependent subprocedures. 132 * @param proc Procedure to execute 133 * @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the 134 * procedure or any subprocedures could not be started. Failure could be due to 135 * submitting a procedure multiple times (or one with the same name), or some sort 136 * of IO problem. On errors, the procedure's monitor holds a reference to the exception 137 * that caused the failure. 138 */ 139 boolean submitProcedure(Procedure proc) { 140 // if the submitted procedure was null, then we don't want to run it 141 if (proc == null) { 142 return false; 143 } 144 String procName = proc.getName(); 145 146 // make sure we aren't already running a procedure of that name 147 Procedure oldProc = procedures.get(procName); 148 if (oldProc != null) { 149 // procedures are always eventually completed on both successful and failed execution 150 try { 151 if (!oldProc.isCompleted()) { 152 LOG.warn("Procedure " + procName + " currently running. Rejecting new request"); 153 return false; 154 } else { 155 LOG.debug("Procedure " + procName 156 + " was in running list but was completed. Accepting new attempt."); 157 if (!procedures.remove(procName, oldProc)) { 158 LOG.warn("Procedure " + procName 159 + " has been resubmitted by another thread. Rejecting this request."); 160 return false; 161 } 162 } 163 } catch (ForeignException e) { 164 LOG.debug("Procedure " + procName 165 + " was in running list but has exception. Accepting new attempt."); 166 if (!procedures.remove(procName, oldProc)) { 167 LOG.warn("Procedure " + procName 168 + " has been resubmitted by another thread. Rejecting this request."); 169 return false; 170 } 171 } 172 } 173 174 // kick off the procedure's execution in a separate thread 175 try { 176 if (this.procedures.putIfAbsent(procName, proc) == null) { 177 LOG.debug("Submitting procedure " + procName); 178 this.pool.submit(proc); 179 return true; 180 } else { 181 LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt."); 182 return false; 183 } 184 } catch (RejectedExecutionException e) { 185 LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error.", e); 186 // Remove the procedure from the list since is not started 187 this.procedures.remove(procName, proc); 188 // the thread pool is full and we can't run the procedure 189 proc.receive(new ForeignException(procName, e)); 190 } 191 return false; 192 } 193 194 /** 195 * The connection to the rest of the procedure group (members and coordinator) has been 196 * broken/lost/failed. This should fail any interested procedures, but not attempt to notify other 197 * members since we cannot reach them anymore. 198 * @param message description of the error 199 * @param cause the actual cause of the failure 200 */ 201 void rpcConnectionFailure(final String message, final IOException cause) { 202 Collection<Procedure> toNotify = procedures.values(); 203 204 boolean isTraceEnabled = LOG.isTraceEnabled(); 205 LOG.debug("received connection failure: " + message, cause); 206 for (Procedure proc : toNotify) { 207 if (proc == null) { 208 continue; 209 } 210 // notify the elements, if they aren't null 211 if (isTraceEnabled) { 212 LOG.trace("connection failure - notify procedure: " + proc.getName()); 213 } 214 proc.receive(new ForeignException(proc.getName(), cause)); 215 } 216 } 217 218 /** 219 * Abort the procedure with the given name 220 * @param procName name of the procedure to abort 221 * @param reason serialized information about the abort 222 */ 223 public void abortProcedure(String procName, ForeignException reason) { 224 LOG.debug("abort procedure " + procName, reason); 225 // if we know about the Procedure, notify it 226 Procedure proc = procedures.get(procName); 227 if (proc == null) { 228 return; 229 } 230 proc.receive(reason); 231 } 232 233 /** 234 * Exposed for hooking with unit tests. 235 * @param procName 236 * @param procArgs 237 * @param expectedMembers 238 * @return the newly created procedure 239 */ 240 Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, 241 List<String> expectedMembers) { 242 // build the procedure 243 return new Procedure(this, fed, wakeTimeMillis, timeoutMillis, 244 procName, procArgs, expectedMembers); 245 } 246 247 /** 248 * Kick off the named procedure 249 * Currently only one procedure with the same type and name is allowed to run at a time. 250 * @param procName name of the procedure to start 251 * @param procArgs arguments for the procedure 252 * @param expectedMembers expected members to start 253 * @return handle to the running procedure, if it was started correctly, 254 * <tt>null</tt> otherwise. 255 * Null could be due to submitting a procedure multiple times 256 * (or one with the same name), or runtime exception. 257 * Check the procedure's monitor that holds a reference to the exception 258 * that caused the failure. 259 */ 260 public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, 261 List<String> expectedMembers) { 262 Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers); 263 if (!this.submitProcedure(proc)) { 264 LOG.error("Failed to submit procedure '" + procName + "'"); 265 return null; 266 } 267 return proc; 268 } 269 270 /** 271 * Notification that the procedure had the specified member acquired its part of the barrier 272 * via {@link Subprocedure#acquireBarrier()}. 273 * @param procName name of the procedure that acquired 274 * @param member name of the member that acquired 275 */ 276 void memberAcquiredBarrier(String procName, final String member) { 277 Procedure proc = procedures.get(procName); 278 if (proc == null) { 279 LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'"); 280 return; 281 } 282 if (LOG.isTraceEnabled()) { 283 LOG.trace("Member '"+ member +"' acquired procedure '"+ procName +"'"); 284 } 285 proc.barrierAcquiredByMember(member); 286 } 287 288 /** 289 * Notification that the procedure had another member finished executing its in-barrier subproc 290 * via {@link Subprocedure#insideBarrier()}. 291 * @param procName name of the subprocedure that finished 292 * @param member name of the member that executed and released its barrier 293 * @param dataFromMember the data that the member returned along with the notification 294 */ 295 void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) { 296 Procedure proc = procedures.get(procName); 297 if (proc == null) { 298 LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'"); 299 return; 300 } 301 if (LOG.isTraceEnabled()) { 302 LOG.trace("Member '"+ member +"' released procedure '"+ procName +"'"); 303 } 304 proc.barrierReleasedByMember(member, dataFromMember); 305 } 306 307 /** 308 * @return the rpcs implementation for all current procedures 309 */ 310 ProcedureCoordinatorRpcs getRpcs() { 311 return rpcs; 312 } 313 314 /** 315 * Returns the procedure. This Procedure is a live instance so should not be modified but can 316 * be inspected. 317 * @param name Name of the procedure 318 * @return Procedure or null if not present any more 319 */ 320 public Procedure getProcedure(String name) { 321 return procedures.get(name); 322 } 323 324 /** 325 * @return Return set of all procedure names. 326 */ 327 public Set<String> getProcedureNames() { 328 return new HashSet<>(procedures.keySet()); 329 } 330}