001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.lang.Thread.UncaughtExceptionHandler; 022import java.util.List; 023import java.util.Set; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.hbase.CallQueueTooBigException; 026import org.apache.hadoop.hbase.DoNotRetryIOException; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 030import org.apache.hadoop.hbase.master.MasterServices; 031import org.apache.hadoop.hbase.master.ServerListener; 032import org.apache.hadoop.hbase.master.ServerManager; 033import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 034import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 035import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; 036import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 038import org.apache.hadoop.hbase.util.RetryCounter; 039import org.apache.hadoop.ipc.RemoteException; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 044import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 045import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 046import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 055 056/** 057 * A remote procecdure dispatcher for regionservers. 058 */ 059@InterfaceAudience.Private 060public class RSProcedureDispatcher 061 extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName> 062 implements ServerListener { 063 private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class); 064 065 public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY = 066 "hbase.regionserver.rpc.startup.waittime"; 067 private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000; 068 069 protected final MasterServices master; 070 private final long rsStartupWaitTime; 071 private MasterProcedureEnv procedureEnv; 072 073 public RSProcedureDispatcher(final MasterServices master) { 074 super(master.getConfiguration()); 075 076 this.master = master; 077 this.rsStartupWaitTime = master.getConfiguration().getLong( 078 RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME); 079 } 080 081 @Override 082 protected UncaughtExceptionHandler getUncaughtExceptionHandler() { 083 return new UncaughtExceptionHandler() { 084 085 @Override 086 public void uncaughtException(Thread t, Throwable e) { 087 LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e); 088 } 089 }; 090 } 091 092 @Override 093 public boolean start() { 094 if (!super.start()) { 095 return false; 096 } 097 if (master.isStopped()) { 098 LOG.debug("Stopped"); 099 return false; 100 } 101 // Around startup, if failed, some of the below may be set back to null so NPE is possible. 102 ServerManager sm = master.getServerManager(); 103 if (sm == null) { 104 LOG.debug("ServerManager is null"); 105 return false; 106 } 107 sm.registerListener(this); 108 ProcedureExecutor<MasterProcedureEnv> pe = master.getMasterProcedureExecutor(); 109 if (pe == null) { 110 LOG.debug("ProcedureExecutor is null"); 111 return false; 112 } 113 this.procedureEnv = pe.getEnvironment(); 114 if (this.procedureEnv == null) { 115 LOG.debug("ProcedureEnv is null; stopping={}", master.isStopping()); 116 return false; 117 } 118 try { 119 for (ServerName serverName : sm.getOnlineServersList()) { 120 addNode(serverName); 121 } 122 } catch (Exception e) { 123 LOG.info("Failed start", e); 124 return false; 125 } 126 return true; 127 } 128 129 @Override 130 public boolean stop() { 131 if (!super.stop()) { 132 return false; 133 } 134 135 master.getServerManager().unregisterListener(this); 136 return true; 137 } 138 139 @Override 140 protected void remoteDispatch(final ServerName serverName, 141 final Set<RemoteProcedure> remoteProcedures) { 142 if (!master.getServerManager().isServerOnline(serverName)) { 143 // fail fast 144 submitTask(new DeadRSRemoteCall(serverName, remoteProcedures)); 145 } else { 146 submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures)); 147 } 148 } 149 150 @Override 151 protected void abortPendingOperations(final ServerName serverName, 152 final Set<RemoteProcedure> operations) { 153 // TODO: Replace with a ServerNotOnlineException() 154 final IOException e = new DoNotRetryIOException("server not online " + serverName); 155 for (RemoteProcedure proc: operations) { 156 proc.remoteCallFailed(procedureEnv, serverName, e); 157 } 158 } 159 160 @Override 161 public void serverAdded(final ServerName serverName) { 162 addNode(serverName); 163 } 164 165 @Override 166 public void serverRemoved(final ServerName serverName) { 167 removeNode(serverName); 168 } 169 170 private interface RemoteProcedureResolver { 171 void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations); 172 173 void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations); 174 175 void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations); 176 } 177 178 /** 179 * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s 180 * from the given {@code remoteProcedures} and groups them by class of the returned operation. 181 * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and 182 * {@link RegionCloseOperation}s. 183 * @param serverName RegionServer to which the remote operations are sent 184 * @param operations Remote procedures which are dispatched to the given server 185 * @param resolver Used to dispatch remote procedures to given server. 186 */ 187 public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations, 188 RemoteProcedureResolver resolver) { 189 MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); 190 ArrayListMultimap<Class<?>, RemoteOperation> reqsByType = 191 buildAndGroupRequestByType(env, serverName, operations); 192 193 List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class); 194 if (!openOps.isEmpty()) { 195 resolver.dispatchOpenRequests(env, openOps); 196 } 197 198 List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class); 199 if (!closeOps.isEmpty()) { 200 resolver.dispatchCloseRequests(env, closeOps); 201 } 202 203 List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class); 204 if (!refreshOps.isEmpty()) { 205 resolver.dispatchServerOperations(env, refreshOps); 206 } 207 208 if (!reqsByType.isEmpty()) { 209 LOG.warn("unknown request type in the queue: " + reqsByType); 210 } 211 } 212 213 private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall { 214 215 public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) { 216 super(serverName, remoteProcedures); 217 } 218 219 @Override 220 public void run() { 221 remoteCallFailed(procedureEnv, 222 new RegionServerStoppedException("Server " + getServerName() + " is not online")); 223 } 224 } 225 226 // ========================================================================== 227 // Compatibility calls 228 // ========================================================================== 229 protected class ExecuteProceduresRemoteCall implements RemoteProcedureResolver, Runnable { 230 231 private final ServerName serverName; 232 233 private final Set<RemoteProcedure> remoteProcedures; 234 235 private int numberOfAttemptsSoFar = 0; 236 private long maxWaitTime = -1; 237 238 private final long rsRpcRetryInterval; 239 private static final String RS_RPC_RETRY_INTERVAL_CONF_KEY = 240 "hbase.regionserver.rpc.retry.interval"; 241 private static final int DEFAULT_RS_RPC_RETRY_INTERVAL = 100; 242 243 private ExecuteProceduresRequest.Builder request = null; 244 245 public ExecuteProceduresRemoteCall(final ServerName serverName, 246 final Set<RemoteProcedure> remoteProcedures) { 247 this.serverName = serverName; 248 this.remoteProcedures = remoteProcedures; 249 this.rsRpcRetryInterval = master.getConfiguration().getLong(RS_RPC_RETRY_INTERVAL_CONF_KEY, 250 DEFAULT_RS_RPC_RETRY_INTERVAL); 251 } 252 253 private AdminService.BlockingInterface getRsAdmin() throws IOException { 254 final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); 255 if (admin == null) { 256 throw new IOException("Attempting to send OPEN RPC to server " + getServerName() + 257 " failed because no RPC connection found to this server"); 258 } 259 return admin; 260 } 261 262 protected final ServerName getServerName() { 263 return serverName; 264 } 265 266 private boolean scheduleForRetry(IOException e) { 267 LOG.debug("Request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e); 268 // Should we wait a little before retrying? If the server is starting it's yes. 269 if (e instanceof ServerNotRunningYetException) { 270 long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime(); 271 if (remainingTime > 0) { 272 LOG.warn("Waiting a little before retrying {}, try={}, can wait up to {}ms", 273 serverName, numberOfAttemptsSoFar, remainingTime); 274 numberOfAttemptsSoFar++; 275 // Retry every rsRpcRetryInterval millis up to maximum wait time. 276 submitTask(this, rsRpcRetryInterval, TimeUnit.MILLISECONDS); 277 return true; 278 } 279 LOG.warn("{} is throwing ServerNotRunningYetException for {}ms; trying another server", 280 serverName, getMaxWaitTime()); 281 return false; 282 } 283 if (e instanceof DoNotRetryIOException) { 284 LOG.warn("{} tells us DoNotRetry due to {}, try={}, give up", serverName, 285 e.toString(), numberOfAttemptsSoFar); 286 return false; 287 } 288 // This exception is thrown in the rpc framework, where we can make sure that the call has not 289 // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd 290 // better choose another region server. 291 // Notice that, it is safe to quit only if this is the first time we send request to region 292 // server. Maybe the region server has accepted our request the first time, and then there is 293 // a network error which prevents we receive the response, and the second time we hit a 294 // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a 295 // double assign... 296 if (e instanceof CallQueueTooBigException && numberOfAttemptsSoFar == 0) { 297 LOG.warn("request to {} failed due to {}, try={}, this usually because" + 298 " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar); 299 return false; 300 } 301 // Always retry for other exception types if the region server is not dead yet. 302 if (!master.getServerManager().isServerOnline(serverName)) { 303 LOG.warn("Request to {} failed due to {}, try={} and the server is not online, give up", 304 serverName, e.toString(), numberOfAttemptsSoFar); 305 return false; 306 } 307 if (e instanceof RegionServerAbortedException || e instanceof RegionServerStoppedException) { 308 // A better way is to return true here to let the upper layer quit, and then schedule a 309 // background task to check whether the region server is dead. And if it is dead, call 310 // remoteCallFailed to tell the upper layer. Keep retrying here does not lead to incorrect 311 // result, but waste some resources. 312 LOG.warn("{} is aborted or stopped, for safety we still need to" + 313 " wait until it is fully dead, try={}", serverName, numberOfAttemptsSoFar); 314 } else { 315 LOG.warn("request to {} failed due to {}, try={}, retrying...", serverName, 316 e.toString(), numberOfAttemptsSoFar); 317 } 318 numberOfAttemptsSoFar++; 319 // Add some backoff here as the attempts rise otherwise if a stuck condition, will fill logs 320 // with failed attempts. None of our backoff classes -- RetryCounter or ClientBackoffPolicy 321 // -- fit here nicely so just do something simple; increment by rsRpcRetryInterval millis * 322 // retry^2 on each try 323 // up to max of 10 seconds (don't want to back off too much in case of situation change). 324 submitTask(this, 325 Math.min(rsRpcRetryInterval * (this.numberOfAttemptsSoFar * this.numberOfAttemptsSoFar), 326 10 * 1000), 327 TimeUnit.MILLISECONDS); 328 return true; 329 } 330 331 private long getMaxWaitTime() { 332 if (this.maxWaitTime < 0) { 333 // This is the max attempts, not retries, so it should be at least 1. 334 this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime; 335 } 336 return this.maxWaitTime; 337 } 338 339 private IOException unwrapException(IOException e) { 340 if (e instanceof RemoteException) { 341 e = ((RemoteException)e).unwrapRemoteException(); 342 } 343 return e; 344 } 345 346 @Override 347 public void run() { 348 request = ExecuteProceduresRequest.newBuilder(); 349 if (LOG.isTraceEnabled()) { 350 LOG.trace("Building request with operations count=" + remoteProcedures.size()); 351 } 352 splitAndResolveOperation(getServerName(), remoteProcedures, this); 353 354 try { 355 sendRequest(getServerName(), request.build()); 356 } catch (IOException e) { 357 e = unwrapException(e); 358 // TODO: In the future some operation may want to bail out early. 359 // TODO: How many times should we retry (use numberOfAttemptsSoFar) 360 if (!scheduleForRetry(e)) { 361 remoteCallFailed(procedureEnv, e); 362 } 363 } 364 } 365 366 @Override 367 public void dispatchOpenRequests(final MasterProcedureEnv env, 368 final List<RegionOpenOperation> operations) { 369 request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations)); 370 } 371 372 @Override 373 public void dispatchCloseRequests(final MasterProcedureEnv env, 374 final List<RegionCloseOperation> operations) { 375 for (RegionCloseOperation op: operations) { 376 request.addCloseRegion(op.buildCloseRegionRequest(getServerName())); 377 } 378 } 379 380 @Override 381 public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) { 382 operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc); 383 } 384 385 // will be overridden in test. 386 @VisibleForTesting 387 protected ExecuteProceduresResponse sendRequest(final ServerName serverName, 388 final ExecuteProceduresRequest request) throws IOException { 389 try { 390 return getRsAdmin().executeProcedures(null, request); 391 } catch (ServiceException se) { 392 throw ProtobufUtil.getRemoteException(se); 393 } 394 } 395 396 protected final void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { 397 for (RemoteProcedure proc : remoteProcedures) { 398 proc.remoteCallFailed(env, getServerName(), e); 399 } 400 } 401 } 402 403 private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env, 404 final ServerName serverName, final List<RegionOpenOperation> operations) { 405 final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder(); 406 builder.setServerStartCode(serverName.getStartcode()); 407 builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime()); 408 for (RegionOpenOperation op: operations) { 409 builder.addOpenInfo(op.buildRegionOpenInfoRequest(env)); 410 } 411 return builder.build(); 412 } 413 414 // ========================================================================== 415 // RPC Messages 416 // - ServerOperation: refreshConfig, grant, revoke, ... (TODO) 417 // - RegionOperation: open, close, flush, snapshot, ... 418 // ========================================================================== 419 420 public static final class ServerOperation extends RemoteOperation { 421 422 private final long procId; 423 424 private final Class<?> rsProcClass; 425 426 private final byte[] rsProcData; 427 428 public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass, 429 byte[] rsProcData) { 430 super(remoteProcedure); 431 this.procId = procId; 432 this.rsProcClass = rsProcClass; 433 this.rsProcData = rsProcData; 434 } 435 436 public RemoteProcedureRequest buildRequest() { 437 return RemoteProcedureRequest.newBuilder().setProcId(procId) 438 .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build(); 439 } 440 } 441 442 public static abstract class RegionOperation extends RemoteOperation { 443 protected final RegionInfo regionInfo; 444 protected final long procId; 445 446 protected RegionOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId) { 447 super(remoteProcedure); 448 this.regionInfo = regionInfo; 449 this.procId = procId; 450 } 451 } 452 453 public static class RegionOpenOperation extends RegionOperation { 454 455 public RegionOpenOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, 456 long procId) { 457 super(remoteProcedure, regionInfo, procId); 458 } 459 460 public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest( 461 final MasterProcedureEnv env) { 462 return RequestConverter.buildRegionOpenInfo(regionInfo, 463 env.getAssignmentManager().getFavoredNodes(regionInfo), procId); 464 } 465 } 466 467 public static class RegionCloseOperation extends RegionOperation { 468 private final ServerName destinationServer; 469 470 public RegionCloseOperation(RemoteProcedure remoteProcedure, RegionInfo regionInfo, long procId, 471 ServerName destinationServer) { 472 super(remoteProcedure, regionInfo, procId); 473 this.destinationServer = destinationServer; 474 } 475 476 public ServerName getDestinationServer() { 477 return destinationServer; 478 } 479 480 public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) { 481 return ProtobufUtil.buildCloseRegionRequest(serverName, regionInfo.getRegionName(), 482 getDestinationServer(), procId); 483 } 484 } 485}