/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.assignment;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;

/**
 * Base class for AM test.
 * <p>
 * Sets up a {@link MockMasterServices} backed by a {@link MockRSProcedureDispatcher} whose
 * behaviour is driven by a pluggable {@link MockRSExecutor}, and offers helpers for building and
 * submitting assign/unassign procedures and for simulating region server crashes and restarts.
 */
public abstract class TestAssignmentManagerBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class);

  @Rule
  public TestName name = new TestName();
  @Rule
  public final ExpectedException exception = ExpectedException.none();

  protected static final int PROC_NTHREADS = 64;
  protected static final int NREGIONS = 1 * 1000;
  protected static final int NSERVERS = Math.max(1, NREGIONS / 100);

  protected HBaseTestingUtility util;
  protected MockRSProcedureDispatcher rsDispatcher;
  protected MockMasterServices master;
  protected AssignmentManager am;
  // Mock cluster state: the region names recorded as open on each server by GoodRsExecutor.
  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
    new ConcurrentSkipListMap<>();
  // Simple executor to run some simple tasks.
  protected ScheduledExecutorService executor;

  protected ProcedureMetrics assignProcMetrics;
  protected ProcedureMetrics unassignProcMetrics;
  protected ProcedureMetrics moveProcMetrics;
  protected ProcedureMetrics reopenProcMetrics;
  protected ProcedureMetrics openProcMetrics;
  protected ProcedureMetrics closeProcMetrics;

  // Snapshots of the metric counters, refreshed by collectAssignmentManagerMetrics().
  protected long assignSubmittedCount = 0;
  protected long assignFailedCount = 0;
  protected long unassignSubmittedCount = 0;
  protected long unassignFailedCount = 0;
  protected long moveSubmittedCount = 0;
  protected long moveFailedCount = 0;
  protected long reopenSubmittedCount = 0;
  protected long reopenFailedCount = 0;
  protected long openSubmittedCount = 0;
  protected long openFailedCount = 0;
  protected long closeSubmittedCount = 0;
  protected long closeFailedCount = 0;

  // Number of replacement servers registered by doCrash(); used to pick a fresh port for each.
  protected int newRsAdded;

  /**
   * Returns the value written to {@link AssignmentManager#ASSIGN_MAX_ATTEMPTS} by
   * {@link #setupConfiguration(Configuration)}.
   */
  protected int getAssignMaxAttempts() {
    // Have many so we succeed eventually.
    return 1000;
  }

  /**
   * Populates the test configuration: root dir, procedure store sync settings, procedure thread
   * count, RS RPC startup wait, assign attempts and procedure retry intervals.
   * @param conf the configuration to modify in place
   */
  protected void setupConfiguration(Configuration conf) throws Exception {
    CommonFSUtils.setRootDir(conf, util.getDataTestDir());
    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts());
    // make retry for TRSP more frequent
    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10);
    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100);
  }

  /**
   * Creates the testing utility, the helper executor, the mock master with {@link #NSERVERS}
   * servers and the mock dispatcher, caches the procedure metrics and finally calls
   * {@link #setUpMeta()}.
   */
  @Before
  public void setUp() throws Exception {
    util = new HBaseTestingUtility();
    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
      .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
    setupConfiguration(util.getConfiguration());
    master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers);
    rsDispatcher = new MockRSProcedureDispatcher(master);
    master.start(NSERVERS, rsDispatcher);
    newRsAdded = 0;
    am = master.getAssignmentManager();
    assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
    unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
    moveProcMetrics = am.getAssignmentManagerMetrics().getMoveProcMetrics();
    reopenProcMetrics = am.getAssignmentManagerMetrics().getReopenProcMetrics();
    openProcMetrics = am.getAssignmentManagerMetrics().getOpenProcMetrics();
    closeProcMetrics = am.getAssignmentManagerMetrics().getCloseProcMetrics();
    setUpMeta();
  }

  /**
   * Installs a {@link GoodRsExecutor}, assigns the first meta region and wakes the meta-loaded
   * event.
   */
  protected void setUpMeta() throws Exception {
    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
    am.wakeMetaLoadedEvent();
  }

  /**
   * Stops the mock master and shuts the helper executor down.
   * <p>
   * The executor is shut down even when stopping the master throws, and both fields are
   * null-checked because JUnit 4 still runs {@code @After} methods when {@code @Before} failed
   * part-way through.
   */
  @After
  public void tearDown() throws Exception {
    try {
      if (master != null) {
        master.stop("tearDown");
      }
    } finally {
      if (this.executor != null) {
        this.executor.shutdownNow();
      }
    }
  }

  /**
   * Executor that walks every open and close request in an {@link ExecuteProceduresRequest} and
   * hands each to {@link #execOpenRegion} / {@link #execCloseRegion}; both hooks do nothing and
   * return {@code null} here, so subclasses override them to add behaviour.
   */
  protected class NoopRsExecutor implements MockRSExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server,
      ExecuteProceduresRequest request) throws IOException {
      if (request.getOpenRegionCount() > 0) {
        for (OpenRegionRequest req : request.getOpenRegionList()) {
          for (RegionOpenInfo openReq : req.getOpenInfoList()) {
            execOpenRegion(server, openReq);
          }
        }
      }
      if (request.getCloseRegionCount() > 0) {
        for (CloseRegionRequest req : request.getCloseRegionList()) {
          execCloseRegion(server, req.getRegion().getValue().toByteArray());
        }
      }
      // The hook return values are not propagated; the response is always empty.
      return ExecuteProceduresResponse.newBuilder().build();
    }

    /**
     * Hook invoked once per region open request.
     * @return always {@code null} in this base implementation
     */
    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo)
      throws IOException {
      return null;
    }

    /**
     * Hook invoked once per region close request.
     * @return always {@code null} in this base implementation
     */
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      return null;
    }
  }

  /**
   * Submits {@code proc} to the master procedure executor through {@link ProcedureSyncWait}.
   * @return the future returned by {@link ProcedureSyncWait}
   */
  protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  /**
   * Waits up to three minutes for {@code future} and returns its result.
   * <p>
   * When the future failed, the cause of the {@link ExecutionException} is rethrown. If that
   * cause is an {@link InterruptedIOException}, the details of every procedure known to the
   * master procedure executor are logged first.
   * @param future the future to wait on
   * @return the value produced by the future
   * @throws Exception the cause of the failure when it is an {@link Exception}; the
   *           {@link ExecutionException} itself when the cause is missing or is neither an
   *           {@link Exception} nor an {@link Error}; also whatever {@code Future.get} throws
   *           on timeout or interruption
   */
  protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
    try {
      return future.get(3, TimeUnit.MINUTES);
    } catch (ExecutionException e) {
      LOG.info("ExecutionException", e);
      Throwable cause = e.getCause();
      if (cause instanceof InterruptedIOException) {
        for (Procedure<?> p : this.master.getMasterProcedureExecutor().getProcedures()) {
          LOG.info(p.toStringDetails());
        }
      }
      if (cause instanceof Exception) {
        throw (Exception) cause;
      }
      if (cause instanceof Error) {
        // e.g. an AssertionError; casting it to Exception would fail with ClassCastException.
        throw (Error) cause;
      }
      // No cause, or a bare Throwable: surface the wrapper instead of failing on the cast.
      throw e;
    }
  }

  // ============================================================================================
  // Helpers
  // ============================================================================================
  /**
   * Fills {@code procs} with submitted assign procedures using {@link #PROC_NTHREADS} threads.
   * <p>
   * Each thread handles an equally sized contiguous slice and uses its own table name
   * ({@code table-<threadId>}). Trailing slots left empty by the integer division are filled
   * synchronously afterwards under the table name {@code table-sync}.
   * @param procs the array to populate; the array index is used as the region id
   */
  protected void bulkSubmit(TransitRegionStateProcedure[] procs) throws Exception {
    Thread[] threads = new Thread[PROC_NTHREADS];
    final int perThread = procs.length / threads.length;
    for (int i = 0; i < threads.length; ++i) {
      final int threadId = i;
      threads[i] = new Thread(() -> {
        TableName tableName = TableName.valueOf("table-" + threadId);
        int start = threadId * perThread;
        int stop = start + perThread;
        for (int j = start; j < stop; ++j) {
          procs[j] = createAndSubmitAssign(tableName, j);
        }
      });
      threads[i].start();
    }
    for (Thread thread : threads) {
      thread.join();
    }
    // Walk back from the end over the slots the worker threads did not cover.
    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
    }
  }

  /**
   * Builds a region for {@code tableName}/{@code regionId}, creates an assign procedure for it
   * and submits that procedure to the master procedure executor.
   * @return the submitted procedure
   */
  protected TransitRegionStateProcedure createAndSubmitAssign(TableName tableName, int regionId) {
    RegionInfo hri = createRegionInfo(tableName, regionId);
    TransitRegionStateProcedure proc = createAssignProcedure(hri);
    master.getMasterProcedureExecutor().submitProcedure(proc);
    return proc;
  }

  /**
   * Builds a non-split region of {@code tableName} covering the key range
   * {@code [regionId, regionId + 1)}; the region id field itself is always set to 0.
   */
  protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
    return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId))
      .setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build();
  }

  /**
   * Asks the assignment manager to create the assign procedure for the single region
   * {@code hri}.
   */
  protected TransitRegionStateProcedure createAssignProcedure(RegionInfo hri) {
    return am.createAssignProcedures(Arrays.asList(hri))[0];
  }

  /**
   * Creates an unassign procedure for {@code hri} and attaches it to the region's state node
   * while holding the node lock. Asserts that the region is not already in transition.
   */
  protected TransitRegionStateProcedure createUnassignProcedure(RegionInfo hri) {
    RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(hri);
    TransitRegionStateProcedure proc;
    regionNode.lock();
    try {
      assertFalse(regionNode.isInTransition());
      proc = TransitRegionStateProcedure
        .unassign(master.getMasterProcedureExecutor().getEnvironment(), hri);
      regionNode.setProcedure(proc);
    } finally {
      regionNode.unlock();
    }
    return proc;
  }

  /**
   * Reports a single region state transition to the assignment manager on behalf of
   * {@code serverName}.
   * @param serverName the server reporting the transition
   * @param regionInfo the region, in protobuf form
   * @param state the transition code to report
   * @param seqId the open sequence number placed in the report
   */
  protected void sendTransitionReport(final ServerName serverName,
    final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
    final TransitionCode state, long seqId) throws IOException {
    ReportRegionStateTransitionRequest.Builder req =
      ReportRegionStateTransitionRequest.newBuilder();
    req.setServer(ProtobufUtil.toServerName(serverName));
    req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
      .setTransitionCode(state).setOpenSeqNum(seqId).build());
    am.reportRegionStateTransition(req.build());
  }

  /**
   * Simulates a crash of {@code serverName}: moves it to the dead servers, submits a server
   * crash to the assignment manager and registers a brand new server in its place.
   */
  protected void doCrash(final ServerName serverName) {
    this.master.getServerManager().moveFromOnlineToDeadServers(serverName);
    this.am.submitServerCrash(serverName, false/* No WALs here */, false);
    // add a new server to avoid killing all the region servers which may hang the UTs
    ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
    newRsAdded++;
    try {
      this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder
        .newBuilder(newSn).setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build());
    } catch (YouAreDeadException e) {
      // should not happen
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Asks the mock master to restart {@code serverName}. A failure is logged, together with its
   * cause, and otherwise ignored.
   */
  protected void doRestart(final ServerName serverName) {
    try {
      this.master.restartRegionServer(serverName);
    } catch (IOException e) {
      LOG.warn("Can not restart RS with new startcode", e);
    }
  }

  /**
   * Schedules {@link #doCrash(ServerName)} for {@code server} on the helper executor.
   * @param server the server to crash
   * @param delaySeconds how long to wait before crashing it, in seconds
   */
  private void scheduleCrash(final ServerName server, final long delaySeconds) {
    executor.schedule(() -> {
      LOG.info("Sending in CRASH of {}", server);
      doCrash(server);
    }, delaySeconds, TimeUnit.SECONDS);
  }

  /**
   * Executor that behaves like a healthy region server: every open is reported as OPENED and
   * recorded in {@link #regionsToRegionServers}, every close is reported as CLOSED.
   */
  protected class GoodRsExecutor extends NoopRsExecutor {
    @Override
    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
      throws IOException {
      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
      long previousOpenSeqNum =
        am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
      sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
        previousOpenSeqNum + 2);
      // Now update the state of our cluster in regionsToRegionServers. Requests can arrive from
      // several dispatcher threads, so create the per-server set and insert the region with
      // single calls instead of check-then-act sequences.
      SortedSet<byte[]> regions = regionsToRegionServers.computeIfAbsent(server,
        k -> new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR));
      if (!regions.add(hri.getRegionName())) {
        // The region was already recorded as open on this server.
        throw new UnsupportedOperationException(hri.getRegionNameAsString());
      }
      return RegionOpeningState.OPENED;
    }

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      RegionInfo hri = am.getRegionInfo(regionName);
      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
      return CloseRegionResponse.newBuilder().setClosed(true).build();
    }
  }

  /**
   * Executor that fails every request with a {@link ServerNotRunningYetException}.
   */
  protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      throw new ServerNotRunningYetException("wait on server startup");
    }
  }

  /**
   * Executor that fails every request with the exception supplied at construction time.
   */
  protected static class FaultyRsExecutor implements MockRSExecutor {
    private final IOException exception;

    public FaultyRsExecutor(final IOException exception) {
      this.exception = exception;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      throw exception;
    }
  }

  /**
   * Executor that throws {@link SocketTimeoutException} for the first {@code timeoutTimes}
   * requests, asserting that they all target the same server. On the last of those requests the
   * server is moved to the dead servers and a crash is scheduled one second later. Subsequent
   * requests must target a different server and are handled like {@link GoodRsExecutor}.
   */
  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
    private final int timeoutTimes;

    private ServerName lastServer;
    private int retries;

    public SocketTimeoutRsExecutor(int timeoutTimes) {
      this.timeoutTimes = timeoutTimes;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      // SocketTimeoutException should be a temporary problem
      // unless the server will be declared dead.
      retries++;
      if (retries == 1) {
        lastServer = server;
      }
      if (retries <= timeoutTimes) {
        LOG.debug("Socket timeout for server={} retries={}", server, retries);
        // should not change the server if the server is not dead yet.
        assertEquals(lastServer, server);
        if (retries == timeoutTimes) {
          LOG.info("Mark server={} as dead. retries={}", server, retries);
          master.getServerManager().moveFromOnlineToDeadServers(server);
          scheduleCrash(server, 1);
        }
        throw new SocketTimeoutException("simulate socket timeout");
      } else {
        // should select another server
        assertNotEquals(lastServer, server);
        return super.sendRequest(server, req);
      }
    }
  }

  /**
   * Executor that throws {@link CallQueueTooBigException} on the first request only and then
   * behaves like {@link GoodRsExecutor}.
   */
  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {

    private boolean invoked = false;

    private ServerName lastServer;

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      if (!invoked) {
        lastServer = server;
        invoked = true;
        throw new CallQueueTooBigException("simulate queue full");
      }
      // better select another server since the server is over loaded, but anyway, it is fine to
      // still select the same server since it is not dead yet...
      if (lastServer.equals(server)) {
        LOG.warn("We still select the same server, which is not good.");
      }
      return super.sendRequest(server, req);
    }
  }

  /**
   * Executor that throws {@link CallTimeoutException} on the first request, then
   * {@link CallQueueTooBigException} while the request count is below {@code queueFullTimes},
   * and finally behaves like {@link GoodRsExecutor}. Every request after the first is asserted
   * to target the same server as the first one.
   */
  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {

    private final int queueFullTimes;

    private int retries;

    private ServerName lastServer;

    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
      this.queueFullTimes = queueFullTimes;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      retries++;
      if (retries == 1) {
        lastServer = server;
        throw new CallTimeoutException("simulate call timeout");
      }
      // should always retry on the same server
      assertEquals(lastServer, server);
      if (retries < queueFullTimes) {
        throw new CallQueueTooBigException("simulate queue full");
      }
      return super.sendRequest(server, req);
    }
  }

  /**
   * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so
   * proc is stuck/suspended on the Master and won't wake up.). We then send in a crash for this
   * server after one second; crash is supposed to take care of the suspended procedures.
   */
  protected class HangThenRSCrashExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
      throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName={}; means STUCK...TODO timeout", server);
      scheduleCrash(server, 1);
      return null;
    }
  }

  /**
   * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so
   * proc is stuck/suspended on the Master and won't wake up.). Different with
   * HangThenRSCrashExecutor, HangThenRSCrashExecutor will create ServerCrashProcedure to handle the
   * server crash. However, this HangThenRSRestartExecutor will restart RS directly, situation for
   * RS crashed when SCP is not enabled.
   */
  protected class HangThenRSRestartExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
      throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName={}; means STUCK...TODO timeout", server);
      executor.schedule(() -> {
        LOG.info("Restarting RS of {}", server);
        doRestart(server);
      }, 1, TimeUnit.SECONDS);
      return null;
    }
  }

  /**
   * Executor whose close handling fails differently on each of the first five invocations
   * (not serving, aborted + crash, stopped + crash, not running yet, hang + crash) and succeeds
   * like {@link GoodRsExecutor} from the sixth invocation on.
   */
  protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
    // NOTE(review): presumably the number of close invocations a test has to drive (five
    // failure modes plus the final success); confirm against the callers.
    public static final int TYPES_OF_FAILURE = 6;
    private int invocations;

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      switch (this.invocations++) {
        case 0:
          throw new NotServingRegionException("Fake");
        case 1:
          scheduleCrash(server, 1);
          throw new RegionServerAbortedException("Fake!");
        case 2:
          scheduleCrash(server, 1);
          throw new RegionServerStoppedException("Fake!");
        case 3:
          throw new ServerNotRunningYetException("Fake!");
        case 4:
          LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
          scheduleCrash(server, 1);
          return null;
        default:
          return super.execCloseRegion(server, regionName);
      }
    }
  }

  /**
   * Executor that picks its behaviour at random: requests may fail with one of three
   * exceptions, opens may succeed, fail or hang (followed by a crash five seconds later), and
   * closes randomly report closed or not closed.
   */
  protected class RandRsExecutor extends NoopRsExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      switch (ThreadLocalRandom.current().nextInt(5)) {
        case 0:
          throw new ServerNotRunningYetException("wait on server startup");
        case 1:
          throw new SocketTimeoutException("simulate socket timeout");
        case 2:
          throw new RemoteException("java.io.IOException", "unexpected exception");
        default:
          // fall out
      }
      return super.sendRequest(server, req);
    }

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
      throws IOException {
      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
      long previousOpenSeqNum =
        am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
      switch (ThreadLocalRandom.current().nextInt(3)) {
        case 0:
          LOG.info("Return OPENED response");
          sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
            previousOpenSeqNum + 2);
          return OpenRegionResponse.RegionOpeningState.OPENED;
        case 1:
          LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
          sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1);
          return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
        default:
          // fall out
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null as response; means proc stuck so we send in a crash report after"
        + " a few seconds...");
      executor.schedule(() -> {
        LOG.info("Delayed CRASHING of {}", server);
        doCrash(server);
      }, 5, TimeUnit.SECONDS);
      return null;
    }

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder();
      boolean closed = ThreadLocalRandom.current().nextBoolean();
      if (closed) {
        // Only a successful close is reported back to the assignment manager.
        RegionInfo hri = am.getRegionInfo(regionName);
        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
      }
      resp.setClosed(closed);
      return resp.build();
    }
  }

  /**
   * Strategy that stands in for a region server: receives the request the dispatcher would have
   * sent over RPC and either returns a response or throws.
   */
  protected interface MockRSExecutor {
    ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException;
  }

  /**
   * Dispatcher that routes every remote call to the currently installed {@link MockRSExecutor}
   * instead of a real region server.
   */
  protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
    private MockRSExecutor mockRsExec;

    public MockRSProcedureDispatcher(final MasterServices master) {
      super(master);
    }

    /**
     * Installs the executor that handles all subsequently dispatched requests.
     */
    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
      this.mockRsExec = mockRsExec;
    }

    @Override
    protected void remoteDispatch(ServerName serverName,
      @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
      submitTask(new MockRemoteCall(serverName, remoteProcedures));
    }

    /**
     * Remote call whose request is handed to the mock executor.
     */
    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
      public MockRemoteCall(final ServerName serverName,
        @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
        super(serverName, operations);
      }

      @Override
      protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
        final ExecuteProceduresRequest request) throws IOException {
        return mockRsExec.sendRequest(serverName, request);
      }
    }
  }

  /**
   * Copies the current submitted/failed counter values of all six procedure metrics into the
   * corresponding {@code *SubmittedCount} / {@code *FailedCount} fields.
   */
  protected final void collectAssignmentManagerMetrics() {
    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
    unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
    moveSubmittedCount = moveProcMetrics.getSubmittedCounter().getCount();
    moveFailedCount = moveProcMetrics.getFailedCounter().getCount();
    reopenSubmittedCount = reopenProcMetrics.getSubmittedCounter().getCount();
    reopenFailedCount = reopenProcMetrics.getFailedCounter().getCount();
    openSubmittedCount = openProcMetrics.getSubmittedCounter().getCount();
    openFailedCount = openProcMetrics.getFailedCounter().getCount();
    closeSubmittedCount = closeProcMetrics.getSubmittedCounter().getCount();
    closeFailedCount = closeProcMetrics.getFailedCounter().getCount();
  }
}