001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.snapshot; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Iterator; 024import java.util.List; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ExecutionException; 027import java.util.concurrent.ExecutorCompletionService; 028import java.util.concurrent.Future; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.Abortable; 033import org.apache.hadoop.hbase.DroppedSnapshotException; 034import org.apache.hadoop.hbase.HBaseInterfaceAudience; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.RegionReplicaUtil; 037import org.apache.hadoop.hbase.errorhandling.ForeignException; 038import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier; 040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; 041import org.apache.hadoop.hbase.procedure.ProcedureMember; 042import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; 043import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; 044import org.apache.hadoop.hbase.procedure.Subprocedure; 045import org.apache.hadoop.hbase.procedure.SubprocedureFactory; 046import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; 047import org.apache.hadoop.hbase.regionserver.HRegion; 048import org.apache.hadoop.hbase.regionserver.HRegionServer; 049import org.apache.hadoop.hbase.regionserver.RegionServerServices; 050import org.apache.hadoop.hbase.util.Threads; 051import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.apache.yetus.audience.InterfaceStability; 054import org.apache.zookeeper.KeeperException; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 059 060import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 061 062/** 063 * This manager class handles the work dealing with snapshots for a {@link HRegionServer}. 064 * <p> 065 * This provides the mechanism necessary to kick off a online snapshot specific {@link Subprocedure} 066 * that is responsible for the regions being served by this region server. If any failures occur 067 * with the subprocedure, the RegionSeverSnapshotManager's subprocedure handler, 068 * {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all others. 069 * <p> 070 * On startup, requires {@link #start()} to be called. 071 * <p> 072 * On shutdown, requires {@link #stop(boolean)} to be called 073 */ 074@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 075@InterfaceStability.Unstable 076public class RegionServerSnapshotManager extends RegionServerProcedureManager { 077 private static final Logger LOG = LoggerFactory.getLogger(RegionServerSnapshotManager.class); 078 079 /** Maximum number of snapshot region tasks that can run concurrently */ 080 private static final String CONCURENT_SNAPSHOT_TASKS_KEY = 081 "hbase.snapshot.region.concurrentTasks"; 082 private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3; 083 084 /** Conf key for number of request threads to start snapshots on regionservers */ 085 public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads"; 086 /** # of threads for snapshotting regions on the rs. */ 087 public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10; 088 089 /** Conf key for max time to keep threads in snapshot request pool waiting */ 090 public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout"; 091 /** Keep threads alive in request pool for max of 300 seconds */ 092 public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5 * 60000; 093 094 /** Conf key for millis between checks to see if snapshot completed or if there are errors */ 095 public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY = 096 "hbase.snapshot.region.wakefrequency"; 097 /** Default amount of time to check for errors while regions finish snapshotting */ 098 private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500; 099 100 private RegionServerServices rss; 101 private ProcedureMemberRpcs memberRpcs; 102 private ProcedureMember member; 103 104 /** 105 * Exposed for testing. 106 * @param conf HBase configuration. 107 * @param parent parent running the snapshot handler 108 * @param memberRpc use specified memberRpc instance 109 * @param procMember use specified ProcedureMember 110 */ 111 RegionServerSnapshotManager(Configuration conf, HRegionServer parent, 112 ProcedureMemberRpcs memberRpc, ProcedureMember procMember) { 113 this.rss = parent; 114 this.memberRpcs = memberRpc; 115 this.member = procMember; 116 } 117 118 public RegionServerSnapshotManager() { 119 } 120 121 /** 122 * Start accepting snapshot requests. 123 */ 124 @Override 125 public void start() { 126 LOG.debug("Start Snapshot Manager " + rss.getServerName().toString()); 127 this.memberRpcs.start(rss.getServerName().toString(), member); 128 } 129 130 /** 131 * Close <tt>this</tt> and all running snapshot tasks 132 * @param force forcefully stop all running tasks 133 */ 134 @Override 135 public void stop(boolean force) throws IOException { 136 String mode = force ? "abruptly" : "gracefully"; 137 LOG.info("Stopping RegionServerSnapshotManager " + mode + "."); 138 139 try { 140 this.member.close(); 141 } finally { 142 this.memberRpcs.close(); 143 } 144 } 145 146 /** 147 * If in a running state, creates the specified subprocedure for handling an online snapshot. 148 * Because this gets the local list of regions to snapshot and not the set the master had, there 149 * is a possibility of a race where regions may be missed. This detected by the master in the 150 * snapshot verification step. 151 * @return Subprocedure to submit to the ProcedureMember. 152 */ 153 public Subprocedure buildSubprocedure(SnapshotDescription snapshot) { 154 155 // don't run a snapshot if the parent is stop(ping) 156 if (rss.isStopping() || rss.isStopped()) { 157 throw new IllegalStateException( 158 "Can't start snapshot on RS: " + rss.getServerName() + ", because stopping/stopped!"); 159 } 160 161 // check to see if this server is hosting any regions for the snapshots 162 // check to see if we have regions for the snapshot 163 List<HRegion> involvedRegions; 164 try { 165 involvedRegions = getRegionsToSnapshot(snapshot); 166 } catch (IOException e1) { 167 throw new IllegalStateException("Failed to figure out if we should handle a snapshot - " 168 + "something has gone awry with the online regions.", e1); 169 } 170 171 // We need to run the subprocedure even if we have no relevant regions. The coordinator 172 // expects participation in the procedure and without sending message the snapshot attempt 173 // will hang and fail. 174 175 LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table " 176 + snapshot.getTable() + " type " + snapshot.getType()); 177 ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName()); 178 Configuration conf = rss.getConfiguration(); 179 long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); 180 long wakeMillis = 181 conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT); 182 183 switch (snapshot.getType()) { 184 case FLUSH: 185 SnapshotSubprocedurePool taskManager = 186 new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss); 187 return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, 188 involvedRegions, snapshot, taskManager); 189 case SKIPFLUSH: 190 /* 191 * This is to take an online-snapshot without force a coordinated flush to prevent pause The 192 * snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure 193 * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be 194 * turned on/off based on the flush type. To minimized the code change, class name is not 195 * changed. 196 */ 197 SnapshotSubprocedurePool taskManager2 = 198 new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss); 199 return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, 200 involvedRegions, snapshot, taskManager2); 201 202 default: 203 throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType()); 204 } 205 } 206 207 /** 208 * Determine if the snapshot should be handled on this server NOTE: This is racy -- the master 209 * expects a list of regionservers. This means if a region moves somewhere between the calls we'll 210 * miss some regions. For example, a region move during a snapshot could result in a region to be 211 * skipped or done twice. This is manageable because the {@link MasterSnapshotVerifier} will 212 * double check the region lists after the online portion of the snapshot completes and will 213 * explicitly fail the snapshot. 214 * @return the list of online regions. Empty list is returned if no regions are responsible for 215 * the given snapshot. 216 */ 217 private List<HRegion> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException { 218 List<HRegion> onlineRegions = 219 (List<HRegion>) rss.getRegions(TableName.valueOf(snapshot.getTable())); 220 Iterator<HRegion> iterator = onlineRegions.iterator(); 221 // remove the non-default regions 222 while (iterator.hasNext()) { 223 HRegion r = iterator.next(); 224 if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 225 iterator.remove(); 226 } 227 } 228 return onlineRegions; 229 } 230 231 /** 232 * Build the actual snapshot runner that will do all the 'hard' work 233 */ 234 public class SnapshotSubprocedureBuilder implements SubprocedureFactory { 235 236 @Override 237 public Subprocedure buildSubprocedure(String name, byte[] data) { 238 try { 239 // unwrap the snapshot information 240 SnapshotDescription snapshot = SnapshotDescription.parseFrom(data); 241 return RegionServerSnapshotManager.this.buildSubprocedure(snapshot); 242 } catch (IOException e) { 243 throw new IllegalArgumentException("Could not read snapshot information from request."); 244 } 245 } 246 247 } 248 249 /** 250 * We use the SnapshotSubprocedurePool, a class specific thread pool instead of 251 * {@link org.apache.hadoop.hbase.executor.ExecutorService}. It uses a 252 * {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of completed 253 * tasks which lets us efficiently cancel pending tasks upon the earliest operation failures. 254 * HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't 255 * really built for coordinated tasks where multiple threads as part of one larger task. In RS's 256 * the HBase Executor services are only used for open and close and not other threadpooled 257 * operations such as compactions and replication sinks. 258 */ 259 static class SnapshotSubprocedurePool { 260 private final Abortable abortable; 261 private final ExecutorCompletionService<Void> taskPool; 262 private final ThreadPoolExecutor executor; 263 private volatile boolean stopped; 264 private final List<Future<Void>> futures = new ArrayList<>(); 265 private final String name; 266 267 SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) { 268 this.abortable = abortable; 269 // configure the executor service 270 long keepAlive = conf.getLong(RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY, 271 RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); 272 int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS); 273 this.name = name; 274 executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS, 275 new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d") 276 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 277 taskPool = new ExecutorCompletionService<>(executor); 278 } 279 280 boolean hasTasks() { 281 return futures.size() != 0; 282 } 283 284 /** 285 * Submit a task to the pool. NOTE: all must be submitted before you can safely 286 * {@link #waitForOutstandingTasks()}. This version does not support issuing tasks from multiple 287 * concurrent table snapshots requests. 288 */ 289 void submitTask(final Callable<Void> task) { 290 Future<Void> f = this.taskPool.submit(task); 291 futures.add(f); 292 } 293 294 /** 295 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}. 296 * This *must* be called after all tasks are submitted via submitTask. 297 * @return <tt>true</tt> on success, <tt>false</tt> otherwise 298 * @throws SnapshotCreationException if the snapshot failed while we were waiting 299 */ 300 boolean waitForOutstandingTasks() throws ForeignException, InterruptedException { 301 LOG.debug("Waiting for local region snapshots to finish."); 302 303 int sz = futures.size(); 304 try { 305 // Using the completion service to process the futures that finish first first. 306 for (int i = 0; i < sz; i++) { 307 Future<Void> f = taskPool.take(); 308 f.get(); 309 if (!futures.remove(f)) { 310 LOG.warn("unexpected future" + f); 311 } 312 LOG.debug("Completed " + (i + 1) + "/" + sz + " local region snapshots."); 313 } 314 LOG.debug("Completed " + sz + " local region snapshots."); 315 return true; 316 } catch (InterruptedException e) { 317 LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e); 318 if (!stopped) { 319 Thread.currentThread().interrupt(); 320 throw new ForeignException("SnapshotSubprocedurePool", e); 321 } 322 // we are stopped so we can just exit. 323 } catch (ExecutionException e) { 324 Throwable cause = e.getCause(); 325 if (cause instanceof ForeignException) { 326 LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e); 327 throw (ForeignException) e.getCause(); 328 } else if (cause instanceof DroppedSnapshotException) { 329 // we have to abort the region server according to contract of flush 330 abortable.abort("Received DroppedSnapshotException, aborting", cause); 331 } 332 LOG.warn("Got Exception in SnapshotSubprocedurePool", e); 333 throw new ForeignException(name, e.getCause()); 334 } finally { 335 cancelTasks(); 336 } 337 return false; 338 } 339 340 /** 341 * This attempts to cancel out all pending and in progress tasks (interruptions issues) 342 */ 343 void cancelTasks() throws InterruptedException { 344 Collection<Future<Void>> tasks = futures; 345 LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name); 346 for (Future<Void> f : tasks) { 347 // TODO Ideally we'd interrupt hbase threads when we cancel. However it seems that there 348 // are places in the HBase code where row/region locks are taken and not released in a 349 // finally block. Thus we cancel without interrupting. Cancellations will be slower to 350 // complete but we won't suffer from unreleased locks due to poor code discipline. 351 f.cancel(false); 352 } 353 354 // evict remaining tasks and futures from taskPool. 355 futures.clear(); 356 while (taskPool.poll() != null) { 357 } 358 stop(); 359 } 360 361 /** 362 * Abruptly shutdown the thread pool. Call when exiting a region server. 363 */ 364 void stop() { 365 if (this.stopped) return; 366 367 this.stopped = true; 368 this.executor.shutdown(); 369 } 370 } 371 372 /** 373 * Create a default snapshot handler - uses a zookeeper based member controller. 374 * @param rss region server running the handler 375 * @throws KeeperException if the zookeeper cluster cannot be reached 376 */ 377 @Override 378 public void initialize(RegionServerServices rss) throws KeeperException { 379 this.rss = rss; 380 ZKWatcher zkw = rss.getZooKeeper(); 381 this.memberRpcs = 382 new ZKProcedureMemberRpcs(zkw, SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION); 383 384 // read in the snapshot request configuration properties 385 Configuration conf = rss.getConfiguration(); 386 long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); 387 int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT); 388 389 // create the actual snapshot procedure member 390 ThreadPoolExecutor pool = 391 ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); 392 this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder()); 393 } 394 395 @Override 396 public String getProcedureSignature() { 397 return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION; 398 } 399 400}