001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.snapshot; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Iterator; 024import java.util.List; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ExecutionException; 027import java.util.concurrent.ExecutorCompletionService; 028import java.util.concurrent.Future; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.hbase.Abortable; 034import org.apache.hadoop.hbase.DaemonThreadFactory; 035import org.apache.hadoop.hbase.DroppedSnapshotException; 036import org.apache.hadoop.hbase.HBaseInterfaceAudience; 037import org.apache.hadoop.hbase.TableName; 038import org.apache.hadoop.hbase.util.Threads; 039import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.apache.yetus.audience.InterfaceStability; 042import org.apache.hadoop.hbase.client.RegionReplicaUtil; 043import org.apache.hadoop.hbase.errorhandling.ForeignException; 044import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 045import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier; 046import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; 047import org.apache.hadoop.hbase.procedure.ProcedureMember; 048import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs; 049import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager; 050import org.apache.hadoop.hbase.procedure.Subprocedure; 051import org.apache.hadoop.hbase.procedure.SubprocedureFactory; 052import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs; 053import org.apache.hadoop.hbase.regionserver.HRegion; 054import org.apache.hadoop.hbase.regionserver.HRegionServer; 055import org.apache.hadoop.hbase.regionserver.RegionServerServices; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; 057import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; 058import org.apache.zookeeper.KeeperException; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062/** 063 * This manager class handles the work dealing with snapshots for a {@link HRegionServer}. 064 * <p> 065 * This provides the mechanism necessary to kick off a online snapshot specific 066 * {@link Subprocedure} that is responsible for the regions being served by this region server. 067 * If any failures occur with the subprocedure, the RegionSeverSnapshotManager's subprocedure 068 * handler, {@link ProcedureMember}, notifies the master's ProcedureCoordinator to abort all 069 * others. 070 * <p> 071 * On startup, requires {@link #start()} to be called. 072 * <p> 073 * On shutdown, requires {@link #stop(boolean)} to be called 074 */ 075@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 076@InterfaceStability.Unstable 077public class RegionServerSnapshotManager extends RegionServerProcedureManager { 078 private static final Logger LOG = LoggerFactory.getLogger(RegionServerSnapshotManager.class); 079 080 /** Maximum number of snapshot region tasks that can run concurrently */ 081 private static final String CONCURENT_SNAPSHOT_TASKS_KEY = "hbase.snapshot.region.concurrentTasks"; 082 private static final int DEFAULT_CONCURRENT_SNAPSHOT_TASKS = 3; 083 084 /** Conf key for number of request threads to start snapshots on regionservers */ 085 public static final String SNAPSHOT_REQUEST_THREADS_KEY = "hbase.snapshot.region.pool.threads"; 086 /** # of threads for snapshotting regions on the rs. */ 087 public static final int SNAPSHOT_REQUEST_THREADS_DEFAULT = 10; 088 089 /** Conf key for max time to keep threads in snapshot request pool waiting */ 090 public static final String SNAPSHOT_TIMEOUT_MILLIS_KEY = "hbase.snapshot.region.timeout"; 091 /** Keep threads alive in request pool for max of 300 seconds */ 092 public static final long SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5 * 60000; 093 094 /** Conf key for millis between checks to see if snapshot completed or if there are errors*/ 095 public static final String SNAPSHOT_REQUEST_WAKE_MILLIS_KEY = "hbase.snapshot.region.wakefrequency"; 096 /** Default amount of time to check for errors while regions finish snapshotting */ 097 private static final long SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT = 500; 098 099 private RegionServerServices rss; 100 private ProcedureMemberRpcs memberRpcs; 101 private ProcedureMember member; 102 103 /** 104 * Exposed for testing. 105 * @param conf HBase configuration. 106 * @param parent parent running the snapshot handler 107 * @param memberRpc use specified memberRpc instance 108 * @param procMember use specified ProcedureMember 109 */ 110 RegionServerSnapshotManager(Configuration conf, HRegionServer parent, 111 ProcedureMemberRpcs memberRpc, ProcedureMember procMember) { 112 this.rss = parent; 113 this.memberRpcs = memberRpc; 114 this.member = procMember; 115 } 116 117 public RegionServerSnapshotManager() {} 118 119 /** 120 * Start accepting snapshot requests. 121 */ 122 @Override 123 public void start() { 124 LOG.debug("Start Snapshot Manager " + rss.getServerName().toString()); 125 this.memberRpcs.start(rss.getServerName().toString(), member); 126 } 127 128 /** 129 * Close <tt>this</tt> and all running snapshot tasks 130 * @param force forcefully stop all running tasks 131 * @throws IOException 132 */ 133 @Override 134 public void stop(boolean force) throws IOException { 135 String mode = force ? "abruptly" : "gracefully"; 136 LOG.info("Stopping RegionServerSnapshotManager " + mode + "."); 137 138 try { 139 this.member.close(); 140 } finally { 141 this.memberRpcs.close(); 142 } 143 } 144 145 /** 146 * If in a running state, creates the specified subprocedure for handling an online snapshot. 147 * 148 * Because this gets the local list of regions to snapshot and not the set the master had, 149 * there is a possibility of a race where regions may be missed. This detected by the master in 150 * the snapshot verification step. 151 * 152 * @param snapshot 153 * @return Subprocedure to submit to the ProcedureMemeber. 154 */ 155 public Subprocedure buildSubprocedure(SnapshotDescription snapshot) { 156 157 // don't run a snapshot if the parent is stop(ping) 158 if (rss.isStopping() || rss.isStopped()) { 159 throw new IllegalStateException("Can't start snapshot on RS: " + rss.getServerName() 160 + ", because stopping/stopped!"); 161 } 162 163 // check to see if this server is hosting any regions for the snapshots 164 // check to see if we have regions for the snapshot 165 List<HRegion> involvedRegions; 166 try { 167 involvedRegions = getRegionsToSnapshot(snapshot); 168 } catch (IOException e1) { 169 throw new IllegalStateException("Failed to figure out if we should handle a snapshot - " 170 + "something has gone awry with the online regions.", e1); 171 } 172 173 // We need to run the subprocedure even if we have no relevant regions. The coordinator 174 // expects participation in the procedure and without sending message the snapshot attempt 175 // will hang and fail. 176 177 LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table " 178 + snapshot.getTable() + " type " + snapshot.getType()); 179 ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName()); 180 Configuration conf = rss.getConfiguration(); 181 long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, 182 SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); 183 long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, 184 SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT); 185 186 switch (snapshot.getType()) { 187 case FLUSH: 188 SnapshotSubprocedurePool taskManager = 189 new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss); 190 return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, 191 timeoutMillis, involvedRegions, snapshot, taskManager); 192 case SKIPFLUSH: 193 /* 194 * This is to take an online-snapshot without force a coordinated flush to prevent pause 195 * The snapshot type is defined inside the snapshot description. FlushSnapshotSubprocedure 196 * should be renamed to distributedSnapshotSubprocedure, and the flush() behavior can be 197 * turned on/off based on the flush type. 198 * To minimized the code change, class name is not changed. 199 */ 200 SnapshotSubprocedurePool taskManager2 = 201 new SnapshotSubprocedurePool(rss.getServerName().toString(), conf, rss); 202 return new FlushSnapshotSubprocedure(member, exnDispatcher, wakeMillis, 203 timeoutMillis, involvedRegions, snapshot, taskManager2); 204 205 default: 206 throw new UnsupportedOperationException("Unrecognized snapshot type:" + snapshot.getType()); 207 } 208 } 209 210 /** 211 * Determine if the snapshot should be handled on this server 212 * 213 * NOTE: This is racy -- the master expects a list of regionservers. 214 * This means if a region moves somewhere between the calls we'll miss some regions. 215 * For example, a region move during a snapshot could result in a region to be skipped or done 216 * twice. This is manageable because the {@link MasterSnapshotVerifier} will double check the 217 * region lists after the online portion of the snapshot completes and will explicitly fail the 218 * snapshot. 219 * 220 * @param snapshot 221 * @return the list of online regions. Empty list is returned if no regions are responsible for 222 * the given snapshot. 223 * @throws IOException 224 */ 225 private List<HRegion> getRegionsToSnapshot(SnapshotDescription snapshot) throws IOException { 226 List<HRegion> onlineRegions = (List<HRegion>) rss 227 .getRegions(TableName.valueOf(snapshot.getTable())); 228 Iterator<HRegion> iterator = onlineRegions.iterator(); 229 // remove the non-default regions 230 while (iterator.hasNext()) { 231 HRegion r = iterator.next(); 232 if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) { 233 iterator.remove(); 234 } 235 } 236 return onlineRegions; 237 } 238 239 /** 240 * Build the actual snapshot runner that will do all the 'hard' work 241 */ 242 public class SnapshotSubprocedureBuilder implements SubprocedureFactory { 243 244 @Override 245 public Subprocedure buildSubprocedure(String name, byte[] data) { 246 try { 247 // unwrap the snapshot information 248 SnapshotDescription snapshot = SnapshotDescription.parseFrom(data); 249 return RegionServerSnapshotManager.this.buildSubprocedure(snapshot); 250 } catch (IOException e) { 251 throw new IllegalArgumentException("Could not read snapshot information from request."); 252 } 253 } 254 255 } 256 257 /** 258 * We use the SnapshotSubprocedurePool, a class specific thread pool instead of 259 * {@link org.apache.hadoop.hbase.executor.ExecutorService}. 260 * 261 * It uses a {@link java.util.concurrent.ExecutorCompletionService} which provides queuing of 262 * completed tasks which lets us efficiently cancel pending tasks upon the earliest operation 263 * failures. 264 * 265 * HBase's ExecutorService (different from {@link java.util.concurrent.ExecutorService}) isn't 266 * really built for coordinated tasks where multiple threads as part of one larger task. In 267 * RS's the HBase Executor services are only used for open and close and not other threadpooled 268 * operations such as compactions and replication sinks. 269 */ 270 static class SnapshotSubprocedurePool { 271 private final Abortable abortable; 272 private final ExecutorCompletionService<Void> taskPool; 273 private final ThreadPoolExecutor executor; 274 private volatile boolean stopped; 275 private final List<Future<Void>> futures = new ArrayList<>(); 276 private final String name; 277 278 SnapshotSubprocedurePool(String name, Configuration conf, Abortable abortable) { 279 this.abortable = abortable; 280 // configure the executor service 281 long keepAlive = conf.getLong( 282 RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_KEY, 283 RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); 284 int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS); 285 this.name = name; 286 executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS, 287 new DaemonThreadFactory("rs(" + name + ")-snapshot-pool-")); 288 taskPool = new ExecutorCompletionService<>(executor); 289 } 290 291 boolean hasTasks() { 292 return futures.size() != 0; 293 } 294 295 /** 296 * Submit a task to the pool. 297 * 298 * NOTE: all must be submitted before you can safely {@link #waitForOutstandingTasks()}. This 299 * version does not support issuing tasks from multiple concurrent table snapshots requests. 300 */ 301 void submitTask(final Callable<Void> task) { 302 Future<Void> f = this.taskPool.submit(task); 303 futures.add(f); 304 } 305 306 /** 307 * Wait for all of the currently outstanding tasks submitted via {@link #submitTask(Callable)}. 308 * This *must* be called after all tasks are submitted via submitTask. 309 * 310 * @return <tt>true</tt> on success, <tt>false</tt> otherwise 311 * @throws InterruptedException 312 * @throws SnapshotCreationException if the snapshot failed while we were waiting 313 */ 314 boolean waitForOutstandingTasks() throws ForeignException, InterruptedException { 315 LOG.debug("Waiting for local region snapshots to finish."); 316 317 int sz = futures.size(); 318 try { 319 // Using the completion service to process the futures that finish first first. 320 for (int i = 0; i < sz; i++) { 321 Future<Void> f = taskPool.take(); 322 f.get(); 323 if (!futures.remove(f)) { 324 LOG.warn("unexpected future" + f); 325 } 326 LOG.debug("Completed " + (i+1) + "/" + sz + " local region snapshots."); 327 } 328 LOG.debug("Completed " + sz + " local region snapshots."); 329 return true; 330 } catch (InterruptedException e) { 331 LOG.warn("Got InterruptedException in SnapshotSubprocedurePool", e); 332 if (!stopped) { 333 Thread.currentThread().interrupt(); 334 throw new ForeignException("SnapshotSubprocedurePool", e); 335 } 336 // we are stopped so we can just exit. 337 } catch (ExecutionException e) { 338 Throwable cause = e.getCause(); 339 if (cause instanceof ForeignException) { 340 LOG.warn("Rethrowing ForeignException from SnapshotSubprocedurePool", e); 341 throw (ForeignException)e.getCause(); 342 } else if (cause instanceof DroppedSnapshotException) { 343 // we have to abort the region server according to contract of flush 344 abortable.abort("Received DroppedSnapshotException, aborting", cause); 345 } 346 LOG.warn("Got Exception in SnapshotSubprocedurePool", e); 347 throw new ForeignException(name, e.getCause()); 348 } finally { 349 cancelTasks(); 350 } 351 return false; 352 } 353 354 /** 355 * This attempts to cancel out all pending and in progress tasks (interruptions issues) 356 * @throws InterruptedException 357 */ 358 void cancelTasks() throws InterruptedException { 359 Collection<Future<Void>> tasks = futures; 360 LOG.debug("cancelling " + tasks.size() + " tasks for snapshot " + name); 361 for (Future<Void> f: tasks) { 362 // TODO Ideally we'd interrupt hbase threads when we cancel. However it seems that there 363 // are places in the HBase code where row/region locks are taken and not released in a 364 // finally block. Thus we cancel without interrupting. Cancellations will be slower to 365 // complete but we won't suffer from unreleased locks due to poor code discipline. 366 f.cancel(false); 367 } 368 369 // evict remaining tasks and futures from taskPool. 370 futures.clear(); 371 while (taskPool.poll() != null) {} 372 stop(); 373 } 374 375 /** 376 * Abruptly shutdown the thread pool. Call when exiting a region server. 377 */ 378 void stop() { 379 if (this.stopped) return; 380 381 this.stopped = true; 382 this.executor.shutdown(); 383 } 384 } 385 386 /** 387 * Create a default snapshot handler - uses a zookeeper based member controller. 388 * @param rss region server running the handler 389 * @throws KeeperException if the zookeeper cluster cannot be reached 390 */ 391 @Override 392 public void initialize(RegionServerServices rss) throws KeeperException { 393 this.rss = rss; 394 ZKWatcher zkw = rss.getZooKeeper(); 395 this.memberRpcs = new ZKProcedureMemberRpcs(zkw, 396 SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION); 397 398 // read in the snapshot request configuration properties 399 Configuration conf = rss.getConfiguration(); 400 long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT); 401 int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT); 402 403 // create the actual snapshot procedure member 404 ThreadPoolExecutor pool = ProcedureMember.defaultPool(rss.getServerName().toString(), 405 opThreads, keepAlive); 406 this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder()); 407 } 408 409 @Override 410 public String getProcedureSignature() { 411 return SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION; 412 } 413 414}