001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.snapshot; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Iterator; 024import java.util.List; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ExecutionException; 027import java.util.concurrent.ExecutorCompletionService; 028import java.util.concurrent.Future; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.hbase.Abortable; 033import org.apache.hadoop.hbase.DroppedSnapshotException; 034import org.apache.hadoop.hbase.HBaseInterfaceAudience; 035import org.apache.hadoop.hbase.TableName; 036import org.apache.hadoop.hbase.client.RegionReplicaUtil; 037import org.apache.hadoop.hbase.errorhandling.ForeignException; 038import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 039import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier; 040import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; 041import org.apache.hadoop.hbase.procedure.ProcedureMember; 042import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; 043import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; 044import org.apache.hadoop.hbase.procedure.Subprocedure; 045import org.apache.hadoop.hbase.procedure.SubprocedureFactory; 046import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; 047import org.apache.hadoop.hbase.regionserver.HRegion; 048import org.apache.hadoop.hbase.regionserver.HRegionServer; 049import org.apache.hadoop.hbase.regionserver.RegionServerServices; 050import org.apache.hadoop.hbase.util.Threads; 051import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 052import org.apache.yetus.audience.InterfaceAudience; 053import org.apache.yetus.audience.InterfaceStability; 054import org.apache.zookeeper.KeeperException; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 059 060import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 061 062/** 063 * This manager class handles the work dealing with snapshots for a {@link HRegionServer}. 064 * <p> 065 * This provides the mechanism necessary to kick off a online snapshot specific {@link Subprocedure} 066 * that is responsible for the regions being served by this region server. If any failures occur 067 * with the subprocedure, the RegionSeverSnapshotManager's subprocedure handler, 068 * {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all others. 069 * <p> 070 * On startup, requires {@link #start()} to be called. 071 * <p> 072 * On shutdown, requires {@link #stop(boolean)} to be called 073 */ 074@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 075@InterfaceStability.Unstable 076public class RegionServerSnapshotManager extends RegionServerProcedureManager { 077 private static final Logger LOG = LoggerFactory.getLogger(RegionServerSnapshotManager.class); 078 079 /** Maximum number of snapshot region tasks that can run concurrently */ 080 private static final String CONCURENT_SNAPSHOT_TASKS_KEY = 081 "hbase.snapshot.region.concurrentTasks"; 082 private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3; 083 084 /** Conf key for number of request threads to start snapshots on regionservers */ 085 public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads"; 086 /** # of threads for snapshotting regions on the rs. */ 087 public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10; 088 089 /** Conf key for max time to keep threads in snapshot request pool waiting */ 090 public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout"; 091 /** Keep threads alive in request pool for max of 300 seconds */ 092 public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5 * 60000; 093 094 /** Conf key for millis between checks to see if snapshot completed or if there are errors */ 095 public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY = 096 "hbase.snapshot.region.wakefrequency"; 097 /** Default amount of time to check for errors while regions finish snapshotting */ 098 private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500; 099 100 private RegionServerServices rss; 101 private ProcedureMemberRpcs memberRpcs; 102 private ProcedureMember member; 103 104 /** 105 * Exposed for testing. 106 * @param conf HBase configuration. 107 * @param parent parent running the snapshot handler 108 * @param memberRpc use specified memberRpc instance 109 * @param procMember use specified ProcedureMember 110 */ 111 RegionServerSnapshotManager(Configuration conf, HRegionServer parent, 112 ProcedureMemberRpcs memberRpc, ProcedureMember procMember) { 113 this.rss = parent; 114 this.memberRpcs = memberRpc; 115 this.member = procMember; 116 } 117 118 public RegionServerSnapshotManager() { 119 } 120 121 /** 122 * Start accepting snapshot requests. 123 */ 124 @Override 125 public void start() { 126 LOG.debug("Start Snapshot Manager " + rss.getServerName().toString()); 127 this.memberRpcs.start(rss.getServerName().toString(), member); 128 } 129 130 /** 131 * Close <tt>this</tt> and all running snapshot tasks 132 * @param force forcefully stop all running tasks n 133 */ 134 @Override 135 public void stop(boolean force) throws IOException { 136 String mode = force ? "abruptly" : "gracefully"; 137 LOG.info("Stopping RegionServerSnapshotManager " + mode + "."); 138 139 try { 140 this.member.close(); 141 } finally { 142 this.memberRpcs.close(); 143 } 144 } 145 146 /** 147 * If in a running state, creates the specified subprocedure for handling an online snapshot. 148 * Because this gets the local list of regions to snapshot and not the set the master had, there 149 * is a possibility of a race where regions may be missed. This detected by the master in the 150 * snapshot verification step. n * @return Subprocedure to submit to the ProcedureMember. 151 */ 152 public Subprocedure buildSubprocedure(SnapshotDescription snapshot) { 153 154 // don't run a snapshot if the parent is stop(ping) 155 if (rss.isStopping() || rss.isStopped()) { 156 throw new IllegalStateException( 157 "Can't start snapshot on RS: " + rss.getServerName() + ", because stopping/stopped!"); 158 } 159 160 // check to see if this server is hosting any regions for the snapshots 161 // check to see if we have regions for the snapshot 162 List<HRegion> involvedRegions; 163 try { 164 involvedRegions = getRegionsToSnapshot(snapshot); 165 } catch (IOException e1) { 166 throw new IllegalStateException("Failed to figure out if we should handle a snapshot - " 167 + "something has gone awry with the online regions.", e1); 168 } 169 170 // We need to run the subprocedure even if we have no relevant regions. The coordinator 171 // expects participation in the procedure and without sending message the snapshot attempt 172 // will hang and fail. 173 174 LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table " 175 + snapshot.getTable() + " type " + snapshot.getType()); 176 ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName()); 177 Configuration conf = rss.getConfiguration(); 178 long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); 179 long wakeMillis = 180 conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT); 181 182 switch (snapshot.getType()) { 183 case FLUSH: 184 SnapshotSubprocedurePool taskManager = 185 new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss); 186 return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, 187 involvedRegions, snapshot, taskManager); 188 case SKIPFLUSH: 189 /* 190 * This is to take an online-snapshot without force a coordinated flush to prevent pause The 191 * snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure 192 * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be 193 * turned on/off based on the flush type. To minimized the code change, class name is not 194 * changed. 195 */ 196 SnapshotSubprocedurePool taskManager2 = 197 new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss); 198 return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, timeoutMillis, 199 involvedRegions, snapshot, taskManager2); 200 201 default: 202 throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType()); 203 } 204 } 205 206 /** 207 * Determine if the snapshot should be handled on this server NOTE: This is racy -- the master 208 * expects a list of regionservers. This means if a region moves somewhere between the calls we'll 209 * miss some regions. For example, a region move during a snapshot could result in a region to be 210 * skipped or done twice. This is manageable because the {@link MasterSnapshotVerifier} will 211 * double check the region lists after the online portion of the snapshot completes and will 212 * explicitly fail the snapshot. n * @return the list of online regions. Empty list is returned if 213 * no regions are responsible for the given snapshot. n 214 */ 215 private List<HRegion> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException { 216 List<HRegion> onlineRegions = 217 (List<HRegion>) rss.getRegions(TableName.valueOf(snapshot.getTable())); 218 Iterator<HRegion> iterator = onlineRegions.iterator(); 219 // remove the non-default regions 220 while (iterator.hasNext()) { 221 HRegion r = iterator.next(); 222 if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 223 iterator.remove(); 224 } 225 } 226 return onlineRegions; 227 } 228 229 /** 230 * Build the actual snapshot runner that will do all the 'hard' work 231 */ 232 public class SnapshotSubprocedureBuilder implements SubprocedureFactory { 233 234 @Override 235 public Subprocedure buildSubprocedure(String name, byte[] data) { 236 try { 237 // unwrap the snapshot information 238 SnapshotDescription snapshot = SnapshotDescription.parseFrom(data); 239 return RegionServerSnapshotManager.this.buildSubprocedure(snapshot); 240 } catch (IOException e) { 241 throw new IllegalArgumentException("Could not read snapshot information from request."); 242 } 243 } 244 245 } 246 247 /** 248 * We use the SnapshotSubprocedurePool, a class specific thread pool instead of 249 * {@link org.apache.hadoop.hbase.executor.ExecutorService}. It uses a 250 * {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of completed 251 * tasks which lets us efficiently cancel pending tasks upon the earliest operation failures. 252 * HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't 253 * really built for coordinated tasks where multiple threads as part of one larger task. In RS's 254 * the HBase Executor services are only used for open and close and not other threadpooled 255 * operations such as compactions and replication sinks. 256 */ 257 static class SnapshotSubprocedurePool { 258 private final Abortable abortable; 259 private final ExecutorCompletionService<Void> taskPool; 260 private final ThreadPoolExecutor executor; 261 private volatile boolean stopped; 262 private final List<Future<Void>> futures = new ArrayList<>(); 263 private final String name; 264 265 SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) { 266 this.abortable = abortable; 267 // configure the executor service 268 long keepAlive = conf.getLong(RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY, 269 RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); 270 int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS); 271 this.name = name; 272 executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS, 273 new ThreadFactoryBuilder().setNameFormat("rs(" + name + ")-snapshot-pool-%d") 274 .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build()); 275 taskPool = new ExecutorCompletionService<>(executor); 276 } 277 278 boolean hasTasks() { 279 return futures.size() != 0; 280 } 281 282 /** 283 * Submit a task to the pool. NOTE: all must be submitted before you can safely 284 * {@link #waitForOutstandingTasks()}. This version does not support issuing tasks from multiple 285 * concurrent table snapshots requests. 286 */ 287 void submitTask(final Callable<Void> task) { 288 Future<Void> f = this.taskPool.submit(task); 289 futures.add(f); 290 } 291 292 /** 293 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}. 294 * This *must* be called after all tasks are submitted via submitTask. 295 * @return <tt>true</tt> on success, <tt>false</tt> otherwise n * @throws 296 * SnapshotCreationException if the snapshot failed while we were waiting 297 */ 298 boolean waitForOutstandingTasks() throws ForeignException, InterruptedException { 299 LOG.debug("Waiting for local region snapshots to finish."); 300 301 int sz = futures.size(); 302 try { 303 // Using the completion service to process the futures that finish first first. 304 for (int i = 0; i < sz; i++) { 305 Future<Void> f = taskPool.take(); 306 f.get(); 307 if (!futures.remove(f)) { 308 LOG.warn("unexpected future" + f); 309 } 310 LOG.debug("Completed " + (i + 1) + "/" + sz + " local region snapshots."); 311 } 312 LOG.debug("Completed " + sz + " local region snapshots."); 313 return true; 314 } catch (InterruptedException e) { 315 LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e); 316 if (!stopped) { 317 Thread.currentThread().interrupt(); 318 throw new ForeignException("SnapshotSubprocedurePool", e); 319 } 320 // we are stopped so we can just exit. 321 } catch (ExecutionException e) { 322 Throwable cause = e.getCause(); 323 if (cause instanceof ForeignException) { 324 LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e); 325 throw (ForeignException) e.getCause(); 326 } else if (cause instanceof DroppedSnapshotException) { 327 // we have to abort the region server according to contract of flush 328 abortable.abort("Received DroppedSnapshotException, aborting", cause); 329 } 330 LOG.warn("Got Exception in SnapshotSubprocedurePool", e); 331 throw new ForeignException(name, e.getCause()); 332 } finally { 333 cancelTasks(); 334 } 335 return false; 336 } 337 338 /** 339 * This attempts to cancel out all pending and in progress tasks (interruptions issues) n 340 */ 341 void cancelTasks() throws InterruptedException { 342 Collection<Future<Void>> tasks = futures; 343 LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name); 344 for (Future<Void> f : tasks) { 345 // TODO Ideally we'd interrupt hbase threads when we cancel. However it seems that there 346 // are places in the HBase code where row/region locks are taken and not released in a 347 // finally block. Thus we cancel without interrupting. Cancellations will be slower to 348 // complete but we won't suffer from unreleased locks due to poor code discipline. 349 f.cancel(false); 350 } 351 352 // evict remaining tasks and futures from taskPool. 353 futures.clear(); 354 while (taskPool.poll() != null) { 355 } 356 stop(); 357 } 358 359 /** 360 * Abruptly shutdown the thread pool. Call when exiting a region server. 361 */ 362 void stop() { 363 if (this.stopped) return; 364 365 this.stopped = true; 366 this.executor.shutdown(); 367 } 368 } 369 370 /** 371 * Create a default snapshot handler - uses a zookeeper based member controller. 372 * @param rss region server running the handler 373 * @throws KeeperException if the zookeeper cluster cannot be reached 374 */ 375 @Override 376 public void initialize(RegionServerServices rss) throws KeeperException { 377 this.rss = rss; 378 ZKWatcher zkw = rss.getZooKeeper(); 379 this.memberRpcs = 380 new ZKProcedureMemberRpcs(zkw, SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION); 381 382 // read in the snapshot request configuration properties 383 Configuration conf = rss.getConfiguration(); 384 long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); 385 int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT); 386 387 // create the actual snapshot procedure member 388 ThreadPoolExecutor pool = 389 ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive); 390 this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder()); 391 } 392 393 @Override 394 public String getProcedureSignature() { 395 return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION; 396 } 397 398}