/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure.flush;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;

/**
 * This manager class handles flushing of the regions for table on a {@link HRegionServer}.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class RegionServerFlushTableProcedureManager extends RegionServerProcedureManager {
  private static final Logger LOG =
    LoggerFactory.getLogger(RegionServerFlushTableProcedureManager.class);

  // Size of the per-subprocedure pool that runs the individual region flush tasks.
  private static final String CONCURENT_FLUSH_TASKS_KEY =
    "hbase.flush.procedure.region.concurrentTasks";
  private static final int DEFAULT_CONCURRENT_FLUSH_TASKS = 3;

  // Size of the ProcedureMember pool created in initialize().
  public static final String FLUSH_REQUEST_THREADS_KEY =
    "hbase.flush.procedure.region.pool.threads";
  public static final int FLUSH_REQUEST_THREADS_DEFAULT = 10;

  // Used both as the subprocedure timeout and as the keep-alive of the thread pools.
  public static final String FLUSH_TIMEOUT_MILLIS_KEY = "hbase.flush.procedure.region.timeout";
  public static final long FLUSH_TIMEOUT_MILLIS_DEFAULT = 60000;

  // Wake frequency handed to each FlushTableSubprocedure.
  public static final String FLUSH_REQUEST_WAKE_MILLIS_KEY =
    "hbase.flush.procedure.region.wakefrequency";
  private static final long FLUSH_REQUEST_WAKE_MILLIS_DEFAULT = 500;

  private RegionServerServices rss;
  private ProcedureMemberRpcs memberRpcs;
  private ProcedureMember member;

  /**
   * Exposed for testing.
   * @param conf       HBase configuration (not read by this constructor).
   * @param server     region server.
   * @param memberRpc  use specified memberRpc instance
   * @param procMember use specified ProcedureMember
   */
  RegionServerFlushTableProcedureManager(Configuration conf, HRegionServer server,
    ProcedureMemberRpcs memberRpc, ProcedureMember procMember) {
    this.rss = server;
    this.memberRpcs = memberRpc;
    this.member = procMember;
  }

  /**
   * Creates an uninitialized manager; {@link #initialize(RegionServerServices)} assigns the
   * region server, the member RPCs and the procedure member.
   */
  public RegionServerFlushTableProcedureManager() {
  }

  /**
   * Start accepting flush table requests.
   */
  @Override
  public void start() {
    String serverName = rss.getServerName().toString();
    LOG.debug("Start region server flush procedure manager {}", serverName);
    this.memberRpcs.start(serverName, member);
  }

  /**
   * Close {@code this} and all running tasks
   * @param force forcefully stop all running tasks; in this class the flag only selects the
   *              wording of the log message
   * @throws IOException if closing the procedure member or the member RPCs fails
   */
  @Override
  public void stop(boolean force) throws IOException {
    LOG.info("Stopping region server flush procedure manager {}.",
      force ? "abruptly" : "gracefully");

    try {
      this.member.close();
    } finally {
      // Always release the RPC side, even when closing the member throws.
      this.memberRpcs.close();
    }
  }

  /**
   * If in a running state, creates the specified subprocedure to flush table regions. Because this
   * gets the local list of regions to flush and not the set the master had, there is a possibility
   * of a race where regions may be missed.
   * @param table  name of the table whose regions should be flushed
   * @param family column family passed through to the subprocedure; may be {@code null}
   * @return Subprocedure to submit to the ProcedureMember.
   * @throws IllegalStateException if the region server is stopping/stopped, or if the regions of
   *                               the table cannot be looked up
   */
  public Subprocedure buildSubprocedure(String table, String family) {

    // don't run the subprocedure if the parent is stop(ping)
    if (rss.isStopping() || rss.isStopped()) {
      throw new IllegalStateException("Can't start flush region subprocedure on RS: "
        + rss.getServerName() + ", because stopping/stopped!");
    }

    // check to see if this server is hosting any regions for the table
    List<HRegion> involvedRegions;
    try {
      involvedRegions = getRegionsToFlush(table);
    } catch (IOException e1) {
      throw new IllegalStateException("Failed to figure out if there is region to flush.", e1);
    }

    // We need to run the subprocedure even if we have no relevant regions. The coordinator
    // expects participation in the procedure and without sending message the master procedure
    // will hang and fail.

    LOG.debug("Launching subprocedure to flush regions for {}", table);
    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(table);
    Configuration conf = rss.getConfiguration();
    long timeoutMillis = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
    long wakeMillis =
      conf.getLong(FLUSH_REQUEST_WAKE_MILLIS_KEY, FLUSH_REQUEST_WAKE_MILLIS_DEFAULT);

    // Each subprocedure gets its own task pool; the pool shuts itself down in cancelTasks().
    FlushTableSubprocedurePool taskManager =
      new FlushTableSubprocedurePool(rss.getServerName().toString(), conf, rss);
    return new FlushTableSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis,
      involvedRegions, table, family, taskManager);
  }

  /**
   * Get the list of regions to flush for the table on this server It is possible that if a region
   * moves somewhere between the calls we'll miss the region.
   * @param table name of the table to look up
   * @return the list of online regions. Empty list is returned if no regions.
   * @throws IOException if the region server fails to list the regions
   */
  private List<HRegion> getRegionsToFlush(String table) throws IOException {
    return (List<HRegion>) rss.getRegions(TableName.valueOf(table));
  }

  /**
   * Factory handed to the {@link ProcedureMember}; turns a procedure name and payload received
   * from the coordinator into a flush subprocedure of the enclosing manager.
   */
  public class FlushTableSubprocedureBuilder implements SubprocedureFactory {

    /**
     * @param name procedure instance name; it is used as the table name
     * @param data optional serialized {@code NameStringPair} whose value is the column family;
     *             {@code null} or empty means no family was specified
     * @return the subprocedure built by the enclosing manager
     */
    @Override
    public Subprocedure buildSubprocedure(String name, byte[] data) {
      String family = null;
      // Currently we do not put other data except family, so it is ok to
      // judge by length that if family was specified. A null payload is treated like an empty
      // one instead of failing with a NullPointerException.
      if (data != null && data.length > 0) {
        try {
          HBaseProtos.NameStringPair nsp = HBaseProtos.NameStringPair.parseFrom(data);
          family = nsp.getValue();
        } catch (Exception e) {
          // Best effort: on a parse failure family stays null and the subprocedure is still built.
          LOG.error("fail to get family by parsing from data", e);
        }
      }
      // The name of the procedure instance from the master is the table name.
      return RegionServerFlushTableProcedureManager.this.buildSubprocedure(name, family);
    }

  }

  /**
   * We use the FlushTableSubprocedurePool, a class specific thread pool instead of
   * {@link org.apache.hadoop.hbase.executor.ExecutorService}. It uses a
   * {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of completed
   * tasks which lets us efficiently cancel pending tasks upon the earliest operation failures.
   */
  static class FlushTableSubprocedurePool {
    private final Abortable abortable;
    private final ExecutorCompletionService<Void> taskPool;
    private final ThreadPoolExecutor executor;
    private volatile boolean stopped;
    private final List<Future<Void>> futures = new ArrayList<>();
    private final String name;

    /**
     * @param name      label used in thread names and in error reporting
     * @param conf      configuration supplying the pool size and the thread keep-alive
     * @param abortable aborted when a flush task fails with a {@link DroppedSnapshotException}
     */
    FlushTableSubprocedurePool(String name, Configuration conf, Abortable abortable) {
      this.abortable = abortable;
      // configure the executor service
      long keepAlive = conf.getLong(RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_KEY,
        RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
      int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
      this.name = name;
      executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
        new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-flush-proc-pool-%d")
          .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
      taskPool = new ExecutorCompletionService<>(executor);
    }

    /**
     * @return {@code true} if at least one submitted task has not yet been collected or cancelled
     */
    boolean hasTasks() {
      return !futures.isEmpty();
    }

    /**
     * Submit a task to the pool. NOTE: all must be submitted before you can safely
     * {@link #waitForOutstandingTasks()}.
     * @param task the region flush work to run
     */
    void submitTask(final Callable<Void> task) {
      Future<Void> f = this.taskPool.submit(task);
      futures.add(f);
    }

    /**
     * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}.
     * This *must* be called after all tasks are submitted via submitTask.
     * <p>
     * Whatever the outcome, the remaining tasks are cancelled and the pool is stopped before this
     * method exits.
     * @return {@code true} on success, {@code false} if the wait was interrupted after the pool
     *         had already been stopped
     * @throws ForeignException     if a task failed, or if the wait was interrupted while the pool
     *                              was still running
     * @throws InterruptedException declared for callers; interrupts raised while waiting are
     *                              handled inside this method
     */
    boolean waitForOutstandingTasks() throws ForeignException, InterruptedException {
      LOG.debug("Waiting for local region flush to finish.");

      int sz = futures.size();
      try {
        // Using the completion service to process the futures.
        for (int i = 0; i < sz; i++) {
          Future<Void> f = taskPool.take();
          // get() surfaces the task's failure, if any, as an ExecutionException.
          f.get();
          if (!futures.remove(f)) {
            LOG.warn("unexpected future {}", f);
          }
          LOG.debug("Completed {}/{} local region flush tasks.", i + 1, sz);
        }
        LOG.debug("Completed {} local region flush tasks.", sz);
        return true;
      } catch (InterruptedException e) {
        LOG.warn("Got InterruptedException in FlushSubprocedurePool", e);
        if (!stopped) {
          Thread.currentThread().interrupt();
          throw new ForeignException("FlushSubprocedurePool", e);
        }
        // we are stopped so we can just exit. The interrupt status is intentionally not restored
        // on this path.
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        if (cause instanceof ForeignException) {
          LOG.warn("Rethrowing ForeignException from FlushSubprocedurePool", e);
          throw (ForeignException) cause;
        } else if (cause instanceof DroppedSnapshotException) {
          // we have to abort the region server according to contract of flush
          abortable.abort("Received DroppedSnapshotException, aborting", cause);
        }
        LOG.warn("Got Exception in FlushSubprocedurePool", e);
        throw new ForeignException(name, cause);
      } finally {
        cancelTasks();
      }
      return false;
    }

    /**
     * This attempts to cancel out all pending and in progress tasks. Does not interrupt the running
     * tasks itself. An ongoing HRegion.flush() should not be interrupted (see HBASE-13877).
     * @throws InterruptedException part of the existing signature that callers handle
     */
    void cancelTasks() throws InterruptedException {
      Collection<Future<Void>> tasks = futures;
      LOG.debug("cancelling {} flush region tasks {}", tasks.size(), name);
      for (Future<Void> f : tasks) {
        // false: tasks that are already running are allowed to finish.
        f.cancel(false);
      }

      // evict remaining tasks and futures from taskPool.
      futures.clear();
      while (taskPool.poll() != null) {
        // nothing to do: just drain the completion queue
      }
      stop();
    }

    /**
     * Gracefully shutdown the thread pool. An ongoing HRegion.flush() should not be interrupted
     * (see HBASE-13877)
     */
    void stop() {
      if (this.stopped) {
        return;
      }

      this.stopped = true;
      this.executor.shutdown();
    }
  }

  /**
   * Initialize this region server flush procedure manager Uses a zookeeper based member controller.
   * @param rss region server
   * @throws KeeperException if the zookeeper cannot be reached
   */
  @Override
  public void initialize(RegionServerServices rss) throws KeeperException {
    this.rss = rss;
    ZKWatcher zkw = rss.getZooKeeper();
    this.memberRpcs = new ZKProcedureMemberRpcs(zkw,
      MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE);

    Configuration conf = rss.getConfiguration();
    long keepAlive = conf.getLong(FLUSH_TIMEOUT_MILLIS_KEY, FLUSH_TIMEOUT_MILLIS_DEFAULT);
    int opThreads = conf.getInt(FLUSH_REQUEST_THREADS_KEY, FLUSH_REQUEST_THREADS_DEFAULT);

    // create the actual flush table procedure member
    ThreadPoolExecutor pool =
      ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
    this.member = new ProcedureMember(memberRpcs, pool, new FlushTableSubprocedureBuilder());
  }

  /**
   * @return the signature shared with {@link MasterFlushTableProcedureManager}
   */
  @Override
  public String getProcedureSignature() {
    return MasterFlushTableProcedureManager.FLUSH_TABLE_PROCEDURE_SIGNATURE;
  }

}