001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.io.UncheckedIOException; 027import java.net.SocketTimeoutException; 028import java.util.Arrays; 029import java.util.NavigableMap; 030import java.util.Set; 031import java.util.SortedSet; 032import java.util.concurrent.ConcurrentSkipListMap; 033import java.util.concurrent.ConcurrentSkipListSet; 034import java.util.concurrent.ExecutionException; 035import java.util.concurrent.Executors; 036import java.util.concurrent.Future; 037import java.util.concurrent.ScheduledExecutorService; 038import java.util.concurrent.ThreadLocalRandom; 039import java.util.concurrent.TimeUnit; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.CallQueueTooBigException; 042import org.apache.hadoop.hbase.HBaseTestingUtil; 043import org.apache.hadoop.hbase.NotServingRegionException; 044import org.apache.hadoop.hbase.ServerMetricsBuilder; 045import org.apache.hadoop.hbase.ServerName; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.YouAreDeadException; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.client.RegionInfoBuilder; 050import org.apache.hadoop.hbase.ipc.CallTimeoutException; 051import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 052import org.apache.hadoop.hbase.master.MasterServices; 053import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 054import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 055import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 056import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 057import org.apache.hadoop.hbase.procedure2.Procedure; 058import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; 059import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 060import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 061import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; 062import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.apache.hadoop.hbase.util.CommonFSUtils; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.hadoop.ipc.RemoteException; 067import org.junit.After; 068import org.junit.Before; 069import org.junit.Rule; 070import org.junit.rules.TestName; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 075 076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 088 089/** 090 * Base class for AM test. 091 */ 092public abstract class TestAssignmentManagerBase { 093 094 private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class); 095 096 @Rule 097 public TestName name = new TestName(); 098 099 protected static final int PROC_NTHREADS = 64; 100 protected static final int NREGIONS = 1 * 1000; 101 protected static final int NSERVERS = Math.max(1, NREGIONS / 100); 102 103 protected HBaseTestingUtil util; 104 protected MockRSProcedureDispatcher rsDispatcher; 105 protected MockMasterServices master; 106 protected AssignmentManager am; 107 protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers = 108 new ConcurrentSkipListMap<ServerName, SortedSet<byte[]>>(); 109 // Simple executor to run some simple tasks. 110 protected ScheduledExecutorService executor; 111 112 protected ProcedureMetrics assignProcMetrics; 113 protected ProcedureMetrics unassignProcMetrics; 114 protected ProcedureMetrics moveProcMetrics; 115 protected ProcedureMetrics reopenProcMetrics; 116 protected ProcedureMetrics openProcMetrics; 117 protected ProcedureMetrics closeProcMetrics; 118 119 protected long assignSubmittedCount = 0; 120 protected long assignFailedCount = 0; 121 protected long unassignSubmittedCount = 0; 122 protected long unassignFailedCount = 0; 123 protected long moveSubmittedCount = 0; 124 protected long moveFailedCount = 0; 125 protected long reopenSubmittedCount = 0; 126 protected long reopenFailedCount = 0; 127 protected long openSubmittedCount = 0; 128 protected long openFailedCount = 0; 129 protected long closeSubmittedCount = 0; 130 protected long closeFailedCount = 0; 131 132 protected int newRsAdded; 133 134 protected int getAssignMaxAttempts() { 135 // Have many so we succeed eventually. 136 return 1000; 137 } 138 139 protected void setupConfiguration(Configuration conf) throws Exception { 140 CommonFSUtils.setRootDir(conf, util.getDataTestDir()); 141 conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false); 142 conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10); 143 conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS); 144 conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000); 145 conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts()); 146 // make retry for TRSP more frequent 147 conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10); 148 conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100); 149 } 150 151 @Before 152 public void setUp() throws Exception { 153 util = new HBaseTestingUtil(); 154 this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() 155 .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build()); 156 setupConfiguration(util.getConfiguration()); 157 master = new MockMasterServices(util.getConfiguration()); 158 rsDispatcher = new MockRSProcedureDispatcher(master); 159 master.start(NSERVERS, rsDispatcher); 160 newRsAdded = 0; 161 am = master.getAssignmentManager(); 162 assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics(); 163 unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics(); 164 moveProcMetrics = am.getAssignmentManagerMetrics().getMoveProcMetrics(); 165 reopenProcMetrics = am.getAssignmentManagerMetrics().getReopenProcMetrics(); 166 openProcMetrics = am.getAssignmentManagerMetrics().getOpenProcMetrics(); 167 closeProcMetrics = am.getAssignmentManagerMetrics().getCloseProcMetrics(); 168 setUpMeta(); 169 } 170 171 protected void setUpMeta() throws Exception { 172 rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); 173 am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO); 174 am.wakeMetaLoadedEvent(); 175 } 176 177 @After 178 public void tearDown() throws Exception { 179 master.stop("tearDown"); 180 this.executor.shutdownNow(); 181 } 182 183 protected class NoopRsExecutor implements MockRSExecutor { 184 @Override 185 public ExecuteProceduresResponse sendRequest(ServerName server, 186 ExecuteProceduresRequest request) throws IOException { 187 if (request.getOpenRegionCount() > 0) { 188 for (OpenRegionRequest req : request.getOpenRegionList()) { 189 for (RegionOpenInfo openReq : req.getOpenInfoList()) { 190 execOpenRegion(server, openReq); 191 } 192 } 193 } 194 if (request.getCloseRegionCount() > 0) { 195 for (CloseRegionRequest req : request.getCloseRegionList()) { 196 execCloseRegion(server, req.getRegion().getValue().toByteArray()); 197 } 198 } 199 return ExecuteProceduresResponse.newBuilder().build(); 200 } 201 202 protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo) 203 throws IOException { 204 return null; 205 } 206 207 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 208 throws IOException { 209 return null; 210 } 211 } 212 213 protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) { 214 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 215 } 216 217 protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception { 218 try { 219 return future.get(3, TimeUnit.MINUTES); 220 } catch (ExecutionException e) { 221 LOG.info("ExecutionException", e); 222 Exception ee = (Exception) e.getCause(); 223 if (ee instanceof InterruptedIOException) { 224 for (Procedure<?> p : this.master.getMasterProcedureExecutor().getProcedures()) { 225 LOG.info(p.toStringDetails()); 226 } 227 } 228 throw (Exception) e.getCause(); 229 } 230 } 231 232 // ============================================================================================ 233 // Helpers 234 // ============================================================================================ 235 protected void bulkSubmit(TransitRegionStateProcedure[] procs) throws Exception { 236 Thread[] threads = new Thread[PROC_NTHREADS]; 237 for (int i = 0; i < threads.length; ++i) { 238 final int threadId = i; 239 threads[i] = new Thread() { 240 @Override 241 public void run() { 242 TableName tableName = TableName.valueOf("table-" + threadId); 243 int n = (procs.length / threads.length); 244 int start = threadId * n; 245 int stop = start + n; 246 for (int j = start; j < stop; ++j) { 247 procs[j] = createAndSubmitAssign(tableName, j); 248 } 249 } 250 }; 251 threads[i].start(); 252 } 253 for (int i = 0; i < threads.length; ++i) { 254 threads[i].join(); 255 } 256 for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) { 257 procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i); 258 } 259 } 260 261 protected TransitRegionStateProcedure createAndSubmitAssign(TableName tableName, int regionId) { 262 RegionInfo hri = createRegionInfo(tableName, regionId); 263 TransitRegionStateProcedure proc = createAssignProcedure(hri); 264 master.getMasterProcedureExecutor().submitProcedure(proc); 265 return proc; 266 } 267 268 protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) { 269 return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId)) 270 .setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build(); 271 } 272 273 protected TransitRegionStateProcedure createAssignProcedure(RegionInfo hri) { 274 return am.createAssignProcedures(Arrays.asList(hri))[0]; 275 } 276 277 protected TransitRegionStateProcedure createUnassignProcedure(RegionInfo hri) { 278 RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(hri); 279 TransitRegionStateProcedure proc; 280 regionNode.lock(); 281 try { 282 assertFalse(regionNode.isInTransition()); 283 proc = TransitRegionStateProcedure 284 .unassign(master.getMasterProcedureExecutor().getEnvironment(), hri); 285 regionNode.setProcedure(proc); 286 } finally { 287 regionNode.unlock(); 288 } 289 return proc; 290 } 291 292 protected void sendTransitionReport(final ServerName serverName, 293 final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo, 294 final TransitionCode state, long seqId) throws IOException { 295 ReportRegionStateTransitionRequest.Builder req = 296 ReportRegionStateTransitionRequest.newBuilder(); 297 req.setServer(ProtobufUtil.toServerName(serverName)); 298 req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo) 299 .setTransitionCode(state).setOpenSeqNum(seqId).build()); 300 am.reportRegionStateTransition(req.build()); 301 } 302 303 protected void doCrash(final ServerName serverName) { 304 this.master.getServerManager().moveFromOnlineToDeadServers(serverName); 305 this.am.submitServerCrash(serverName, false/* No WALs here */, false); 306 // add a new server to avoid killing all the region servers which may hang the UTs 307 ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1); 308 newRsAdded++; 309 try { 310 this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder 311 .newBuilder(newSn).setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build()); 312 } catch (YouAreDeadException e) { 313 // should not happen 314 throw new UncheckedIOException(e); 315 } 316 } 317 318 protected void doRestart(final ServerName serverName) { 319 try { 320 this.master.restartRegionServer(serverName); 321 } catch (IOException e) { 322 LOG.warn("Can not restart RS with new startcode"); 323 } 324 } 325 326 protected class GoodRsExecutor extends NoopRsExecutor { 327 @Override 328 protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq) 329 throws IOException { 330 RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); 331 long previousOpenSeqNum = 332 am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum(); 333 sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED, 334 previousOpenSeqNum + 2); 335 // Concurrency? 336 // Now update the state of our cluster in regionsToRegionServers. 337 SortedSet<byte[]> regions = regionsToRegionServers.get(server); 338 if (regions == null) { 339 regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR); 340 regionsToRegionServers.put(server, regions); 341 } 342 if (regions.contains(hri.getRegionName())) { 343 throw new UnsupportedOperationException(hri.getRegionNameAsString()); 344 } 345 regions.add(hri.getRegionName()); 346 return RegionOpeningState.OPENED; 347 } 348 349 @Override 350 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 351 throws IOException { 352 RegionInfo hri = am.getRegionInfo(regionName); 353 sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1); 354 return CloseRegionResponse.newBuilder().setClosed(true).build(); 355 } 356 } 357 358 protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor { 359 @Override 360 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 361 throws IOException { 362 throw new ServerNotRunningYetException("wait on server startup"); 363 } 364 } 365 366 protected static class FaultyRsExecutor implements MockRSExecutor { 367 private final IOException exception; 368 369 public FaultyRsExecutor(final IOException exception) { 370 this.exception = exception; 371 } 372 373 @Override 374 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 375 throws IOException { 376 throw exception; 377 } 378 } 379 380 protected class SocketTimeoutRsExecutor extends GoodRsExecutor { 381 private final int timeoutTimes; 382 383 private ServerName lastServer; 384 private int retries; 385 386 public SocketTimeoutRsExecutor(int timeoutTimes) { 387 this.timeoutTimes = timeoutTimes; 388 } 389 390 @Override 391 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 392 throws IOException { 393 // SocketTimeoutException should be a temporary problem 394 // unless the server will be declared dead. 395 retries++; 396 if (retries == 1) { 397 lastServer = server; 398 } 399 if (retries <= timeoutTimes) { 400 LOG.debug("Socket timeout for server=" + server + " retries=" + retries); 401 // should not change the server if the server is not dead yet. 402 assertEquals(lastServer, server); 403 if (retries == timeoutTimes) { 404 LOG.info("Mark server=" + server + " as dead. retries=" + retries); 405 master.getServerManager().moveFromOnlineToDeadServers(server); 406 executor.schedule(new Runnable() { 407 @Override 408 public void run() { 409 LOG.info("Sending in CRASH of " + server); 410 doCrash(server); 411 } 412 }, 1, TimeUnit.SECONDS); 413 } 414 throw new SocketTimeoutException("simulate socket timeout"); 415 } else { 416 // should select another server 417 assertNotEquals(lastServer, server); 418 return super.sendRequest(server, req); 419 } 420 } 421 } 422 423 protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor { 424 425 private boolean invoked = false; 426 427 private ServerName lastServer; 428 429 @Override 430 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 431 throws IOException { 432 if (!invoked) { 433 lastServer = server; 434 invoked = true; 435 throw new CallQueueTooBigException("simulate queue full"); 436 } 437 // better select another server since the server is over loaded, but anyway, it is fine to 438 // still select the same server since it is not dead yet... 439 if (lastServer.equals(server)) { 440 LOG.warn("We still select the same server, which is not good."); 441 } 442 return super.sendRequest(server, req); 443 } 444 } 445 446 protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor { 447 448 private final int queueFullTimes; 449 450 private int retries; 451 452 private ServerName lastServer; 453 454 public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) { 455 this.queueFullTimes = queueFullTimes; 456 } 457 458 @Override 459 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 460 throws IOException { 461 retries++; 462 if (retries == 1) { 463 lastServer = server; 464 throw new CallTimeoutException("simulate call timeout"); 465 } 466 // should always retry on the same server 467 assertEquals(lastServer, server); 468 if (retries < queueFullTimes) { 469 throw new CallQueueTooBigException("simulate queue full"); 470 } 471 return super.sendRequest(server, req); 472 } 473 } 474 475 /** 476 * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so 477 * proc is stuck/suspended on the Master and won't wake up.). We then send in a crash for this 478 * server after a few seconds; crash is supposed to take care of the suspended procedures. 479 */ 480 protected class HangThenRSCrashExecutor extends GoodRsExecutor { 481 private int invocations; 482 483 @Override 484 protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) 485 throws IOException { 486 if (this.invocations++ > 0) { 487 // Return w/o problem the second time through here. 488 return super.execOpenRegion(server, openReq); 489 } 490 // The procedure on master will just hang forever because nothing comes back 491 // from the RS in this case. 492 LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); 493 executor.schedule(new Runnable() { 494 @Override 495 public void run() { 496 LOG.info("Sending in CRASH of " + server); 497 doCrash(server); 498 } 499 }, 1, TimeUnit.SECONDS); 500 return null; 501 } 502 } 503 504 /** 505 * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so 506 * proc is stuck/suspended on the Master and won't wake up.). Different with 507 * HangThenRSCrashExecutor, HangThenRSCrashExecutor will create ServerCrashProcedure to handle the 508 * server crash. However, this HangThenRSRestartExecutor will restart RS directly, situation for 509 * RS crashed when SCP is not enabled. 510 */ 511 protected class HangThenRSRestartExecutor extends GoodRsExecutor { 512 private int invocations; 513 514 @Override 515 protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) 516 throws IOException { 517 if (this.invocations++ > 0) { 518 // Return w/o problem the second time through here. 519 return super.execOpenRegion(server, openReq); 520 } 521 // The procedure on master will just hang forever because nothing comes back 522 // from the RS in this case. 523 LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); 524 executor.schedule(new Runnable() { 525 @Override 526 public void run() { 527 LOG.info("Restarting RS of " + server); 528 doRestart(server); 529 } 530 }, 1, TimeUnit.SECONDS); 531 return null; 532 } 533 } 534 535 protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor { 536 public static final int TYPES_OF_FAILURE = 6; 537 private int invocations; 538 539 @Override 540 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 541 throws IOException { 542 switch (this.invocations++) { 543 case 0: 544 throw new NotServingRegionException("Fake"); 545 case 1: 546 executor.schedule(new Runnable() { 547 @Override 548 public void run() { 549 LOG.info("Sending in CRASH of " + server); 550 doCrash(server); 551 } 552 }, 1, TimeUnit.SECONDS); 553 throw new RegionServerAbortedException("Fake!"); 554 case 2: 555 executor.schedule(new Runnable() { 556 @Override 557 public void run() { 558 LOG.info("Sending in CRASH of " + server); 559 doCrash(server); 560 } 561 }, 1, TimeUnit.SECONDS); 562 throw new RegionServerStoppedException("Fake!"); 563 case 3: 564 throw new ServerNotRunningYetException("Fake!"); 565 case 4: 566 LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server); 567 executor.schedule(new Runnable() { 568 @Override 569 public void run() { 570 LOG.info("Sending in CRASH of " + server); 571 doCrash(server); 572 } 573 }, 1, TimeUnit.SECONDS); 574 return null; 575 default: 576 return super.execCloseRegion(server, regionName); 577 } 578 } 579 } 580 581 protected class RandRsExecutor extends NoopRsExecutor { 582 @Override 583 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 584 throws IOException { 585 switch (ThreadLocalRandom.current().nextInt(5)) { 586 case 0: 587 throw new ServerNotRunningYetException("wait on server startup"); 588 case 1: 589 throw new SocketTimeoutException("simulate socket timeout"); 590 case 2: 591 throw new RemoteException("java.io.IOException", "unexpected exception"); 592 default: 593 // fall out 594 } 595 return super.sendRequest(server, req); 596 } 597 598 @Override 599 protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) 600 throws IOException { 601 RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); 602 long previousOpenSeqNum = 603 am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum(); 604 switch (ThreadLocalRandom.current().nextInt(3)) { 605 case 0: 606 LOG.info("Return OPENED response"); 607 sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED, 608 previousOpenSeqNum + 2); 609 return OpenRegionResponse.RegionOpeningState.OPENED; 610 case 1: 611 LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response"); 612 sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1); 613 return OpenRegionResponse.RegionOpeningState.FAILED_OPENING; 614 default: 615 // fall out 616 } 617 // The procedure on master will just hang forever because nothing comes back 618 // from the RS in this case. 619 LOG.info("Return null as response; means proc stuck so we send in a crash report after" 620 + " a few seconds..."); 621 executor.schedule(new Runnable() { 622 @Override 623 public void run() { 624 LOG.info("Delayed CRASHING of " + server); 625 doCrash(server); 626 } 627 }, 5, TimeUnit.SECONDS); 628 return null; 629 } 630 631 @Override 632 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 633 throws IOException { 634 CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder(); 635 boolean closed = ThreadLocalRandom.current().nextBoolean(); 636 if (closed) { 637 RegionInfo hri = am.getRegionInfo(regionName); 638 sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1); 639 } 640 resp.setClosed(closed); 641 return resp.build(); 642 } 643 } 644 645 protected interface MockRSExecutor { 646 ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 647 throws IOException; 648 } 649 650 protected class MockRSProcedureDispatcher extends RSProcedureDispatcher { 651 private MockRSExecutor mockRsExec; 652 653 public MockRSProcedureDispatcher(final MasterServices master) { 654 super(master); 655 } 656 657 public void setMockRsExecutor(final MockRSExecutor mockRsExec) { 658 this.mockRsExec = mockRsExec; 659 } 660 661 @Override 662 protected void remoteDispatch(ServerName serverName, 663 @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) { 664 submitTask(new MockRemoteCall(serverName, remoteProcedures)); 665 } 666 667 private class MockRemoteCall extends ExecuteProceduresRemoteCall { 668 public MockRemoteCall(final ServerName serverName, 669 @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) { 670 super(serverName, operations); 671 } 672 673 @Override 674 protected ExecuteProceduresResponse sendRequest(final ServerName serverName, 675 final ExecuteProceduresRequest request) throws IOException { 676 return mockRsExec.sendRequest(serverName, request); 677 } 678 } 679 } 680 681 protected final void collectAssignmentManagerMetrics() { 682 assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount(); 683 assignFailedCount = assignProcMetrics.getFailedCounter().getCount(); 684 unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount(); 685 unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount(); 686 moveSubmittedCount = moveProcMetrics.getSubmittedCounter().getCount(); 687 moveFailedCount = moveProcMetrics.getFailedCounter().getCount(); 688 reopenSubmittedCount = reopenProcMetrics.getSubmittedCounter().getCount(); 689 reopenFailedCount = reopenProcMetrics.getFailedCounter().getCount(); 690 openSubmittedCount = openProcMetrics.getSubmittedCounter().getCount(); 691 openFailedCount = openProcMetrics.getFailedCounter().getCount(); 692 closeSubmittedCount = closeProcMetrics.getSubmittedCounter().getCount(); 693 closeFailedCount = closeProcMetrics.getFailedCounter().getCount(); 694 } 695}