001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import java.io.IOException; 021import java.lang.Thread.UncaughtExceptionHandler; 022import java.util.List; 023import java.util.Set; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.hbase.CallQueueTooBigException; 026import org.apache.hadoop.hbase.DoNotRetryIOException; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 030import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 031import org.apache.hadoop.hbase.master.MasterServices; 032import org.apache.hadoop.hbase.master.ServerListener; 033import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 034import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.hadoop.ipc.RemoteException; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 042import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 043import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 044import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 045import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 046 047import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 048import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 056import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; 057 058/** 059 * A remote procecdure dispatcher for regionservers. 060 */ 061@InterfaceAudience.Private 062public class RSProcedureDispatcher 063 extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName> 064 implements ServerListener { 065 private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class); 066 067 public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY = 068 "hbase.regionserver.rpc.startup.waittime"; 069 private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000; 070 071 private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0 072 073 protected final MasterServices master; 074 private final long rsStartupWaitTime; 075 private MasterProcedureEnv procedureEnv; 076 077 public RSProcedureDispatcher(final MasterServices master) { 078 super(master.getConfiguration()); 079 080 this.master = master; 081 this.rsStartupWaitTime = master.getConfiguration().getLong( 082 RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME); 083 } 084 085 @Override 086 protected UncaughtExceptionHandler getUncaughtExceptionHandler() { 087 return new UncaughtExceptionHandler() { 088 089 @Override 090 public void uncaughtException(Thread t, Throwable e) { 091 LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e); 092 } 093 }; 094 } 095 096 @Override 097 public boolean start() { 098 if (!super.start()) { 099 return false; 100 } 101 102 master.getServerManager().registerListener(this); 103 procedureEnv = master.getMasterProcedureExecutor().getEnvironment(); 104 for (ServerName serverName: master.getServerManager().getOnlineServersList()) { 105 addNode(serverName); 106 } 107 return true; 108 } 109 110 @Override 111 public boolean stop() { 112 if (!super.stop()) { 113 return false; 114 } 115 116 master.getServerManager().unregisterListener(this); 117 return true; 118 } 119 120 @Override 121 protected void remoteDispatch(final ServerName serverName, 122 final Set<RemoteProcedure> remoteProcedures) { 123 final int rsVersion = master.getServerManager().getVersionNumber(serverName); 124 if (rsVersion == 0 && !master.getServerManager().isServerOnline(serverName)) { 125 submitTask(new DeadRSRemoteCall(serverName, remoteProcedures)); 126 } else { 127 submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures)); 128 } 129 } 130 131 @Override 132 protected void abortPendingOperations(final ServerName serverName, 133 final Set<RemoteProcedure> operations) { 134 // TODO: Replace with a ServerNotOnlineException() 135 final IOException e = new DoNotRetryIOException("server not online " + serverName); 136 for (RemoteProcedure proc: operations) { 137 proc.remoteCallFailed(procedureEnv, serverName, e); 138 } 139 } 140 141 @Override 142 public void serverAdded(final ServerName serverName) { 143 addNode(serverName); 144 } 145 146 @Override 147 public void serverRemoved(final ServerName serverName) { 148 removeNode(serverName); 149 } 150 151 /** 152 * Base remote call 153 */ 154 protected abstract class AbstractRSRemoteCall implements Runnable { 155 156 private final ServerName serverName; 157 158 private int numberOfAttemptsSoFar = 0; 159 private long maxWaitTime = -1; 160 161 public AbstractRSRemoteCall(final ServerName serverName) { 162 this.serverName = serverName; 163 } 164 165 protected AdminService.BlockingInterface getRsAdmin() throws IOException { 166 final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); 167 if (admin == null) { 168 throw new IOException("Attempting to send OPEN RPC to server " + getServerName() + 169 " failed because no RPC connection found to this server"); 170 } 171 return admin; 172 } 173 174 protected ServerName getServerName() { 175 return serverName; 176 } 177 178 protected boolean scheduleForRetry(final IOException e) { 179 LOG.debug("request to {} failed, try={}", serverName, numberOfAttemptsSoFar, e); 180 // Should we wait a little before retrying? If the server is starting it's yes. 181 if (e instanceof ServerNotRunningYetException) { 182 long remainingTime = getMaxWaitTime() - EnvironmentEdgeManager.currentTime(); 183 if (remainingTime > 0) { 184 LOG.warn("waiting a little before trying on the same server={}," + 185 " try={}, can wait up to {}ms", serverName, numberOfAttemptsSoFar, remainingTime); 186 numberOfAttemptsSoFar++; 187 submitTask(this, 100, TimeUnit.MILLISECONDS); 188 return true; 189 } 190 LOG.warn("server {} is not up for a while; try a new one", serverName); 191 return false; 192 } 193 194 boolean queueFull = e instanceof CallQueueTooBigException; 195 // this exception is thrown in the rpc framework, where we can make sure that the call has not 196 // been executed yet, so it is safe to mark it as fail. Especially for open a region, we'd 197 // better choose another region server 198 // notice that, it is safe to quit only if this is the first time we send request to region 199 // server. Maybe the region server has accept our request the first time, and then there is a 200 // network error which prevents we receive the response, and the second time we hit a 201 // CallQueueTooBigException, obviously it is not safe to quit here, otherwise it may lead to a 202 // double assign.. 203 if (queueFull && numberOfAttemptsSoFar == 0) { 204 LOG.warn("request to {} failed due to {}, try={}, this usually because" + 205 " server is overloaded, give up", serverName, e.toString(), numberOfAttemptsSoFar); 206 return false; 207 } 208 // In case it is a connection exception and the region server is still online, the openRegion 209 // RPC could have been accepted by the server and just the response didn't go through. So we 210 // will retry to open the region on the same server. 211 if ((queueFull || ClientExceptionsUtil.isConnectionException(e)) && 212 master.getServerManager().isServerOnline(serverName)) { 213 // we want to retry as many times as needed as long as the RS is not dead. 214 LOG.debug("Retrying to same RegionServer {} because: {}", serverName, e.getMessage()); 215 numberOfAttemptsSoFar++; 216 submitTask(this, 100, TimeUnit.MILLISECONDS); 217 return true; 218 } 219 // trying to send the request elsewhere instead 220 LOG.warn("Failed dispatch to server={} try={}", serverName, numberOfAttemptsSoFar, e); 221 return false; 222 } 223 224 private long getMaxWaitTime() { 225 if (this.maxWaitTime < 0) { 226 // This is the max attempts, not retries, so it should be at least 1. 227 this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime; 228 } 229 return this.maxWaitTime; 230 } 231 232 protected IOException unwrapException(IOException e) { 233 if (e instanceof RemoteException) { 234 e = ((RemoteException)e).unwrapRemoteException(); 235 } 236 return e; 237 } 238 } 239 240 private interface RemoteProcedureResolver { 241 void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations); 242 243 void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations); 244 245 void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations); 246 } 247 248 /** 249 * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s 250 * from the given {@code remoteProcedures} and groups them by class of the returned operation. 251 * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and 252 * {@link RegionCloseOperation}s. 253 * @param serverName RegionServer to which the remote operations are sent 254 * @param operations Remote procedures which are dispatched to the given server 255 * @param resolver Used to dispatch remote procedures to given server. 256 */ 257 public void splitAndResolveOperation(ServerName serverName, Set<RemoteProcedure> operations, 258 RemoteProcedureResolver resolver) { 259 MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); 260 ArrayListMultimap<Class<?>, RemoteOperation> reqsByType = 261 buildAndGroupRequestByType(env, serverName, operations); 262 263 List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class); 264 if (!openOps.isEmpty()) { 265 resolver.dispatchOpenRequests(env, openOps); 266 } 267 268 List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class); 269 if (!closeOps.isEmpty()) { 270 resolver.dispatchCloseRequests(env, closeOps); 271 } 272 273 List<ServerOperation> refreshOps = fetchType(reqsByType, ServerOperation.class); 274 if (!refreshOps.isEmpty()) { 275 resolver.dispatchServerOperations(env, refreshOps); 276 } 277 278 if (!reqsByType.isEmpty()) { 279 LOG.warn("unknown request type in the queue: " + reqsByType); 280 } 281 } 282 283 private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall { 284 285 public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) { 286 super(serverName, remoteProcedures); 287 } 288 289 @Override 290 public void run() { 291 remoteCallFailed(procedureEnv, 292 new RegionServerStoppedException("Server " + getServerName() + " is not online")); 293 } 294 } 295 296 // ========================================================================== 297 // Compatibility calls 298 // ========================================================================== 299 protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall 300 implements RemoteProcedureResolver { 301 protected final Set<RemoteProcedure> remoteProcedures; 302 303 protected ExecuteProceduresRequest.Builder request = null; 304 305 public ExecuteProceduresRemoteCall(final ServerName serverName, 306 final Set<RemoteProcedure> remoteProcedures) { 307 super(serverName); 308 this.remoteProcedures = remoteProcedures; 309 } 310 311 @Override 312 public void run() { 313 request = ExecuteProceduresRequest.newBuilder(); 314 if (LOG.isTraceEnabled()) { 315 LOG.trace("Building request with operations count=" + remoteProcedures.size()); 316 } 317 splitAndResolveOperation(getServerName(), remoteProcedures, this); 318 319 try { 320 sendRequest(getServerName(), request.build()); 321 } catch (IOException e) { 322 e = unwrapException(e); 323 // TODO: In the future some operation may want to bail out early. 324 // TODO: How many times should we retry (use numberOfAttemptsSoFar) 325 if (!scheduleForRetry(e)) { 326 remoteCallFailed(procedureEnv, e); 327 } 328 } 329 } 330 331 @Override 332 public void dispatchOpenRequests(final MasterProcedureEnv env, 333 final List<RegionOpenOperation> operations) { 334 submitTask(new OpenRegionRemoteCall(getServerName(), operations)); 335 } 336 337 @Override 338 public void dispatchCloseRequests(final MasterProcedureEnv env, 339 final List<RegionCloseOperation> operations) { 340 for (RegionCloseOperation op: operations) { 341 submitTask(new CloseRegionRemoteCall(getServerName(), op)); 342 } 343 } 344 345 @Override 346 public void dispatchServerOperations(MasterProcedureEnv env, List<ServerOperation> operations) { 347 operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc); 348 } 349 350 protected ExecuteProceduresResponse sendRequest(final ServerName serverName, 351 final ExecuteProceduresRequest request) throws IOException { 352 try { 353 return getRsAdmin().executeProcedures(null, request); 354 } catch (ServiceException se) { 355 throw ProtobufUtil.getRemoteException(se); 356 } 357 } 358 359 protected void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { 360 for (RemoteProcedure proc : remoteProcedures) { 361 proc.remoteCallFailed(env, getServerName(), e); 362 } 363 } 364 } 365 366 protected static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env, 367 final ServerName serverName, final List<RegionOpenOperation> operations) { 368 final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder(); 369 builder.setServerStartCode(serverName.getStartcode()); 370 builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime()); 371 for (RegionOpenOperation op: operations) { 372 builder.addOpenInfo(op.buildRegionOpenInfoRequest(env)); 373 } 374 return builder.build(); 375 } 376 377 // ========================================================================== 378 // Compatibility calls 379 // Since we don't have a "batch proc-exec" request on the target RS 380 // we have to chunk the requests by type and dispatch the specific request. 381 // ========================================================================== 382 /** 383 * Compatibility class used by {@link CompatRemoteProcedureResolver} to open regions using old 384 * {@link AdminService#openRegion(RpcController, OpenRegionRequest, RpcCallback)} rpc. 385 */ 386 private final class OpenRegionRemoteCall extends AbstractRSRemoteCall { 387 private final List<RegionOpenOperation> operations; 388 389 public OpenRegionRemoteCall(final ServerName serverName, 390 final List<RegionOpenOperation> operations) { 391 super(serverName); 392 this.operations = operations; 393 } 394 395 @Override 396 public void run() { 397 final OpenRegionRequest request = 398 buildOpenRegionRequest(procedureEnv, getServerName(), operations); 399 400 try { 401 sendRequest(getServerName(), request); 402 } catch (IOException e) { 403 e = unwrapException(e); 404 // TODO: In the future some operation may want to bail out early. 405 // TODO: How many times should we retry (use numberOfAttemptsSoFar) 406 if (!scheduleForRetry(e)) { 407 remoteCallFailed(procedureEnv, e); 408 } 409 } 410 } 411 412 private OpenRegionResponse sendRequest(final ServerName serverName, 413 final OpenRegionRequest request) throws IOException { 414 try { 415 return getRsAdmin().openRegion(null, request); 416 } catch (ServiceException se) { 417 throw ProtobufUtil.getRemoteException(se); 418 } 419 } 420 421 private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { 422 for (RegionOpenOperation op: operations) { 423 op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); 424 } 425 } 426 } 427 428 /** 429 * Compatibility class used by {@link CompatRemoteProcedureResolver} to close regions using old 430 * {@link AdminService#closeRegion(RpcController, CloseRegionRequest, RpcCallback)} rpc. 431 */ 432 private final class CloseRegionRemoteCall extends AbstractRSRemoteCall { 433 private final RegionCloseOperation operation; 434 435 public CloseRegionRemoteCall(final ServerName serverName, 436 final RegionCloseOperation operation) { 437 super(serverName); 438 this.operation = operation; 439 } 440 441 @Override 442 public void run() { 443 final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName()); 444 try { 445 CloseRegionResponse response = sendRequest(getServerName(), request); 446 remoteCallCompleted(procedureEnv, response); 447 } catch (IOException e) { 448 e = unwrapException(e); 449 // TODO: In the future some operation may want to bail out early. 450 // TODO: How many times should we retry (use numberOfAttemptsSoFar) 451 if (!scheduleForRetry(e)) { 452 remoteCallFailed(procedureEnv, e); 453 } 454 } 455 } 456 457 private CloseRegionResponse sendRequest(final ServerName serverName, 458 final CloseRegionRequest request) throws IOException { 459 try { 460 return getRsAdmin().closeRegion(null, request); 461 } catch (ServiceException se) { 462 throw ProtobufUtil.getRemoteException(se); 463 } 464 } 465 466 private void remoteCallCompleted(final MasterProcedureEnv env, 467 final CloseRegionResponse response) { 468 operation.setClosed(response.getClosed()); 469 } 470 471 private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { 472 operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); 473 } 474 } 475 476 // ========================================================================== 477 // RPC Messages 478 // - ServerOperation: refreshConfig, grant, revoke, ... (TODO) 479 // - RegionOperation: open, close, flush, snapshot, ... 480 // ========================================================================== 481 482 public static final class ServerOperation extends RemoteOperation { 483 484 private final long procId; 485 486 private final Class<?> rsProcClass; 487 488 private final byte[] rsProcData; 489 490 public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class<?> rsProcClass, 491 byte[] rsProcData) { 492 super(remoteProcedure); 493 this.procId = procId; 494 this.rsProcClass = rsProcClass; 495 this.rsProcData = rsProcData; 496 } 497 498 public RemoteProcedureRequest buildRequest() { 499 return RemoteProcedureRequest.newBuilder().setProcId(procId) 500 .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build(); 501 } 502 } 503 504 public static abstract class RegionOperation extends RemoteOperation { 505 private final RegionInfo regionInfo; 506 507 protected RegionOperation(final RemoteProcedure remoteProcedure, 508 final RegionInfo regionInfo) { 509 super(remoteProcedure); 510 this.regionInfo = regionInfo; 511 } 512 513 public RegionInfo getRegionInfo() { 514 return this.regionInfo; 515 } 516 } 517 518 public static class RegionOpenOperation extends RegionOperation { 519 private final List<ServerName> favoredNodes; 520 private final boolean openForReplay; 521 private boolean failedOpen; 522 523 public RegionOpenOperation(final RemoteProcedure remoteProcedure, 524 final RegionInfo regionInfo, final List<ServerName> favoredNodes, 525 final boolean openForReplay) { 526 super(remoteProcedure, regionInfo); 527 this.favoredNodes = favoredNodes; 528 this.openForReplay = openForReplay; 529 } 530 531 protected void setFailedOpen(final boolean failedOpen) { 532 this.failedOpen = failedOpen; 533 } 534 535 public boolean isFailedOpen() { 536 return failedOpen; 537 } 538 539 public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest( 540 final MasterProcedureEnv env) { 541 return RequestConverter.buildRegionOpenInfo(getRegionInfo(), 542 env.getAssignmentManager().getFavoredNodes(getRegionInfo())); 543 } 544 } 545 546 public static class RegionCloseOperation extends RegionOperation { 547 private final ServerName destinationServer; 548 private boolean closed = false; 549 550 public RegionCloseOperation(final RemoteProcedure remoteProcedure, 551 final RegionInfo regionInfo, final ServerName destinationServer) { 552 super(remoteProcedure, regionInfo); 553 this.destinationServer = destinationServer; 554 } 555 556 public ServerName getDestinationServer() { 557 return destinationServer; 558 } 559 560 protected void setClosed(final boolean closed) { 561 this.closed = closed; 562 } 563 564 public boolean isClosed() { 565 return closed; 566 } 567 568 public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) { 569 return ProtobufUtil.buildCloseRegionRequest(serverName, 570 getRegionInfo().getRegionName(), getDestinationServer()); 571 } 572 } 573}