001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure.flush; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.List; 024import java.util.concurrent.Callable; 025import java.util.concurrent.ExecutionException; 026import java.util.concurrent.ExecutorCompletionService; 027import java.util.concurrent.Future; 028import java.util.concurrent.ThreadPoolExecutor; 029import java.util.concurrent.TimeUnit; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.Abortable; 032import org.apache.hadoop.hbase.DroppedSnapshotException; 033import org.apache.hadoop.hbase.HBaseInterfaceAudience; 034import org.apache.hadoop.hbase.TableName; 035import org.apache.hadoop.hbase.errorhandling.ForeignException; 036import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 037import org.apache.hadoop.hbase.procedure.ProcedureMember; 038import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; 039import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; 040import org.apache.hadoop.hbase.procedure.Subprocedure; 041import org.apache.hadoop.hbase.procedure.SubprocedureFactory; 042import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; 043import org.apache.hadoop.hbase.regionserver.HRegion; 044import org.apache.hadoop.hbase.regionserver.HRegionServer; 045import org.apache.hadoop.hbase.regionserver.RegionServerServices; 046import org.apache.hadoop.hbase.util.Threads; 047import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 048import org.apache.yetus.audience.InterfaceAudience; 049import org.apache.zookeeper.KeeperException; 050import org.slf4j.Logger; 051import org.slf4j.LoggerFactory; 052 053import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 054 055import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 056 057/** 058 * This manager class handles flushing of the regions for table on a {@link HRegionServer}. 059 */ 060@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 061public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager { 062 private static final Logger LOG = 063 LoggerFactory.getLogger(RegionServerFlushTableProcedureManager.class); 064 065 private static final String CONCURENT_FLUSH_TASKS_KEY = 066 "hbase.flush.procedure.region.concurrentTasks"; 067 private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3; 068 069 public static final String FLUSH_REQUEST_THREADS_KEY = 070 "hbase.flush.procedure.region.pool.threads"; 071 public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10; 072 073 public static final String FLUSH_TIMEOUT_MILLIS_KEY = 074 "hbase.flush.procedure.region.timeout"; 075 public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000; 076 077 public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY = 078 "hbase.flush.procedure.region.wakefrequency"; 079 private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500; 080 081 private RegionServerServices rss; 082 private ProcedureMemberRpcs memberRpcs; 083 private ProcedureMember member; 084 085 /** 086 * Exposed for testing. 087 * @param conf HBase configuration. 088 * @param server region server. 089 * @param memberRpc use specified memberRpc instance 090 * @param procMember use specified ProcedureMember 091 */ 092 RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server, 093 ProcedureMemberRpcs memberRpc, ProcedureMember procMember) { 094 this.rss = server; 095 this.memberRpcs = memberRpc; 096 this.member = procMember; 097 } 098 099 public RegionServerFlushTableProcedureManager() {} 100 101 /** 102 * Start accepting flush table requests. 103 */ 104 @Override 105 public void start() { 106 LOG.debug("Start region server flush procedure manager " + rss.getServerName().toString()); 107 this.memberRpcs.start(rss.getServerName().toString(), member); 108 } 109 110 /** 111 * Close <tt>this</tt> and all running tasks 112 * @param force forcefully stop all running tasks 113 * @throws IOException 114 */ 115 @Override 116 public void stop(boolean force) throws IOException { 117 String mode = force ? "abruptly" : "gracefully"; 118 LOG.info("Stopping region server flush procedure manager " + mode + "."); 119 120 try { 121 this.member.close(); 122 } finally { 123 this.memberRpcs.close(); 124 } 125 } 126 127 /** 128 * If in a running state, creates the specified subprocedure to flush table regions. 129 * 130 * Because this gets the local list of regions to flush and not the set the master had, 131 * there is a possibility of a race where regions may be missed. 132 * 133 * @param table table to flush 134 * @param family column family within a table 135 * @return Subprocedure to submit to the ProcedureMember. 136 */ 137 public Subprocedure buildSubprocedure(String table, String family) { 138 139 // don't run the subprocedure if the parent is stop(ping) 140 if (rss.isStopping() || rss.isStopped()) { 141 throw new IllegalStateException("Can't start flush region subprocedure on RS: " 142 + rss.getServerName() + ", because stopping/stopped!"); 143 } 144 145 // check to see if this server is hosting any regions for the table 146 List<HRegion> involvedRegions; 147 try { 148 involvedRegions = getRegionsToFlush(table); 149 } catch (IOException e1) { 150 throw new IllegalStateException("Failed to figure out if there is region to flush.", e1); 151 } 152 153 // We need to run the subprocedure even if we have no relevant regions. The coordinator 154 // expects participation in the procedure and without sending message the master procedure 155 // will hang and fail. 156 157 LOG.debug("Launching subprocedure to flush regions for " + table); 158 ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table); 159 Configuration conf = rss.getConfiguration(); 160 long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, 161 FLUSH_TIMEOUT_MILLIS_DEFAULT); 162 long wakeMillis = conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY, 163 FLUSH_REQUEST_WAKE_MILLIS_DEFAULT); 164 165 FlushTableSubprocedurePool taskManager = 166 new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss); 167 return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, 168 timeoutMillis, involvedRegions, table, family, taskManager); 169 } 170 171 /** 172 * Get the list of regions to flush for the table on this server 173 * 174 * It is possible that if a region moves somewhere between the calls 175 * we'll miss the region. 176 * 177 * @param table 178 * @return the list of online regions. Empty list is returned if no regions. 179 * @throws IOException 180 */ 181 private List<HRegion> getRegionsToFlush(String table) throws IOException { 182 return (List<HRegion>) rss.getRegions(TableName.valueOf(table)); 183 } 184 185 public class FlushTableSubprocedureBuilder implements SubprocedureFactory { 186 187 @Override 188 public Subprocedure buildSubprocedure(String name, byte[] data) { 189 String family = null; 190 // Currently we do not put other data except family, so it is ok to 191 // judge by length that if family was specified 192 if (data.length > 0) { 193 try { 194 HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data); 195 family = nsp.getValue(); 196 } catch (Exception e) { 197 LOG.error("fail to get family by parsing from data", e); 198 } 199 } 200 // The name of the procedure instance from the master is the table name. 201 return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family); 202 } 203 204 } 205 206 /** 207 * We use the FlushTableSubprocedurePool, a class specific thread pool instead of 208 * {@link org.apache.hadoop.hbase.executor.ExecutorService}. 209 * 210 * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of 211 * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation 212 * failures. 213 */ 214 static class FlushTableSubprocedurePool { 215 private final Abortable abortable; 216 private final ExecutorCompletionService<Void> taskPool; 217 private final ThreadPoolExecutor executor; 218 private volatile boolean stopped; 219 private final List<Future<Void>> futures = new ArrayList<>(); 220 private final String name; 221 222 FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) { 223 this.abortable = abortable; 224 // configure the executor service 225 long keepAlive = conf.getLong( 226 RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY, 227 RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT); 228 int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS); 229 this.name = name; 230 executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS, 231 new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d") 232 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 233 taskPool = new ExecutorCompletionService<>(executor); 234 } 235 236 boolean hasTasks() { 237 return futures.size() != 0; 238 } 239 240 /** 241 * Submit a task to the pool. 242 * 243 * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. 244 */ 245 void submitTask(final Callable<Void> task) { 246 Future<Void> f = this.taskPool.submit(task); 247 futures.add(f); 248 } 249 250 /** 251 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}. 252 * This *must* be called after all tasks are submitted via submitTask. 253 * 254 * @return <tt>true</tt> on success, <tt>false</tt> otherwise 255 * @throws InterruptedException 256 */ 257 boolean waitForOutstandingTasks() throws ForeignException, InterruptedException { 258 LOG.debug("Waiting for local region flush to finish."); 259 260 int sz = futures.size(); 261 try { 262 // Using the completion service to process the futures. 263 for (int i = 0; i < sz; i++) { 264 Future<Void> f = taskPool.take(); 265 f.get(); 266 if (!futures.remove(f)) { 267 LOG.warn("unexpected future" + f); 268 } 269 LOG.debug("Completed " + (i+1) + "/" + sz + " local region flush tasks."); 270 } 271 LOG.debug("Completed " + sz + " local region flush tasks."); 272 return true; 273 } catch (InterruptedException e) { 274 LOG.warn("Got InterruptedException in FlushSubprocedurePool", e); 275 if (!stopped) { 276 Thread.currentThread().interrupt(); 277 throw new ForeignException("FlushSubprocedurePool", e); 278 } 279 // we are stopped so we can just exit. 280 } catch (ExecutionException e) { 281 Throwable cause = e.getCause(); 282 if (cause instanceof ForeignException) { 283 LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e); 284 throw (ForeignException)e.getCause(); 285 } else if (cause instanceof DroppedSnapshotException) { 286 // we have to abort the region server according to contract of flush 287 abortable.abort("Received DroppedSnapshotException, aborting", cause); 288 } 289 LOG.warn("Got Exception in FlushSubprocedurePool", e); 290 throw new ForeignException(name, e.getCause()); 291 } finally { 292 cancelTasks(); 293 } 294 return false; 295 } 296 297 /** 298 * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running 299 * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877). 300 * @throws InterruptedException 301 */ 302 void cancelTasks() throws InterruptedException { 303 Collection<Future<Void>> tasks = futures; 304 LOG.debug("cancelling " + tasks.size() + " flush region tasks " + name); 305 for (Future<Void> f: tasks) { 306 f.cancel(false); 307 } 308 309 // evict remaining tasks and futures from taskPool. 310 futures.clear(); 311 while (taskPool.poll() != null) {} 312 stop(); 313 } 314 315 /** 316 * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be 317 * interrupted (see HBASE-13877) 318 */ 319 void stop() { 320 if (this.stopped) return; 321 322 this.stopped = true; 323 this.executor.shutdown(); 324 } 325 } 326 327 /** 328 * Initialize this region server flush procedure manager 329 * Uses a zookeeper based member controller. 330 * @param rss region server 331 * @throws KeeperException if the zookeeper cannot be reached 332 */ 333 @Override 334 public void initialize(RegionServerServices rss) throws KeeperException { 335 this.rss = rss; 336 ZKWatcher zkw = rss.getZooKeeper(); 337 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, 338 MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE); 339 340 Configuration conf = rss.getConfiguration(); 341 long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT); 342 int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT); 343 344 // create the actual flush table procedure member 345 ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), 346 opThreads, keepAlive); 347 this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder()); 348 } 349 350 @Override 351 public String getProcedureSignature() { 352 return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE; 353 } 354 355}