001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure.flush; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.ExecutorCompletionService; 027import java.util.concurrent.Future; 028import java.util.concurrent.LinkedBlockingQueue; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.yetus.audience.InterfaceAudience; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.Abortable; 035import org.apache.hadoop.hbase.DaemonThreadFactory; 036import org.apache.hadoop.hbase.DroppedSnapshotException; 037import org.apache.hadoop.hbase.HBaseInterfaceAudience; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.errorhandling.ForeignException; 040import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 041import org.apache.hadoop.hbase.procedure.ProcedureMember; 042import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; 043import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; 044import org.apache.hadoop.hbase.procedure.Subprocedure; 045import org.apache.hadoop.hbase.procedure.SubprocedureFactory; 046import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; 047import org.apache.hadoop.hbase.regionserver.HRegion; 048import org.apache.hadoop.hbase.regionserver.HRegionServer; 049import org.apache.hadoop.hbase.regionserver.RegionServerServices; 050import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 051import org.apache.zookeeper.KeeperException; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * This manager class handles flushing of the regions for table on a {@link HRegionServer}. 057 */ 058@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 059public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager { 060 private static final Logger LOG = 061 LoggerFactory.getLogger(RegionServerFlushTableProcedureManager.class); 062 063 private static final String CONCURENT_FLUSH_TASKS_KEY = 064 "hbase.flush.procedure.region.concurrentTasks"; 065 private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3; 066 067 public static final String FLUSH_REQUEST_THREADS_KEY = 068 "hbase.flush.procedure.region.pool.threads"; 069 public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10; 070 071 public static final String FLUSH_TIMEOUT_MILLIS_KEY = 072 "hbase.flush.procedure.region.timeout"; 073 public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; 074 075 public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY = 076 "hbase.flush.procedure.region.wakefrequency"; 077 private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500; 078 079 private RegionServerServices rss; 080 private ProcedureMemberRpcs memberRpcs; 081 private ProcedureMember member; 082 083 /** 084 * Exposed for testing. 085 * @param conf HBase configuration. 086 * @param server region server. 087 * @param memberRpc use specified memberRpc instance 088 * @param procMember use specified ProcedureMember 089 */ 090 RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server, 091 ProcedureMemberRpcs memberRpc, ProcedureMember procMember) { 092 this.rss = server; 093 this.memberRpcs = memberRpc; 094 this.member = procMember; 095 } 096 097 public RegionServerFlushTableProcedureManager() {} 098 099 /** 100 * Start accepting flush table requests. 101 */ 102 @Override 103 public void start() { 104 LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString()); 105 this.memberRpcs.start(rss.getServerName().toString(), member); 106 } 107 108 /** 109 * Close <tt>this</tt> and all running tasks 110 * @param force forcefully stop all running tasks 111 * @throws IOException 112 */ 113 @Override 114 public void stop(boolean force) throws IOException { 115 String mode = force ? "abruptly" : "gracefully"; 116 LOG.info("Stopping region server flush procedure manager " + mode + "."); 117 118 try { 119 this.member.close(); 120 } finally { 121 this.memberRpcs.close(); 122 } 123 } 124 125 /** 126 * If in a running state, creates the specified subprocedure to flush table regions. 127 * 128 * Because this gets the local list of regions to flush and not the set the master had, 129 * there is a possibility of a race where regions may be missed. 130 * 131 * @param table 132 * @return Subprocedure to submit to the ProcedureMemeber. 133 */ 134 public Subprocedure buildSubprocedure(String table) { 135 136 // don't run the subprocedure if the parent is stop(ping) 137 if (rss.isStopping() || rss.isStopped()) { 138 throw new IllegalStateException("Can't start flush region subprocedure on RS: " 139 + rss.getServerName() + ", because stopping/stopped!"); 140 } 141 142 // check to see if this server is hosting any regions for the table 143 List<HRegion> involvedRegions; 144 try { 145 involvedRegions = getRegionsToFlush(table); 146 } catch (IOException e1) { 147 throw new IllegalStateException("Failed to figure out if there is region to flush.", e1); 148 } 149 150 // We need to run the subprocedure even if we have no relevant regions. The coordinator 151 // expects participation in the procedure and without sending message the master procedure 152 // will hang and fail. 153 154 LOG.debug("Launching subprocedure to flush regions for " + table); 155 ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table); 156 Configuration conf = rss.getConfiguration(); 157 long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, 158 FLUSH_TIMEOUT_MILLIS_DEFAULT); 159 long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY, 160 FLUSH_REQUEST_WAKE_MILLIS_DEFAULT); 161 162 FlushTableSubprocedurePool taskManager = 163 new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); 164 return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, 165 timeoutMillis, involvedRegions, table, taskManager); 166 } 167 168 /** 169 * Get the list of regions to flush for the table on this server 170 * 171 * It is possible that if a region moves somewhere between the calls 172 * we'll miss the region. 173 * 174 * @param table 175 * @return the list of online regions. Empty list is returned if no regions. 176 * @throws IOException 177 */ 178 private List<HRegion> getRegionsToFlush(String table) throws IOException { 179 return (List<HRegion>) rss.getRegions(TableName.valueOf(table)); 180 } 181 182 public class FlushTableSubprocedureBuilder implements SubprocedureFactory { 183 184 @Override 185 public Subprocedure buildSubprocedure(String name, byte[] data) { 186 // The name of the procedure instance from the master is the table name. 187 return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name); 188 } 189 190 } 191 192 /** 193 * We use the FlushTableSubprocedurePool, a class specific thread pool instead of 194 * {@link org.apache.hadoop.hbase.executor.ExecutorService}. 195 * 196 * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of 197 * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation 198 * failures. 199 */ 200 static class FlushTableSubprocedurePool { 201 private final Abortable abortable; 202 private final ExecutorCompletionService<Void> taskPool; 203 private final ThreadPoolExecutor executor; 204 private volatile boolean stopped; 205 private final List<Future<Void>> futures = new ArrayList<>(); 206 private final String name; 207 208 FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) { 209 this.abortable = abortable; 210 // configure the executor service 211 long keepAlive = conf.getLong( 212 RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY, 213 RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT); 214 int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS); 215 this.name = name; 216 executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS, 217 new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs(" 218 + name + ")-flush-proc-pool")); 219 executor.allowCoreThreadTimeOut(true); 220 taskPool = new ExecutorCompletionService<>(executor); 221 } 222 223 boolean hasTasks() { 224 return futures.size() != 0; 225 } 226 227 /** 228 * Submit a task to the pool. 229 * 230 * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. 231 */ 232 void submitTask(final Callable<Void> task) { 233 Future<Void> f = this.taskPool.submit(task); 234 futures.add(f); 235 } 236 237 /** 238 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}. 239 * This *must* be called after all tasks are submitted via submitTask. 240 * 241 * @return <tt>true</tt> on success, <tt>false</tt> otherwise 242 * @throws InterruptedException 243 */ 244 boolean waitForOutstandingTasks() throws ForeignException, InterruptedException { 245 LOG.debug("Waiting for local region flush to finish."); 246 247 int sz = futures.size(); 248 try { 249 // Using the completion service to process the futures. 250 for (int i = 0; i < sz; i++) { 251 Future<Void> f = taskPool.take(); 252 f.get(); 253 if (!futures.remove(f)) { 254 LOG.warn("unexpected future" + f); 255 } 256 LOG.debug("Completed " + (i+1) + "/" + sz + " local region flush tasks."); 257 } 258 LOG.debug("Completed " + sz + " local region flush tasks."); 259 return true; 260 } catch (InterruptedException e) { 261 LOG.warn("Got InterruptedException in FlushSubprocedurePool", e); 262 if (!stopped) { 263 Thread.currentThread().interrupt(); 264 throw new ForeignException("FlushSubprocedurePool", e); 265 } 266 // we are stopped so we can just exit. 267 } catch (ExecutionException e) { 268 Throwable cause = e.getCause(); 269 if (cause instanceof ForeignException) { 270 LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e); 271 throw (ForeignException)e.getCause(); 272 } else if (cause instanceof DroppedSnapshotException) { 273 // we have to abort the region server according to contract of flush 274 abortable.abort("Received DroppedSnapshotException, aborting", cause); 275 } 276 LOG.warn("Got Exception in FlushSubprocedurePool", e); 277 throw new ForeignException(name, e.getCause()); 278 } finally { 279 cancelTasks(); 280 } 281 return false; 282 } 283 284 /** 285 * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running 286 * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877). 287 * @throws InterruptedException 288 */ 289 void cancelTasks() throws InterruptedException { 290 Collection<Future<Void>> tasks = futures; 291 LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name); 292 for (Future<Void> f: tasks) { 293 f.cancel(false); 294 } 295 296 // evict remaining tasks and futures from taskPool. 297 futures.clear(); 298 while (taskPool.poll() != null) {} 299 stop(); 300 } 301 302 /** 303 * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be 304 * interrupted (see HBASE-13877) 305 */ 306 void stop() { 307 if (this.stopped) return; 308 309 this.stopped = true; 310 this.executor.shutdown(); 311 } 312 } 313 314 /** 315 * Initialize this region server flush procedure manager 316 * Uses a zookeeper based member controller. 317 * @param rss region server 318 * @throws KeeperException if the zookeeper cannot be reached 319 */ 320 @Override 321 public void initialize(RegionServerServices rss) throws KeeperException { 322 this.rss = rss; 323 ZKWatcher zkw = rss.getZooKeeper(); 324 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, 325 MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE); 326 327 Configuration conf = rss.getConfiguration(); 328 long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT); 329 int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT); 330 331 // create the actual flush table procedure member 332 ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), 333 opThreads, keepAlive); 334 this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder()); 335 } 336 337 @Override 338 public String getProcedureSignature() { 339 return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE; 340 } 341 342}