001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.io.UncheckedIOException; 027import java.net.SocketTimeoutException; 028import java.util.Arrays; 029import java.util.NavigableMap; 030import java.util.Random; 031import java.util.Set; 032import java.util.SortedSet; 033import java.util.concurrent.ConcurrentSkipListMap; 034import java.util.concurrent.ConcurrentSkipListSet; 035import java.util.concurrent.ExecutionException; 036import java.util.concurrent.Executors; 037import java.util.concurrent.Future; 038import java.util.concurrent.ScheduledExecutorService; 039import java.util.concurrent.TimeUnit; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.CallQueueTooBigException; 042import org.apache.hadoop.hbase.HBaseTestingUtility; 043import org.apache.hadoop.hbase.NotServingRegionException; 044import org.apache.hadoop.hbase.ServerMetricsBuilder; 045import 
org.apache.hadoop.hbase.ServerName; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.YouAreDeadException; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.client.RegionInfoBuilder; 050import org.apache.hadoop.hbase.ipc.CallTimeoutException; 051import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 052import org.apache.hadoop.hbase.master.MasterServices; 053import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 054import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 055import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 056import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 057import org.apache.hadoop.hbase.procedure2.Procedure; 058import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; 059import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 060import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 061import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; 062import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.apache.hadoop.hbase.util.CommonFSUtils; 065import org.apache.hadoop.ipc.RemoteException; 066import org.junit.After; 067import org.junit.Before; 068import org.junit.Rule; 069import org.junit.rules.ExpectedException; 070import org.junit.rules.TestName; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 075 076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 080import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;

/**
 * Base class for AM test.
 * <p>
 * Starts a {@link MockMasterServices} whose region servers are simulated by a
 * {@link MockRSProcedureDispatcher}. The behaviour of the simulated servers is supplied by a
 * pluggable {@link MockRSExecutor} (see {@link GoodRsExecutor}, {@link FaultyRsExecutor},
 * {@link RandRsExecutor}, ...). Also provides helpers to submit assign/unassign procedures, send
 * fake region state transition reports and simulate region server crashes/restarts.
 */
public abstract class TestAssignmentManagerBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class);

  @Rule
  public TestName name = new TestName();
  @Rule
  public final ExpectedException exception = ExpectedException.none();

  /** Master procedure executor threads; also the number of submitter threads in bulkSubmit. */
  protected static final int PROC_NTHREADS = 64;
  protected static final int NREGIONS = 1 * 1000;
  /** One mock region server per 100 regions, but never fewer than one. */
  protected static final int NSERVERS = Math.max(1, NREGIONS / 100);

  protected HBaseTestingUtility util;
  protected MockRSProcedureDispatcher rsDispatcher;
  protected MockMasterServices master;
  protected AssignmentManager am;
  /**
   * Regions recorded per mock server. Passed to {@link MockMasterServices} and updated by
   * {@link GoodRsExecutor} whenever it opens a region.
   */
  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
    new ConcurrentSkipListMap<>();
  // Simple executor to run some simple tasks.
  protected ScheduledExecutorService executor;

  protected ProcedureMetrics assignProcMetrics;
  protected ProcedureMetrics unassignProcMetrics;
  protected ProcedureMetrics moveProcMetrics;
  protected ProcedureMetrics reopenProcMetrics;
  protected ProcedureMetrics openProcMetrics;
  protected ProcedureMetrics closeProcMetrics;

  // Snapshots of the metric counters, filled in by collectAssignmentManagerMetrics().
  protected long assignSubmittedCount = 0;
  protected long assignFailedCount = 0;
  protected long unassignSubmittedCount = 0;
  protected long unassignFailedCount = 0;
  protected long moveSubmittedCount = 0;
  protected long moveFailedCount = 0;
  protected long reopenSubmittedCount = 0;
  protected long reopenFailedCount = 0;
  protected long openSubmittedCount = 0;
  protected long openFailedCount = 0;
  protected long closeSubmittedCount = 0;
  protected long closeFailedCount = 0;

  /** Number of replacement servers registered by {@link #doCrash(ServerName)} so far. */
  protected int newRsAdded;

  /**
   * Returns the value used for {@link AssignmentManager#ASSIGN_MAX_ATTEMPTS}. Subclasses may
   * override this to test assignment giving up.
   */
  protected int getAssignMaxAttempts() {
    // Have many so we succeed eventually.
    return 1000;
  }

  /**
   * Tunes the configuration for a fast, in-process test: root dir under the test data dir, no
   * hsync on the procedure WAL, short WAL sync wait, {@link #PROC_NTHREADS} procedure threads and
   * short procedure retry intervals.
   */
  protected void setupConfiguration(Configuration conf) throws Exception {
    CommonFSUtils.setRootDir(conf, util.getDataTestDir());
    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts());
    // make retry for TRSP more frequent
    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10);
    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100);
  }

  /**
   * Builds the mock master with {@link #NSERVERS} servers, caches the assignment manager and its
   * procedure metrics, then assigns hbase:meta via {@link #setUpMeta()}.
   */
  @Before
  public void setUp() throws Exception {
    util = new HBaseTestingUtility();
    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
      .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
    setupConfiguration(util.getConfiguration());
    master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers);
    rsDispatcher = new MockRSProcedureDispatcher(master);
    master.start(NSERVERS, rsDispatcher);
    newRsAdded = 0;
    am = master.getAssignmentManager();
    assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
    unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
    moveProcMetrics = am.getAssignmentManagerMetrics().getMoveProcMetrics();
    reopenProcMetrics = am.getAssignmentManagerMetrics().getReopenProcMetrics();
    openProcMetrics = am.getAssignmentManagerMetrics().getOpenProcMetrics();
    closeProcMetrics = am.getAssignmentManagerMetrics().getCloseProcMetrics();
    setUpMeta();
  }

  /** Installs a {@link GoodRsExecutor}, assigns the first meta region and wakes the meta event. */
  protected void setUpMeta() throws Exception {
    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
    am.wakeMetaLoadedEvent();
  }

  @After
  public void tearDown() throws Exception {
    master.stop("tearDown");
    this.executor.shutdownNow();
  }

  /**
   * Mock region server that hands every open and close contained in an ExecuteProcedures request
   * to {@link #execOpenRegion} / {@link #execCloseRegion} and answers with an empty response. The
   * default hooks do nothing, and their return values are ignored by {@link #sendRequest}.
   */
  protected class NoopRsExecutor implements MockRSExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server,
        ExecuteProceduresRequest request) throws IOException {
      if (request.getOpenRegionCount() > 0) {
        for (OpenRegionRequest req : request.getOpenRegionList()) {
          for (RegionOpenInfo openReq : req.getOpenInfoList()) {
            execOpenRegion(server, openReq);
          }
        }
      }
      if (request.getCloseRegionCount() > 0) {
        for (CloseRegionRequest req : request.getCloseRegionList()) {
          execCloseRegion(server, req.getRegion().getValue().toByteArray());
        }
      }
      return ExecuteProceduresResponse.newBuilder().build();
    }

    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo)
        throws IOException {
      return null;
    }

    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      return null;
    }
  }

  /** Submits {@code proc} to the master procedure executor via {@link ProcedureSyncWait}. */
  protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  /**
   * Waits up to three minutes for {@code future}. If the procedure failed, rethrows the cause
   * unwrapped. An {@link Error} cause such as a failed assertion is rethrown as-is instead of
   * being masked by a ClassCastException. When the cause is an {@link InterruptedIOException},
   * all procedures are dumped to the log first.
   */
  protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
    try {
      return future.get(3, TimeUnit.MINUTES);
    } catch (ExecutionException e) {
      LOG.info("ExecutionException", e);
      Throwable cause = e.getCause();
      if (cause instanceof InterruptedIOException) {
        for (Procedure<?> p : this.master.getMasterProcedureExecutor().getProcedures()) {
          LOG.info(p.toStringDetails());
        }
      }
      if (cause instanceof Exception) {
        throw (Exception) cause;
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      // No usable cause (null or an unusual Throwable subtype): surface the wrapper itself.
      throw e;
    }
  }

  // ============================================================================================
  // Helpers
  // ============================================================================================

  /**
   * Creates and submits an assign procedure for every slot of {@code procs}, storing each
   * procedure into its slot. {@link #PROC_NTHREADS} threads each fill a contiguous slice of
   * {@code procs.length / PROC_NTHREADS} slots, using table "table-" + threadId. Trailing slots
   * still null afterwards (the division remainder) are filled from the calling thread with table
   * "table-sync".
   */
  protected void bulkSubmit(TransitRegionStateProcedure[] procs) throws Exception {
    Thread[] threads = new Thread[PROC_NTHREADS];
    // Each thread gets an equal-sized contiguous slice; the remainder is handled below.
    int perThread = procs.length / threads.length;
    for (int i = 0; i < threads.length; ++i) {
      final int threadId = i;
      threads[i] = new Thread(() -> {
        TableName tableName = TableName.valueOf("table-" + threadId);
        int start = threadId * perThread;
        int stop = start + perThread;
        for (int j = start; j < stop; ++j) {
          procs[j] = createAndSubmitAssign(tableName, j);
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      t.join();
    }
    // Submit whatever the equal split did not cover from this thread.
    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
    }
  }

  /** Creates an assign procedure for region {@code regionId} of {@code tableName} and submits it. */
  protected TransitRegionStateProcedure createAndSubmitAssign(TableName tableName, int regionId) {
    RegionInfo hri = createRegionInfo(tableName, regionId);
    TransitRegionStateProcedure proc = createAssignProcedure(hri);
    master.getMasterProcedureExecutor().submitProcedure(proc);
    return proc;
  }

  /**
   * Builds a non-split region spanning [Bytes.toBytes(regionId), Bytes.toBytes(regionId + 1)).
   * The region id field of the RegionInfo itself is always 0.
   */
  protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
    return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId))
      .setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build();
  }

  /** Creates (but does not submit) an assign procedure for {@code hri}. */
  protected TransitRegionStateProcedure createAssignProcedure(RegionInfo hri) {
    return am.createAssignProcedures(Arrays.asList(hri))[0];
  }

  /**
   * Creates (but does not submit) an unassign procedure for {@code hri} and attaches it to the
   * region state node. This is done under the node lock, after asserting that the region is not
   * already in transition.
   */
  protected TransitRegionStateProcedure createUnassignProcedure(RegionInfo hri) {
    RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(hri);
    TransitRegionStateProcedure proc;
    regionNode.lock();
    try {
      assertFalse(regionNode.isInTransition());
      proc = TransitRegionStateProcedure
        .unassign(master.getMasterProcedureExecutor().getEnvironment(), hri);
      regionNode.setProcedure(proc);
    } finally {
      regionNode.unlock();
    }
    return proc;
  }

  /**
   * Reports a single region state transition to the assignment manager as if it came from
   * {@code serverName}.
   */
  protected void sendTransitionReport(final ServerName serverName,
      final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
      final TransitionCode state, long seqId) throws IOException {
    ReportRegionStateTransitionRequest.Builder req =
      ReportRegionStateTransitionRequest.newBuilder();
    req.setServer(ProtobufUtil.toServerName(serverName));
    req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
      .setTransitionCode(state).setOpenSeqNum(seqId).build());
    am.reportRegionStateTransition(req.build());
  }

  /**
   * Marks {@code serverName} dead, submits a server crash for it, and registers a fresh server
   * (localhost:10000+n) so the cluster never runs out of live servers.
   */
  protected void doCrash(final ServerName serverName) {
    this.master.getServerManager().moveFromOnlineToDeadServers(serverName);
    this.am.submitServerCrash(serverName, false/* No WALs here */, false);
    // add a new server to avoid killing all the region servers which may hang the UTs
    ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
    newRsAdded++;
    try {
      this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder.of(newSn));
    } catch (YouAreDeadException e) {
      // should not happen
      throw new UncheckedIOException(e);
    }
  }

  /** Restarts {@code serverName} through the mock master; failures are logged, not thrown. */
  protected void doRestart(final ServerName serverName) {
    try {
      this.master.restartRegionServer(serverName);
    } catch (IOException e) {
      LOG.warn("Can not restart RS with new startcode", e);
    }
  }

  /**
   * Schedules {@link #doCrash(ServerName)} for {@code server} on {@link #executor} after
   * {@code delaySeconds}. When the task runs, it first logs {@code logPrefix} followed by the
   * server name.
   */
  private void scheduleCrash(final ServerName server, final String logPrefix,
      final long delaySeconds) {
    executor.schedule(() -> {
      LOG.info(logPrefix + server);
      doCrash(server);
    }, delaySeconds, TimeUnit.SECONDS);
  }

  /**
   * Well-behaved region server. An open reports OPENED with seqNum = previous open seqNum + 2
   * and records the region in {@link #regionsToRegionServers}. A close reports CLOSED.
   */
  protected class GoodRsExecutor extends NoopRsExecutor {
    @Override
    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
        throws IOException {
      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
      long previousOpenSeqNum =
        am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
      sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
        previousOpenSeqNum + 2);
      // Update the state of our cluster in regionsToRegionServers. Dispatcher threads may get
      // here concurrently, so create the per-server set atomically and detect a duplicate open
      // from the result of add() rather than a separate contains() check.
      SortedSet<byte[]> regions = regionsToRegionServers.computeIfAbsent(server,
        sn -> new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR));
      if (!regions.add(hri.getRegionName())) {
        throw new UnsupportedOperationException(hri.getRegionNameAsString());
      }
      return RegionOpeningState.OPENED;
    }

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      RegionInfo hri = am.getRegionInfo(regionName);
      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
      return CloseRegionResponse.newBuilder().setClosed(true).build();
    }
  }

  /** Region server that always fails with {@link ServerNotRunningYetException}. */
  protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
        throws IOException {
      throw new ServerNotRunningYetException("wait on server startup");
    }
  }

  /** Region server that always fails with the exception given at construction. */
  protected static class FaultyRsExecutor implements MockRSExecutor {
    private final IOException exception;

    public FaultyRsExecutor(final IOException exception) {
      this.exception = exception;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
        throws IOException {
      throw exception;
    }
  }

  /**
   * Fails the first {@code timeoutTimes} requests with a {@link SocketTimeoutException}, asserting
   * they all target the same server. On the last timeout, the server is marked dead and a crash
   * is scheduled. Later requests must target a different server and are served normally.
   */
  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
    private final int timeoutTimes;

    private ServerName lastServer;
    private int retries;

    public SocketTimeoutRsExecutor(int timeoutTimes) {
      this.timeoutTimes = timeoutTimes;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
        throws IOException {
      // SocketTimeoutException should be a temporary problem
      // unless the server will be declared dead.
      retries++;
      if (retries == 1) {
        lastServer = server;
      }
      if (retries <= timeoutTimes) {
        LOG.debug("Socket timeout for server={} retries={}", server, retries);
        // should not change the server if the server is not dead yet.
        assertEquals(lastServer, server);
        if (retries == timeoutTimes) {
          LOG.info("Mark server={} as dead. retries={}", server, retries);
          master.getServerManager().moveFromOnlineToDeadServers(server);
          scheduleCrash(server, "Sending in CRASH of ", 1);
        }
        throw new SocketTimeoutException("simulate socket timeout");
      } else {
        // should select another server
        assertNotEquals(lastServer, server);
        return super.sendRequest(server, req);
      }
    }
  }

  /**
   * Fails the first request with {@link CallQueueTooBigException}, then serves normally, warning
   * if the retry went to the same server.
   */
  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {

    private boolean invoked = false;

    private ServerName lastServer;

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
        throws IOException {
      if (!invoked) {
        lastServer = server;
        invoked = true;
        throw new CallQueueTooBigException("simulate queue full");
      }
      // better select another server since the server is over loaded, but anyway, it is fine to
      // still select the same server since it is not dead yet...
      if (lastServer.equals(server)) {
        LOG.warn("We still select the same server, which is not good.");
      }
      return super.sendRequest(server, req);
    }
  }

  /**
   * Fails the first request with {@link CallTimeoutException}. Retries, up to
   * {@code queueFullTimes - 1} in total, fail with {@link CallQueueTooBigException}, after which
   * requests are served normally. Every retry must target the first server.
   */
  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {

    private final int queueFullTimes;

    private int retries;

    private ServerName lastServer;

    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
      this.queueFullTimes = queueFullTimes;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
        throws IOException {
      retries++;
      if (retries == 1) {
        lastServer = server;
        throw new CallTimeoutException("simulate call timeout");
      }
      // should always retry on the same server
      assertEquals(lastServer, server);
      if (retries < queueFullTimes) {
        throw new CallQueueTooBigException("simulate queue full");
      }
      return super.sendRequest(server, req);
    }
  }

  /**
   * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so
   * proc is stuck/suspended on the Master and won't wake up.). We then send in a crash for this
   * server after a few seconds; crash is supposed to take care of the suspended procedures.
   */
  protected class HangThenRSCrashExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
        throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName={}; means STUCK...TODO timeout", server);
      scheduleCrash(server, "Sending in CRASH of ", 1);
      return null;
    }
  }

  /**
   * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so
   * proc is stuck/suspended on the Master and won't wake up.). Unlike HangThenRSCrashExecutor,
   * which creates a ServerCrashProcedure to handle the server crash, this executor restarts the
   * RS directly; this simulates an RS crash when SCP is not enabled.
   */
  protected class HangThenRSRestartExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
        throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName={}; means STUCK...TODO timeout", server);
      executor.schedule(() -> {
        LOG.info("Restarting RS of " + server);
        doRestart(server);
      }, 1, TimeUnit.SECONDS);
      return null;
    }
  }

  /**
   * Fails successive close requests in different ways:
   * 0 - NotServingRegionException;
   * 1 - schedules a crash, then RegionServerAbortedException;
   * 2 - schedules a crash, then RegionServerStoppedException;
   * 3 - ServerNotRunningYetException;
   * 4 - schedules a crash, then returns null (stuck).
   * Later closes are served normally.
   */
  protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
    public static final int TYPES_OF_FAILURE = 6;
    private int invocations;

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      switch (this.invocations++) {
        case 0:
          throw new NotServingRegionException("Fake");
        case 1:
          scheduleCrash(server, "Sending in CRASH of ", 1);
          throw new RegionServerAbortedException("Fake!");
        case 2:
          scheduleCrash(server, "Sending in CRASH of ", 1);
          throw new RegionServerStoppedException("Fake!");
        case 3:
          throw new ServerNotRunningYetException("Fake!");
        case 4:
          LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
          scheduleCrash(server, "Sending in CRASH of ", 1);
          return null;
        default:
          return super.execCloseRegion(server, regionName);
      }
    }
  }

  /**
   * Region server with random behaviour. A request fails with one of three exceptions
   * (3 in 5 chance) or is dispatched normally. An open either succeeds, reports FAILED_OPEN, or
   * reports nothing and schedules a crash after five seconds. A close randomly reports CLOSED or
   * does nothing.
   */
  protected class RandRsExecutor extends NoopRsExecutor {
    private final Random rand = new Random();

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
        throws IOException {
      switch (rand.nextInt(5)) {
        case 0:
          throw new ServerNotRunningYetException("wait on server startup");
        case 1:
          throw new SocketTimeoutException("simulate socket timeout");
        case 2:
          throw new RemoteException("java.io.IOException", "unexpected exception");
        default:
          // fall out
      }
      return super.sendRequest(server, req);
    }

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
        throws IOException {
      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
      long previousOpenSeqNum =
        am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
      switch (rand.nextInt(3)) {
        case 0:
          LOG.info("Return OPENED response");
          sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
            previousOpenSeqNum + 2);
          return OpenRegionResponse.RegionOpeningState.OPENED;
        case 1:
          LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
          sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1);
          return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
        default:
          // fall out
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null as response; means proc stuck so we send in a crash report after" +
        " a few seconds...");
      scheduleCrash(server, "Delayed CRASHING of ", 5);
      return null;
    }

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder();
      boolean closed = rand.nextBoolean();
      if (closed) {
        RegionInfo hri = am.getRegionInfo(regionName);
        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
      }
      resp.setClosed(closed);
      return resp.build();
    }
  }

  /** Pluggable behaviour of a simulated region server receiving an ExecuteProcedures RPC. */
  protected interface MockRSExecutor {
    ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
        throws IOException;
  }

  /**
   * Dispatcher whose remote calls are answered by the currently installed {@link MockRSExecutor}
   * instead of a real RPC.
   */
  protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
    private MockRSExecutor mockRsExec;

    public MockRSProcedureDispatcher(final MasterServices master) {
      super(master);
    }

    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
      this.mockRsExec = mockRsExec;
    }

    @Override
    protected void remoteDispatch(ServerName serverName,
        @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
      submitTask(new MockRemoteCall(serverName, remoteProcedures));
    }

    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
      public MockRemoteCall(final ServerName serverName,
          @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
        super(serverName, operations);
      }

      @Override
      protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
          final ExecuteProceduresRequest request) throws IOException {
        return mockRsExec.sendRequest(serverName, request);
      }
    }
  }

  /** Snapshots the submitted/failed counters of every procedure metric into the count fields. */
  protected final void collectAssignmentManagerMetrics() {
    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
    unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
    moveSubmittedCount = moveProcMetrics.getSubmittedCounter().getCount();
    moveFailedCount = moveProcMetrics.getFailedCounter().getCount();
    reopenSubmittedCount = reopenProcMetrics.getSubmittedCounter().getCount();
    reopenFailedCount = reopenProcMetrics.getFailedCounter().getCount();
    openSubmittedCount = openProcMetrics.getSubmittedCounter().getCount();
    openFailedCount = openProcMetrics.getFailedCounter().getCount();
    closeSubmittedCount = closeProcMetrics.getSubmittedCounter().getCount();
    closeFailedCount = closeProcMetrics.getFailedCounter().getCount();
  }
}