001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.concurrent.ConcurrentMap; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.RejectedExecutionException; 025import java.util.concurrent.SynchronousQueue; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.concurrent.TimeUnit; 028import org.apache.hadoop.hbase.errorhandling.ForeignException; 029import org.apache.hadoop.hbase.util.Threads; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034import org.apache.hbase.thirdparty.com.google.common.collect.MapMaker; 035import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 036 037/** 038 * Process to kick off and manage a running {@link Subprocedure} on a member. This is the 039 * specialized part of a {@link Procedure} that actually does procedure type-specific work 040 * and reports back to the coordinator as it completes each phase. 041 */ 042@InterfaceAudience.Private 043public class ProcedureMember implements Closeable { 044 private static final Logger LOG = LoggerFactory.getLogger(ProcedureMember.class); 045 046 final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000; 047 048 private final SubprocedureFactory builder; 049 private final ProcedureMemberRpcs rpcs; 050 051 private final ConcurrentMap<String,Subprocedure> subprocs = 052 new MapMaker().concurrencyLevel(4).weakValues().makeMap(); 053 private final ExecutorService pool; 054 055 /** 056 * Instantiate a new ProcedureMember. This is a slave that executes subprocedures. 057 * 058 * @param rpcs controller used to send notifications to the procedure coordinator 059 * @param pool thread pool to submit subprocedures 060 * @param factory class that creates instances of a subprocedure. 061 */ 062 public ProcedureMember(ProcedureMemberRpcs rpcs, ThreadPoolExecutor pool, 063 SubprocedureFactory factory) { 064 this.pool = pool; 065 this.rpcs = rpcs; 066 this.builder = factory; 067 } 068 069 /** 070 * Default thread pool for the procedure 071 * 072 * @param memberName 073 * @param procThreads the maximum number of threads to allow in the pool 074 */ 075 public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) { 076 return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT); 077 } 078 079 /** 080 * Default thread pool for the procedure 081 * 082 * @param memberName 083 * @param procThreads the maximum number of threads to allow in the pool 084 * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks 085 */ 086 public static ThreadPoolExecutor defaultPool(String memberName, int procThreads, 087 long keepAliveMillis) { 088 return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS, 089 new SynchronousQueue<>(), 090 new ThreadFactoryBuilder().setNameFormat("member: '" + memberName + "' subprocedure-pool-%d") 091 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 092 } 093 094 /** 095 * Package exposed. Not for public use. 096 * 097 * @return reference to the Procedure member's rpcs object 098 */ 099 ProcedureMemberRpcs getRpcs() { 100 return rpcs; 101 } 102 103 104 /** 105 * This is separated from execution so that we can detect and handle the case where the 106 * subprocedure is invalid and inactionable due to bad info (like DISABLED snapshot type being 107 * sent here) 108 * @param opName 109 * @param data 110 * @return subprocedure 111 */ 112 public Subprocedure createSubprocedure(String opName, byte[] data) { 113 return builder.buildSubprocedure(opName, data); 114 } 115 116 /** 117 * Submit an subprocedure for execution. This starts the local acquire phase. 118 * @param subproc the subprocedure to execute. 119 * @return <tt>true</tt> if the subprocedure was started correctly, <tt>false</tt> if it 120 * could not be started. In the latter case, the subprocedure holds a reference to 121 * the exception that caused the failure. 122 */ 123 public boolean submitSubprocedure(Subprocedure subproc) { 124 // if the submitted subprocedure was null, bail. 125 if (subproc == null) { 126 LOG.warn("Submitted null subprocedure, nothing to run here."); 127 return false; 128 } 129 130 String procName = subproc.getName(); 131 if (procName == null || procName.length() == 0) { 132 LOG.error("Subproc name cannot be null or the empty string"); 133 return false; 134 } 135 136 // make sure we aren't already running an subprocedure of that name 137 Subprocedure rsub = subprocs.get(procName); 138 if (rsub != null) { 139 if (!rsub.isComplete()) { 140 LOG.error("Subproc '" + procName + "' is already running. Bailing out"); 141 return false; 142 } 143 LOG.warn("A completed old subproc " + procName + " is still present, removing"); 144 if (!subprocs.remove(procName, rsub)) { 145 LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out"); 146 return false; 147 } 148 } 149 150 LOG.debug("Submitting new Subprocedure:" + procName); 151 152 // kick off the subprocedure 153 try { 154 if (subprocs.putIfAbsent(procName, subproc) == null) { 155 this.pool.submit(subproc); 156 return true; 157 } else { 158 LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out"); 159 return false; 160 } 161 } catch (RejectedExecutionException e) { 162 subprocs.remove(procName, subproc); 163 164 // the thread pool is full and we can't run the subprocedure 165 String msg = "Subprocedure pool is full!"; 166 subproc.cancel(msg, e.getCause()); 167 } 168 169 LOG.error("Failed to start subprocedure '" + procName + "'"); 170 return false; 171 } 172 173 /** 174 * Notification that procedure coordinator has reached the global barrier 175 * @param procName name of the subprocedure that should start running the in-barrier phase 176 */ 177 public void receivedReachedGlobalBarrier(String procName) { 178 Subprocedure subproc = subprocs.get(procName); 179 if (subproc == null) { 180 LOG.warn("Unexpected reached globa barrier message for Sub-Procedure '" + procName + "'"); 181 return; 182 } 183 if (LOG.isTraceEnabled()) { 184 LOG.trace("reached global barrier message for Sub-Procedure '" + procName + "'"); 185 } 186 subproc.receiveReachedGlobalBarrier(); 187 } 188 189 /** 190 * Best effort attempt to close the threadpool via Thread.interrupt. 191 */ 192 @Override 193 public void close() throws IOException { 194 // have to use shutdown now to break any latch waiting 195 pool.shutdownNow(); 196 } 197 198 /** 199 * Shutdown the threadpool, and wait for upto timeoutMs millis before bailing 200 * @param timeoutMs timeout limit in millis 201 * @return true if successfully, false if bailed due to timeout. 202 * @throws InterruptedException 203 */ 204 boolean closeAndWait(long timeoutMs) throws InterruptedException { 205 pool.shutdown(); 206 return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS); 207 } 208 209 /** 210 * The connection to the rest of the procedure group (member and coordinator) has been 211 * broken/lost/failed. This should fail any interested subprocedure, but not attempt to notify 212 * other members since we cannot reach them anymore. 213 * @param message description of the error 214 * @param cause the actual cause of the failure 215 * @param procName the name of the procedure we'd cancel due to the error. 216 */ 217 public void controllerConnectionFailure(final String message, final Throwable cause, 218 final String procName) { 219 LOG.error(message, cause); 220 if (procName == null) { 221 return; 222 } 223 Subprocedure toNotify = subprocs.get(procName); 224 if (toNotify != null) { 225 toNotify.cancel(message, cause); 226 } 227 } 228 229 /** 230 * Send abort to the specified procedure 231 * @param procName name of the procedure to about 232 * @param ee exception information about the abort 233 */ 234 public void receiveAbortProcedure(String procName, ForeignException ee) { 235 LOG.debug("Request received to abort procedure " + procName, ee); 236 // if we know about the procedure, notify it 237 Subprocedure sub = subprocs.get(procName); 238 if (sub == null) { 239 LOG.info("Received abort on procedure with no local subprocedure " + procName + 240 ", ignoring it.", ee); 241 return; // Procedure has already completed 242 } 243 String msg = "Propagating foreign exception to subprocedure " + sub.getName(); 244 LOG.error(msg, ee); 245 sub.cancel(msg, ee); 246 } 247}