001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure; 019 020import java.io.Closeable; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.ExecutorCompletionService; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.Future; 030import java.util.concurrent.ThreadPoolExecutor; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.Abortable; 033import org.apache.hadoop.hbase.errorhandling.ForeignException; 034import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 035import org.apache.hadoop.hbase.regionserver.RegionServerServices; 036import org.apache.hadoop.hbase.util.Threads; 037import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 038import org.apache.zookeeper.KeeperException; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 043 044public class SimpleRSProcedureManager extends RegionServerProcedureManager { 045 046 private static final Logger LOG = LoggerFactory.getLogger(SimpleRSProcedureManager.class); 047 048 private RegionServerServices rss; 049 private ProcedureMemberRpcs memberRpcs; 050 private ProcedureMember member; 051 052 @Override 053 public void initialize(RegionServerServices rss) throws KeeperException { 054 this.rss = rss; 055 ZKWatcher zkw = rss.getZooKeeper(); 056 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, getProcedureSignature()); 057 058 ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), 1); 059 this.member = new ProcedureMember(memberRpcs, pool, new SimleSubprocedureBuilder()); 060 LOG.info("Initialized: " + rss.getServerName().toString()); 061 } 062 063 @Override 064 public void start() { 065 this.memberRpcs.start(rss.getServerName().toString(), member); 066 LOG.info("Started."); 067 } 068 069 @Override 070 public void stop(boolean force) throws IOException { 071 LOG.info("stop: " + force); 072 try { 073 this.member.close(); 074 } finally { 075 this.memberRpcs.close(); 076 } 077 } 078 079 @Override 080 public String getProcedureSignature() { 081 return SimpleMasterProcedureManager.SIMPLE_SIGNATURE; 082 } 083 084 /** 085 * If in a running state, creates the specified subprocedure for handling a procedure. 086 * @return Subprocedure to submit to the ProcedureMember. 087 */ 088 public Subprocedure buildSubprocedure(String name) { 089 090 // don't run a procedure if the parent is stop(ping) 091 if (rss.isStopping() || rss.isStopped()) { 092 throw new IllegalStateException( 093 "Can't start procedure on RS: " + rss.getServerName() + ", because stopping/stopped!"); 094 } 095 096 LOG.info("Attempting to run a procedure."); 097 ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher(); 098 Configuration conf = rss.getConfiguration(); 099 100 SimpleSubprocedurePool taskManager = 101 new SimpleSubprocedurePool(rss.getServerName().toString(), conf); 102 return new SimpleSubprocedure(rss, member, errorDispatcher, taskManager, name); 103 } 104 105 /** 106 * Build the actual procedure runner that will do all the 'hard' work 107 */ 108 public class SimleSubprocedureBuilder implements SubprocedureFactory { 109 110 @Override 111 public Subprocedure buildSubprocedure(String name, byte[] data) { 112 LOG.info("Building procedure: " + name); 113 return SimpleRSProcedureManager.this.buildSubprocedure(name); 114 } 115 } 116 117 public static class SimpleSubprocedurePool implements Closeable, Abortable { 118 119 private final ExecutorCompletionService<Void> taskPool; 120 private final ExecutorService executor; 121 private volatile boolean aborted; 122 private final List<Future<Void>> futures = new ArrayList<>(); 123 private final String name; 124 125 public SimpleSubprocedurePool(String name, Configuration conf) { 126 this.name = name; 127 executor = Executors.newSingleThreadExecutor( 128 new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-procedure-pool-%d") 129 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 130 taskPool = new ExecutorCompletionService<>(executor); 131 } 132 133 /** 134 * Submit a task to the pool. 135 */ 136 public void submitTask(final Callable<Void> task) { 137 Future<Void> f = this.taskPool.submit(task); 138 futures.add(f); 139 } 140 141 /** 142 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)} 143 * @return <tt>true</tt> on success, <tt>false</tt> otherwise n 144 */ 145 public boolean waitForOutstandingTasks() throws ForeignException { 146 LOG.debug("Waiting for procedure to finish."); 147 148 try { 149 for (Future<Void> f : futures) { 150 f.get(); 151 } 152 return true; 153 } catch (InterruptedException e) { 154 if (aborted) 155 throw new ForeignException("Interrupted and found to be aborted while waiting for tasks!", 156 e); 157 Thread.currentThread().interrupt(); 158 } catch (ExecutionException e) { 159 if (e.getCause() instanceof ForeignException) { 160 throw (ForeignException) e.getCause(); 161 } 162 throw new ForeignException(name, e.getCause()); 163 } finally { 164 // close off remaining tasks 165 for (Future<Void> f : futures) { 166 if (!f.isDone()) { 167 f.cancel(true); 168 } 169 } 170 } 171 return false; 172 } 173 174 /** 175 * Attempt to cleanly shutdown any running tasks - allows currently running tasks to cleanly 176 * finish 177 */ 178 @Override 179 public void close() { 180 executor.shutdown(); 181 } 182 183 @Override 184 public void abort(String why, Throwable e) { 185 if (this.aborted) return; 186 187 this.aborted = true; 188 LOG.warn("Aborting because: " + why, e); 189 this.executor.shutdownNow(); 190 } 191 192 @Override 193 public boolean isAborted() { 194 return this.aborted; 195 } 196 } 197 198 public class SimpleSubprocedure extends Subprocedure { 199 private final RegionServerServices rss; 200 private final SimpleSubprocedurePool taskManager; 201 202 public SimpleSubprocedure(RegionServerServices rss, ProcedureMember member, 203 ForeignExceptionDispatcher errorListener, SimpleSubprocedurePool taskManager, String name) { 204 super(member, name, errorListener, 500, 60000); 205 LOG.info("Constructing a SimpleSubprocedure."); 206 this.rss = rss; 207 this.taskManager = taskManager; 208 } 209 210 /** 211 * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified 212 * with no use of subprocedurepool. 213 */ 214 class RSSimpleTask implements Callable<Void> { 215 RSSimpleTask() { 216 } 217 218 @Override 219 public Void call() throws Exception { 220 LOG.info("Execute subprocedure on " + rss.getServerName().toString()); 221 return null; 222 } 223 224 } 225 226 private void execute() throws ForeignException { 227 228 monitor.rethrowException(); 229 230 // running a task (e.g., roll log, flush table) on region server 231 taskManager.submitTask(new RSSimpleTask()); 232 monitor.rethrowException(); 233 234 // wait for everything to complete. 235 taskManager.waitForOutstandingTasks(); 236 monitor.rethrowException(); 237 238 } 239 240 @Override 241 public void acquireBarrier() throws ForeignException { 242 // do nothing, executing in inside barrier step. 243 } 244 245 /** 246 * do a log roll. 247 */ 248 @Override 249 public byte[] insideBarrier() throws ForeignException { 250 execute(); 251 return SimpleMasterProcedureManager.SIMPLE_DATA.getBytes(); 252 } 253 254 /** 255 * Cancel threads if they haven't finished. 256 */ 257 @Override 258 public void cleanup(Exception e) { 259 taskManager.abort("Aborting simple subprocedure tasks due to error", e); 260 } 261 } 262 263}