001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.ExecutorCompletionService; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.Future; 030import java.util.concurrent.ThreadPoolExecutor; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.Abortable; 033import org.apache.hadoop.hbase.errorhandling.ForeignException; 034import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 035import org.apache.hadoop.hbase.regionserver.RegionServerServices; 036import org.apache.hadoop.hbase.util.Threads; 037import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 038import org.apache.zookeeper.KeeperException; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 043 044public class SimpleRSProcedureManager extends RegionServerProcedureManager { 045 046 private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class); 047 048 private RegionServerServices rss; 049 private ProcedureMemberRpcs memberRpcs; 050 private ProcedureMember member; 051 052 @Override 053 public void initialize(RegionServerServices rss) throws KeeperException { 054 this.rss = rss; 055 ZKWatcher zkw = rss.getZooKeeper(); 056 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature()); 057 058 ThreadPoolExecutor pool = 059 ProcedureMember.defaultPool(rss.getServerName().toString(), 1); 060 this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder()); 061 LOG.info("Initialized: " + rss.getServerName().toString()); 062 } 063 064 @Override 065 public void start() { 066 this.memberRpcs.start(rss.getServerName().toString(), member); 067 LOG.info("Started."); 068 } 069 070 @Override 071 public void stop(boolean force) throws IOException { 072 LOG.info("stop: " + force); 073 try { 074 this.member.close(); 075 } finally { 076 this.memberRpcs.close(); 077 } 078 } 079 080 @Override 081 public String getProcedureSignature() { 082 return SimpleMasterProcedureManager.SIMPLE_SIGNATURE; 083 } 084 085 /** 086 * If in a running state, creates the specified subprocedure for handling a procedure. 087 * @return Subprocedure to submit to the ProcedureMember. 088 */ 089 public Subprocedure buildSubprocedure(String name) { 090 091 // don't run a procedure if the parent is stop(ping) 092 if (rss.isStopping() || rss.isStopped()) { 093 throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName() 094 + ", because stopping/stopped!"); 095 } 096 097 LOG.info("Attempting to run a procedure."); 098 ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); 099 Configuration conf = rss.getConfiguration(); 100 101 SimpleSubprocedurePool taskManager = 102 new SimpleSubprocedurePool(rss.getServerName().toString(), conf); 103 return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name); 104 } 105 106 /** 107 * Build the actual procedure runner that will do all the 'hard' work 108 */ 109 public class SimleSubprocedureBuilder implements SubprocedureFactory { 110 111 @Override 112 public Subprocedure buildSubprocedure(String name, byte[] data) { 113 LOG.info("Building procedure: " + name); 114 return SimpleRSProcedureManager.this.buildSubprocedure(name); 115 } 116 } 117 118 public static class SimpleSubprocedurePool implements Closeable, Abortable { 119 120 private final ExecutorCompletionService<Void> taskPool; 121 private final ExecutorService executor; 122 private volatile boolean aborted; 123 private final List<Future<Void>> futures = new ArrayList<>(); 124 private final String name; 125 126 public SimpleSubprocedurePool(String name, Configuration conf) { 127 this.name = name; 128 executor = Executors.newSingleThreadExecutor( 129 new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d") 130 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 131 taskPool = new ExecutorCompletionService<>(executor); 132 } 133 134 /** 135 * Submit a task to the pool. 136 */ 137 public void submitTask(final Callable<Void> task) { 138 Future<Void> f = this.taskPool.submit(task); 139 futures.add(f); 140 } 141 142 /** 143 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)} 144 * 145 * @return <tt>true</tt> on success, <tt>false</tt> otherwise 146 * @throws ForeignException 147 */ 148 public boolean waitForOutstandingTasks() throws ForeignException { 149 LOG.debug("Waiting for procedure to finish."); 150 151 try { 152 for (Future<Void> f: futures) { 153 f.get(); 154 } 155 return true; 156 } catch (InterruptedException e) { 157 if (aborted) throw new ForeignException( 158 "Interrupted and found to be aborted while waiting for tasks!", e); 159 Thread.currentThread().interrupt(); 160 } catch (ExecutionException e) { 161 if (e.getCause() instanceof ForeignException) { 162 throw (ForeignException) e.getCause(); 163 } 164 throw new ForeignException(name, e.getCause()); 165 } finally { 166 // close off remaining tasks 167 for (Future<Void> f: futures) { 168 if (!f.isDone()) { 169 f.cancel(true); 170 } 171 } 172 } 173 return false; 174 } 175 176 /** 177 * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly 178 * finish 179 */ 180 @Override 181 public void close() { 182 executor.shutdown(); 183 } 184 185 @Override 186 public void abort(String why, Throwable e) { 187 if (this.aborted) return; 188 189 this.aborted = true; 190 LOG.warn("Aborting because: " + why, e); 191 this.executor.shutdownNow(); 192 } 193 194 @Override 195 public boolean isAborted() { 196 return this.aborted; 197 } 198 } 199 200 public class SimpleSubprocedure extends Subprocedure { 201 private final RegionServerServices rss; 202 private final SimpleSubprocedurePool taskManager; 203 204 public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member, 205 ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) { 206 super(member, name, errorListener, 500, 60000); 207 LOG.info("Constructing a SimpleSubprocedure."); 208 this.rss = rss; 209 this.taskManager = taskManager; 210 } 211 212 /** 213 * Callable task. 214 * TODO. We don't need a thread pool to execute roll log. This can be simplified 215 * with no use of subprocedurepool. 216 */ 217 class RSSimpleTask implements Callable<Void> { 218 RSSimpleTask() {} 219 220 @Override 221 public Void call() throws Exception { 222 LOG.info("Execute subprocedure on " + rss.getServerName().toString()); 223 return null; 224 } 225 226 } 227 228 private void execute() throws ForeignException { 229 230 monitor.rethrowException(); 231 232 // running a task (e.g., roll log, flush table) on region server 233 taskManager.submitTask(new RSSimpleTask()); 234 monitor.rethrowException(); 235 236 // wait for everything to complete. 237 taskManager.waitForOutstandingTasks(); 238 monitor.rethrowException(); 239 240 } 241 242 @Override 243 public void acquireBarrier() throws ForeignException { 244 // do nothing, executing in inside barrier step. 245 } 246 247 /** 248 * do a log roll. 249 */ 250 @Override 251 public byte[] insideBarrier() throws ForeignException { 252 execute(); 253 return SimpleMasterProcedureManager.SIMPLE_DATA.getBytes(); 254 } 255 256 /** 257 * Cancel threads if they haven't finished. 258 */ 259 @Override 260 public void cleanup(Exception e) { 261 taskManager.abort("Aborting simple subprocedure tasks due to error", e); 262 } 263 } 264 265}