001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.ExecutorCompletionService; 027import java.util.concurrent.Future; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Abortable; 034import org.apache.hadoop.hbase.DaemonThreadFactory; 035import org.apache.hadoop.hbase.regionserver.RegionServerServices; 036import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 037import org.apache.hadoop.hbase.errorhandling.ForeignException; 038import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 039import org.apache.zookeeper.KeeperException; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043public class SimpleRSProcedureManager extends RegionServerProcedureManager { 044 045 private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class); 046 047 private RegionServerServices rss; 048 private ProcedureMemberRpcs memberRpcs; 049 private ProcedureMember member; 050 051 @Override 052 public void initialize(RegionServerServices rss) throws KeeperException { 053 this.rss = rss; 054 ZKWatcher zkw = rss.getZooKeeper(); 055 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature()); 056 057 ThreadPoolExecutor pool = 058 ProcedureMember.defaultPool(rss.getServerName().toString(), 1); 059 this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder()); 060 LOG.info("Initialized: " + rss.getServerName().toString()); 061 } 062 063 @Override 064 public void start() { 065 this.memberRpcs.start(rss.getServerName().toString(), member); 066 LOG.info("Started."); 067 } 068 069 @Override 070 public void stop(boolean force) throws IOException { 071 LOG.info("stop: " + force); 072 try { 073 this.member.close(); 074 } finally { 075 this.memberRpcs.close(); 076 } 077 } 078 079 @Override 080 public String getProcedureSignature() { 081 return SimpleMasterProcedureManager.SIMPLE_SIGNATURE; 082 } 083 084 /** 085 * If in a running state, creates the specified subprocedure for handling a procedure. 086 * @return Subprocedure to submit to the ProcedureMemeber. 087 */ 088 public Subprocedure buildSubprocedure(String name) { 089 090 // don't run a procedure if the parent is stop(ping) 091 if (rss.isStopping() || rss.isStopped()) { 092 throw new IllegalStateException("Can't start procedure on RS: " + rss.getServerName() 093 + ", because stopping/stopped!"); 094 } 095 096 LOG.info("Attempting to run a procedure."); 097 ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); 098 Configuration conf = rss.getConfiguration(); 099 100 SimpleSubprocedurePool taskManager = 101 new SimpleSubprocedurePool(rss.getServerName().toString(), conf); 102 return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name); 103 } 104 105 /** 106 * Build the actual procedure runner that will do all the 'hard' work 107 */ 108 public class SimleSubprocedureBuilder implements SubprocedureFactory { 109 110 @Override 111 public Subprocedure buildSubprocedure(String name, byte[] data) { 112 LOG.info("Building procedure: " + name); 113 return SimpleRSProcedureManager.this.buildSubprocedure(name); 114 } 115 } 116 117 public class SimpleSubprocedurePool implements Closeable, Abortable { 118 119 private final ExecutorCompletionService<Void> taskPool; 120 private final ThreadPoolExecutor executor; 121 private volatile boolean aborted; 122 private final List<Future<Void>> futures = new ArrayList<>(); 123 private final String name; 124 125 public SimpleSubprocedurePool(String name, Configuration conf) { 126 this.name = name; 127 executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS, 128 new LinkedBlockingQueue<>(), 129 new DaemonThreadFactory("rs(" + name + ")-procedure-pool")); 130 taskPool = new ExecutorCompletionService<>(executor); 131 } 132 133 /** 134 * Submit a task to the pool. 135 */ 136 public void submitTask(final Callable<Void> task) { 137 Future<Void> f = this.taskPool.submit(task); 138 futures.add(f); 139 } 140 141 /** 142 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)} 143 * 144 * @return <tt>true</tt> on success, <tt>false</tt> otherwise 145 * @throws ForeignException 146 */ 147 public boolean waitForOutstandingTasks() throws ForeignException { 148 LOG.debug("Waiting for procedure to finish."); 149 150 try { 151 for (Future<Void> f: futures) { 152 f.get(); 153 } 154 return true; 155 } catch (InterruptedException e) { 156 if (aborted) throw new ForeignException( 157 "Interrupted and found to be aborted while waiting for tasks!", e); 158 Thread.currentThread().interrupt(); 159 } catch (ExecutionException e) { 160 if (e.getCause() instanceof ForeignException) { 161 throw (ForeignException) e.getCause(); 162 } 163 throw new ForeignException(name, e.getCause()); 164 } finally { 165 // close off remaining tasks 166 for (Future<Void> f: futures) { 167 if (!f.isDone()) { 168 f.cancel(true); 169 } 170 } 171 } 172 return false; 173 } 174 175 /** 176 * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly 177 * finish 178 */ 179 @Override 180 public void close() { 181 executor.shutdown(); 182 } 183 184 @Override 185 public void abort(String why, Throwable e) { 186 if (this.aborted) return; 187 188 this.aborted = true; 189 LOG.warn("Aborting because: " + why, e); 190 this.executor.shutdownNow(); 191 } 192 193 @Override 194 public boolean isAborted() { 195 return this.aborted; 196 } 197 } 198 199 public class SimpleSubprocedure extends Subprocedure { 200 private final RegionServerServices rss; 201 private final SimpleSubprocedurePool taskManager; 202 203 public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member, 204 ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) { 205 super(member, name, errorListener, 500, 60000); 206 LOG.info("Constructing a SimpleSubprocedure."); 207 this.rss = rss; 208 this.taskManager = taskManager; 209 } 210 211 /** 212 * Callable task. 213 * TODO. We don't need a thread pool to execute roll log. This can be simplified 214 * with no use of subprocedurepool. 215 */ 216 class RSSimpleTask implements Callable<Void> { 217 RSSimpleTask() {} 218 219 @Override 220 public Void call() throws Exception { 221 LOG.info("Execute subprocedure on " + rss.getServerName().toString()); 222 return null; 223 } 224 225 } 226 227 private void execute() throws ForeignException { 228 229 monitor.rethrowException(); 230 231 // running a task (e.g., roll log, flush table) on region server 232 taskManager.submitTask(new RSSimpleTask()); 233 monitor.rethrowException(); 234 235 // wait for everything to complete. 236 taskManager.waitForOutstandingTasks(); 237 monitor.rethrowException(); 238 239 } 240 241 @Override 242 public void acquireBarrier() throws ForeignException { 243 // do nothing, executing in inside barrier step. 244 } 245 246 /** 247 * do a log roll. 248 */ 249 @Override 250 public byte[] insideBarrier() throws ForeignException { 251 execute(); 252 return SimpleMasterProcedureManager.SIMPLE_DATA.getBytes(); 253 } 254 255 /** 256 * Cancel threads if they haven't finished. 257 */ 258 @Override 259 public void cleanup(Exception e) { 260 taskManager.abort("Aborting simple subprocedure tasks due to error", e); 261 } 262 } 263 264}