001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.IOException; 021import java.util.Collection; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Set; 025import java.util.concurrent.ConcurrentMap; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.RejectedExecutionException; 028import java.util.concurrent.SynchronousQueue; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036import org.apache.hadoop.hbase.DaemonThreadFactory; 037import org.apache.hadoop.hbase.errorhandling.ForeignException; 038import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 039 040import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker; 041 042/** 043 * This is the master side of a distributed complex procedure execution. 044 * <p> 045 * The {@link Procedure} is generic and subclassing or customization shouldn't be 046 * necessary -- any customization should happen just in {@link Subprocedure}s. 047 */ 048@InterfaceAudience.Private 049public class ProcedureCoordinator { 050 private static final Logger LOG = LoggerFactory.getLogger(ProcedureCoordinator.class); 051 052 final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000; 053 final static long TIMEOUT_MILLIS_DEFAULT = 60000; 054 final static long WAKE_MILLIS_DEFAULT = 500; 055 056 private final ProcedureCoordinatorRpcs rpcs; 057 private final ExecutorService pool; 058 private final long wakeTimeMillis; 059 private final long timeoutMillis; 060 061 // Running procedure table. Maps procedure name to running procedure reference 062 private final ConcurrentMap<String, Procedure> procedures = 063 new MapMaker().concurrencyLevel(4).weakValues().makeMap(); 064 065 /** 066 * Create and start a ProcedureCoordinator. 067 * 068 * The rpc object registers the ProcedureCoordinator and starts any threads in this 069 * constructor. 070 * 071 * @param pool Used for executing procedures. 072 */ 073 @VisibleForTesting // Only used in tests. SimpleMasterProcedureManager is a test class. 074 public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) { 075 this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT); 076 } 077 078 /** 079 * Create and start a ProcedureCoordinator. 080 * 081 * The rpc object registers the ProcedureCoordinator and starts any threads in 082 * this constructor. 083 * 084 * @param pool Used for executing procedures. 085 */ 086 public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool, 087 long timeoutMillis, long wakeTimeMillis) { 088 this.timeoutMillis = timeoutMillis; 089 this.wakeTimeMillis = wakeTimeMillis; 090 this.rpcs = rpcs; 091 this.pool = pool; 092 this.rpcs.start(this); 093 } 094 095 /** 096 * Default thread pool for the procedure 097 * 098 * @param coordName 099 * @param opThreads the maximum number of threads to allow in the pool 100 */ 101 public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) { 102 return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT); 103 } 104 105 /** 106 * Default thread pool for the procedure 107 * 108 * @param coordName 109 * @param opThreads the maximum number of threads to allow in the pool 110 * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks 111 */ 112 public static ThreadPoolExecutor defaultPool(String coordName, int opThreads, 113 long keepAliveMillis) { 114 return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS, 115 new SynchronousQueue<>(), 116 new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool")); 117 } 118 119 /** 120 * Shutdown the thread pools and release rpc resources 121 * @throws IOException 122 */ 123 public void close() throws IOException { 124 // have to use shutdown now to break any latch waiting 125 pool.shutdownNow(); 126 rpcs.close(); 127 } 128 129 /** 130 * Submit an procedure to kick off its dependent subprocedures. 131 * @param proc Procedure to execute 132 * @return <tt>true</tt> if the procedure was started correctly, <tt>false</tt> if the 133 * procedure or any subprocedures could not be started. Failure could be due to 134 * submitting a procedure multiple times (or one with the same name), or some sort 135 * of IO problem. On errors, the procedure's monitor holds a reference to the exception 136 * that caused the failure. 137 */ 138 boolean submitProcedure(Procedure proc) { 139 // if the submitted procedure was null, then we don't want to run it 140 if (proc == null) { 141 return false; 142 } 143 String procName = proc.getName(); 144 145 // make sure we aren't already running a procedure of that name 146 Procedure oldProc = procedures.get(procName); 147 if (oldProc != null) { 148 // procedures are always eventually completed on both successful and failed execution 149 try { 150 if (!oldProc.isCompleted()) { 151 LOG.warn("Procedure " + procName + " currently running. Rejecting new request"); 152 return false; 153 } else { 154 LOG.debug("Procedure " + procName 155 + " was in running list but was completed. Accepting new attempt."); 156 if (!procedures.remove(procName, oldProc)) { 157 LOG.warn("Procedure " + procName 158 + " has been resubmitted by another thread. Rejecting this request."); 159 return false; 160 } 161 } 162 } catch (ForeignException e) { 163 LOG.debug("Procedure " + procName 164 + " was in running list but has exception. Accepting new attempt."); 165 if (!procedures.remove(procName, oldProc)) { 166 LOG.warn("Procedure " + procName 167 + " has been resubmitted by another thread. Rejecting this request."); 168 return false; 169 } 170 } 171 } 172 173 // kick off the procedure's execution in a separate thread 174 try { 175 if (this.procedures.putIfAbsent(procName, proc) == null) { 176 LOG.debug("Submitting procedure " + procName); 177 this.pool.submit(proc); 178 return true; 179 } else { 180 LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt."); 181 return false; 182 } 183 } catch (RejectedExecutionException e) { 184 LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error.", e); 185 // Remove the procedure from the list since is not started 186 this.procedures.remove(procName, proc); 187 // the thread pool is full and we can't run the procedure 188 proc.receive(new ForeignException(procName, e)); 189 } 190 return false; 191 } 192 193 /** 194 * The connection to the rest of the procedure group (members and coordinator) has been 195 * broken/lost/failed. This should fail any interested procedures, but not attempt to notify other 196 * members since we cannot reach them anymore. 197 * @param message description of the error 198 * @param cause the actual cause of the failure 199 */ 200 void rpcConnectionFailure(final String message, final IOException cause) { 201 Collection<Procedure> toNotify = procedures.values(); 202 203 boolean isTraceEnabled = LOG.isTraceEnabled(); 204 LOG.debug("received connection failure: " + message, cause); 205 for (Procedure proc : toNotify) { 206 if (proc == null) { 207 continue; 208 } 209 // notify the elements, if they aren't null 210 if (isTraceEnabled) { 211 LOG.trace("connection failure - notify procedure: " + proc.getName()); 212 } 213 proc.receive(new ForeignException(proc.getName(), cause)); 214 } 215 } 216 217 /** 218 * Abort the procedure with the given name 219 * @param procName name of the procedure to abort 220 * @param reason serialized information about the abort 221 */ 222 public void abortProcedure(String procName, ForeignException reason) { 223 LOG.debug("abort procedure " + procName, reason); 224 // if we know about the Procedure, notify it 225 Procedure proc = procedures.get(procName); 226 if (proc == null) { 227 return; 228 } 229 proc.receive(reason); 230 } 231 232 /** 233 * Exposed for hooking with unit tests. 234 * @param procName 235 * @param procArgs 236 * @param expectedMembers 237 * @return the newly created procedure 238 */ 239 Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, 240 List<String> expectedMembers) { 241 // build the procedure 242 return new Procedure(this, fed, wakeTimeMillis, timeoutMillis, 243 procName, procArgs, expectedMembers); 244 } 245 246 /** 247 * Kick off the named procedure 248 * Currently only one procedure with the same type and name is allowed to run at a time. 249 * @param procName name of the procedure to start 250 * @param procArgs arguments for the procedure 251 * @param expectedMembers expected members to start 252 * @return handle to the running procedure, if it was started correctly, 253 * <tt>null</tt> otherwise. 254 * Null could be due to submitting a procedure multiple times 255 * (or one with the same name), or runtime exception. 256 * Check the procedure's monitor that holds a reference to the exception 257 * that caused the failure. 258 */ 259 public Procedure startProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs, 260 List<String> expectedMembers) { 261 Procedure proc = createProcedure(fed, procName, procArgs, expectedMembers); 262 if (!this.submitProcedure(proc)) { 263 LOG.error("Failed to submit procedure '" + procName + "'"); 264 return null; 265 } 266 return proc; 267 } 268 269 /** 270 * Notification that the procedure had the specified member acquired its part of the barrier 271 * via {@link Subprocedure#acquireBarrier()}. 272 * @param procName name of the procedure that acquired 273 * @param member name of the member that acquired 274 */ 275 void memberAcquiredBarrier(String procName, final String member) { 276 Procedure proc = procedures.get(procName); 277 if (proc == null) { 278 LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'"); 279 return; 280 } 281 if (LOG.isTraceEnabled()) { 282 LOG.trace("Member '"+ member +"' acquired procedure '"+ procName +"'"); 283 } 284 proc.barrierAcquiredByMember(member); 285 } 286 287 /** 288 * Notification that the procedure had another member finished executing its in-barrier subproc 289 * via {@link Subprocedure#insideBarrier()}. 290 * @param procName name of the subprocedure that finished 291 * @param member name of the member that executed and released its barrier 292 * @param dataFromMember the data that the member returned along with the notification 293 */ 294 void memberFinishedBarrier(String procName, final String member, byte[] dataFromMember) { 295 Procedure proc = procedures.get(procName); 296 if (proc == null) { 297 LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'"); 298 return; 299 } 300 if (LOG.isTraceEnabled()) { 301 LOG.trace("Member '"+ member +"' released procedure '"+ procName +"'"); 302 } 303 proc.barrierReleasedByMember(member, dataFromMember); 304 } 305 306 /** 307 * @return the rpcs implementation for all current procedures 308 */ 309 ProcedureCoordinatorRpcs getRpcs() { 310 return rpcs; 311 } 312 313 /** 314 * Returns the procedure. This Procedure is a live instance so should not be modified but can 315 * be inspected. 316 * @param name Name of the procedure 317 * @return Procedure or null if not present any more 318 */ 319 public Procedure getProcedure(String name) { 320 return procedures.get(name); 321 } 322 323 /** 324 * @return Return set of all procedure names. 325 */ 326 public Set<String> getProcedureNames() { 327 return new HashSet<>(procedures.keySet()); 328 } 329}