001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.master.procedure; 020 021import java.io.IOException; 022import java.lang.Thread.UncaughtExceptionHandler; 023import java.util.List; 024import java.util.Set; 025import java.util.concurrent.TimeUnit; 026import org.apache.hadoop.hbase.DoNotRetryIOException; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.client.RegionInfo; 029import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; 030import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 031import org.apache.hadoop.hbase.master.MasterServices; 032import org.apache.hadoop.hbase.master.ServerListener; 033import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; 034import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.hadoop.ipc.RemoteException; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap; 042import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 043import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 044import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 045 046import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 047import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 055 056/** 057 * A remote procecdure dispatcher for regionservers. 058 */ 059@InterfaceAudience.Private 060public class RSProcedureDispatcher 061 extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName> 062 implements ServerListener { 063 private static final Logger LOG = LoggerFactory.getLogger(RSProcedureDispatcher.class); 064 065 public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY = 066 "hbase.regionserver.rpc.startup.waittime"; 067 private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000; 068 069 private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0200000; // 2.0 070 071 protected final MasterServices master; 072 private final long rsStartupWaitTime; 073 private MasterProcedureEnv procedureEnv; 074 075 public RSProcedureDispatcher(final MasterServices master) { 076 super(master.getConfiguration()); 077 078 this.master = master; 079 this.rsStartupWaitTime = master.getConfiguration().getLong( 080 RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME); 081 } 082 083 @Override 084 protected UncaughtExceptionHandler getUncaughtExceptionHandler() { 085 return new UncaughtExceptionHandler() { 086 087 @Override 088 public void uncaughtException(Thread t, Throwable e) { 089 LOG.error("Unexpected error caught, this may cause the procedure to hang forever", e); 090 } 091 }; 092 } 093 094 @Override 095 public boolean start() { 096 if (!super.start()) { 097 return false; 098 } 099 100 master.getServerManager().registerListener(this); 101 procedureEnv = master.getMasterProcedureExecutor().getEnvironment(); 102 for (ServerName serverName: master.getServerManager().getOnlineServersList()) { 103 addNode(serverName); 104 } 105 return true; 106 } 107 108 @Override 109 public boolean stop() { 110 if (!super.stop()) { 111 return false; 112 } 113 114 master.getServerManager().unregisterListener(this); 115 return true; 116 } 117 118 @Override 119 protected void remoteDispatch(final ServerName serverName, 120 final Set<RemoteProcedure> remoteProcedures) { 121 final int rsVersion = master.getServerManager().getServerVersion(serverName); 122 if (rsVersion == 0 && !master.getServerManager().isServerOnline(serverName)) { 123 submitTask(new DeadRSRemoteCall(serverName, remoteProcedures)); 124 } else { 125 submitTask(new ExecuteProceduresRemoteCall(serverName, remoteProcedures)); 126 } 127 } 128 129 @Override 130 protected void abortPendingOperations(final ServerName serverName, 131 final Set<RemoteProcedure> operations) { 132 // TODO: Replace with a ServerNotOnlineException() 133 final IOException e = new DoNotRetryIOException("server not online " + serverName); 134 for (RemoteProcedure proc: operations) { 135 proc.remoteCallFailed(procedureEnv, serverName, e); 136 } 137 } 138 139 @Override 140 public void serverAdded(final ServerName serverName) { 141 addNode(serverName); 142 } 143 144 @Override 145 public void serverRemoved(final ServerName serverName) { 146 removeNode(serverName); 147 } 148 149 /** 150 * Base remote call 151 */ 152 protected abstract class AbstractRSRemoteCall implements Runnable { 153 154 private final ServerName serverName; 155 156 private int numberOfAttemptsSoFar = 0; 157 private long maxWaitTime = -1; 158 159 public AbstractRSRemoteCall(final ServerName serverName) { 160 this.serverName = serverName; 161 } 162 163 protected AdminService.BlockingInterface getRsAdmin() throws IOException { 164 final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); 165 if (admin == null) { 166 throw new IOException("Attempting to send OPEN RPC to server " + getServerName() + 167 " failed because no RPC connection found to this server"); 168 } 169 return admin; 170 } 171 172 protected ServerName getServerName() { 173 return serverName; 174 } 175 176 protected boolean scheduleForRetry(final IOException e) { 177 // Should we wait a little before retrying? If the server is starting it's yes. 178 final boolean hold = (e instanceof ServerNotRunningYetException); 179 if (hold) { 180 LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d", 181 serverName, numberOfAttemptsSoFar), e); 182 long now = EnvironmentEdgeManager.currentTime(); 183 if (now < getMaxWaitTime()) { 184 if (LOG.isDebugEnabled()) { 185 LOG.debug(String.format("server is not yet up; waiting up to %dms", 186 (getMaxWaitTime() - now)), e); 187 } 188 submitTask(this, 100, TimeUnit.MILLISECONDS); 189 return true; 190 } 191 192 LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e); 193 return false; 194 } 195 196 // In case it is a connection exception and the region server is still online, 197 // the openRegion RPC could have been accepted by the server and 198 // just the response didn't go through. So we will retry to 199 // open the region on the same server. 200 final boolean retry = !hold && (ClientExceptionsUtil.isConnectionException(e) 201 && master.getServerManager().isServerOnline(serverName)); 202 if (retry) { 203 // we want to retry as many times as needed as long as the RS is not dead. 204 if (LOG.isDebugEnabled()) { 205 LOG.debug(String.format("Retrying to same RegionServer %s because: %s", 206 serverName, e.getMessage()), e); 207 } 208 submitTask(this, 100, TimeUnit.MILLISECONDS); 209 return true; 210 } 211 // trying to send the request elsewhere instead 212 LOG.warn(String.format("Failed dispatch to server=%s try=%d", 213 serverName, numberOfAttemptsSoFar), e); 214 return false; 215 } 216 217 private long getMaxWaitTime() { 218 if (this.maxWaitTime < 0) { 219 // This is the max attempts, not retries, so it should be at least 1. 220 this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime; 221 } 222 return this.maxWaitTime; 223 } 224 225 protected IOException unwrapException(IOException e) { 226 if (e instanceof RemoteException) { 227 e = ((RemoteException)e).unwrapRemoteException(); 228 } 229 return e; 230 } 231 } 232 233 private interface RemoteProcedureResolver { 234 void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations); 235 void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations); 236 } 237 238 /** 239 * Fetches {@link org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation}s 240 * from the given {@code remoteProcedures} and groups them by class of the returned operation. 241 * Then {@code resolver} is used to dispatch {@link RegionOpenOperation}s and 242 * {@link RegionCloseOperation}s. 243 * @param serverName RegionServer to which the remote operations are sent 244 * @param remoteProcedures Remote procedures which are dispatched to the given server 245 * @param resolver Used to dispatch remote procedures to given server. 246 */ 247 public void splitAndResolveOperation(final ServerName serverName, 248 final Set<RemoteProcedure> remoteProcedures, final RemoteProcedureResolver resolver) { 249 final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType = 250 buildAndGroupRequestByType(procedureEnv, serverName, remoteProcedures); 251 252 final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class); 253 if (!openOps.isEmpty()) { 254 resolver.dispatchOpenRequests(procedureEnv, openOps); 255 } 256 257 final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class); 258 if (!closeOps.isEmpty()) { 259 resolver.dispatchCloseRequests(procedureEnv, closeOps); 260 } 261 262 if (!reqsByType.isEmpty()) { 263 LOG.warn("unknown request type in the queue: " + reqsByType); 264 } 265 } 266 267 private class DeadRSRemoteCall extends ExecuteProceduresRemoteCall { 268 269 public DeadRSRemoteCall(ServerName serverName, Set<RemoteProcedure> remoteProcedures) { 270 super(serverName, remoteProcedures); 271 } 272 273 @Override 274 public void run() { 275 remoteCallFailed(procedureEnv, 276 new RegionServerStoppedException("Server " + getServerName() + " is not online")); 277 } 278 } 279 280 // ========================================================================== 281 // Compatibility calls 282 // ========================================================================== 283 protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall 284 implements RemoteProcedureResolver { 285 protected final Set<RemoteProcedure> remoteProcedures; 286 287 protected ExecuteProceduresRequest.Builder request = null; 288 289 public ExecuteProceduresRemoteCall(final ServerName serverName, 290 final Set<RemoteProcedure> remoteProcedures) { 291 super(serverName); 292 this.remoteProcedures = remoteProcedures; 293 } 294 295 @Override 296 public void run() { 297 request = ExecuteProceduresRequest.newBuilder(); 298 if (LOG.isTraceEnabled()) { 299 LOG.trace("Building request with operations count=" + remoteProcedures.size()); 300 } 301 splitAndResolveOperation(getServerName(), remoteProcedures, this); 302 303 try { 304 final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build()); 305 remoteCallCompleted(procedureEnv, response); 306 } catch (IOException e) { 307 e = unwrapException(e); 308 // TODO: In the future some operation may want to bail out early. 309 // TODO: How many times should we retry (use numberOfAttemptsSoFar) 310 if (!scheduleForRetry(e)) { 311 remoteCallFailed(procedureEnv, e); 312 } 313 } 314 } 315 316 @Override 317 public void dispatchOpenRequests(final MasterProcedureEnv env, 318 final List<RegionOpenOperation> operations) { 319 submitTask(new OpenRegionRemoteCall(getServerName(), operations)); 320 } 321 322 @Override 323 public void dispatchCloseRequests(final MasterProcedureEnv env, 324 final List<RegionCloseOperation> operations) { 325 for (RegionCloseOperation op: operations) { 326 submitTask(new CloseRegionRemoteCall(getServerName(), op)); 327 } 328 } 329 330 protected ExecuteProceduresResponse sendRequest(final ServerName serverName, 331 final ExecuteProceduresRequest request) throws IOException { 332 try { 333 return getRsAdmin().executeProcedures(null, request); 334 } catch (ServiceException se) { 335 throw ProtobufUtil.getRemoteException(se); 336 } 337 } 338 339 340 private void remoteCallCompleted(final MasterProcedureEnv env, 341 final ExecuteProceduresResponse response) { 342 /* 343 for (RemoteProcedure proc: operations) { 344 proc.remoteCallCompleted(env, getServerName(), response); 345 }*/ 346 } 347 348 protected void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { 349 for (RemoteProcedure proc: remoteProcedures) { 350 proc.remoteCallFailed(env, getServerName(), e); 351 } 352 } 353 } 354 355 protected static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env, 356 final ServerName serverName, final List<RegionOpenOperation> operations) { 357 final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder(); 358 builder.setServerStartCode(serverName.getStartcode()); 359 builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime()); 360 for (RegionOpenOperation op: operations) { 361 builder.addOpenInfo(op.buildRegionOpenInfoRequest(env)); 362 } 363 return builder.build(); 364 } 365 366 // ========================================================================== 367 // Compatibility calls 368 // Since we don't have a "batch proc-exec" request on the target RS 369 // we have to chunk the requests by type and dispatch the specific request. 370 // ========================================================================== 371 /** 372 * Compatibility class used by {@link CompatRemoteProcedureResolver} to open regions using old 373 * {@link AdminService#openRegion(RpcController, OpenRegionRequest, RpcCallback)} rpc. 374 */ 375 private final class OpenRegionRemoteCall extends AbstractRSRemoteCall { 376 private final List<RegionOpenOperation> operations; 377 378 public OpenRegionRemoteCall(final ServerName serverName, 379 final List<RegionOpenOperation> operations) { 380 super(serverName); 381 this.operations = operations; 382 } 383 384 @Override 385 public void run() { 386 final OpenRegionRequest request = 387 buildOpenRegionRequest(procedureEnv, getServerName(), operations); 388 389 try { 390 OpenRegionResponse response = sendRequest(getServerName(), request); 391 remoteCallCompleted(procedureEnv, response); 392 } catch (IOException e) { 393 e = unwrapException(e); 394 // TODO: In the future some operation may want to bail out early. 395 // TODO: How many times should we retry (use numberOfAttemptsSoFar) 396 if (!scheduleForRetry(e)) { 397 remoteCallFailed(procedureEnv, e); 398 } 399 } 400 } 401 402 private OpenRegionResponse sendRequest(final ServerName serverName, 403 final OpenRegionRequest request) throws IOException { 404 try { 405 return getRsAdmin().openRegion(null, request); 406 } catch (ServiceException se) { 407 throw ProtobufUtil.getRemoteException(se); 408 } 409 } 410 411 private void remoteCallCompleted(final MasterProcedureEnv env, 412 final OpenRegionResponse response) { 413 int index = 0; 414 for (RegionOpenOperation op: operations) { 415 OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++); 416 op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING); 417 op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op); 418 } 419 } 420 421 private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { 422 for (RegionOpenOperation op: operations) { 423 op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); 424 } 425 } 426 } 427 428 /** 429 * Compatibility class used by {@link CompatRemoteProcedureResolver} to close regions using old 430 * {@link AdminService#closeRegion(RpcController, CloseRegionRequest, RpcCallback)} rpc. 431 */ 432 private final class CloseRegionRemoteCall extends AbstractRSRemoteCall { 433 private final RegionCloseOperation operation; 434 435 public CloseRegionRemoteCall(final ServerName serverName, 436 final RegionCloseOperation operation) { 437 super(serverName); 438 this.operation = operation; 439 } 440 441 @Override 442 public void run() { 443 final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName()); 444 try { 445 CloseRegionResponse response = sendRequest(getServerName(), request); 446 remoteCallCompleted(procedureEnv, response); 447 } catch (IOException e) { 448 e = unwrapException(e); 449 // TODO: In the future some operation may want to bail out early. 450 // TODO: How many times should we retry (use numberOfAttemptsSoFar) 451 if (!scheduleForRetry(e)) { 452 remoteCallFailed(procedureEnv, e); 453 } 454 } 455 } 456 457 private CloseRegionResponse sendRequest(final ServerName serverName, 458 final CloseRegionRequest request) throws IOException { 459 try { 460 return getRsAdmin().closeRegion(null, request); 461 } catch (ServiceException se) { 462 throw ProtobufUtil.getRemoteException(se); 463 } 464 } 465 466 private void remoteCallCompleted(final MasterProcedureEnv env, 467 final CloseRegionResponse response) { 468 operation.setClosed(response.getClosed()); 469 operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation); 470 } 471 472 private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { 473 operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); 474 } 475 } 476 477 // ========================================================================== 478 // RPC Messages 479 // - ServerOperation: refreshConfig, grant, revoke, ... (TODO) 480 // - RegionOperation: open, close, flush, snapshot, ... 481 // ========================================================================== 482 /* Currently unused 483 public static abstract class ServerOperation extends RemoteOperation { 484 protected ServerOperation(final RemoteProcedure remoteProcedure) { 485 super(remoteProcedure); 486 } 487 } 488 */ 489 490 public static abstract class RegionOperation extends RemoteOperation { 491 private final RegionInfo regionInfo; 492 493 protected RegionOperation(final RemoteProcedure remoteProcedure, 494 final RegionInfo regionInfo) { 495 super(remoteProcedure); 496 this.regionInfo = regionInfo; 497 } 498 499 public RegionInfo getRegionInfo() { 500 return this.regionInfo; 501 } 502 } 503 504 public static class RegionOpenOperation extends RegionOperation { 505 private final List<ServerName> favoredNodes; 506 private final boolean openForReplay; 507 private boolean failedOpen; 508 509 public RegionOpenOperation(final RemoteProcedure remoteProcedure, 510 final RegionInfo regionInfo, final List<ServerName> favoredNodes, 511 final boolean openForReplay) { 512 super(remoteProcedure, regionInfo); 513 this.favoredNodes = favoredNodes; 514 this.openForReplay = openForReplay; 515 } 516 517 protected void setFailedOpen(final boolean failedOpen) { 518 this.failedOpen = failedOpen; 519 } 520 521 public boolean isFailedOpen() { 522 return failedOpen; 523 } 524 525 public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest( 526 final MasterProcedureEnv env) { 527 return RequestConverter.buildRegionOpenInfo(getRegionInfo(), 528 env.getAssignmentManager().getFavoredNodes(getRegionInfo())); 529 } 530 } 531 532 public static class RegionCloseOperation extends RegionOperation { 533 private final ServerName destinationServer; 534 private boolean closed = false; 535 536 public RegionCloseOperation(final RemoteProcedure remoteProcedure, 537 final RegionInfo regionInfo, final ServerName destinationServer) { 538 super(remoteProcedure, regionInfo); 539 this.destinationServer = destinationServer; 540 } 541 542 public ServerName getDestinationServer() { 543 return destinationServer; 544 } 545 546 protected void setClosed(final boolean closed) { 547 this.closed = closed; 548 } 549 550 public boolean isClosed() { 551 return closed; 552 } 553 554 public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) { 555 return ProtobufUtil.buildCloseRegionRequest(serverName, 556 getRegionInfo().getRegionName(), getDestinationServer()); 557 } 558 } 559}