001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.assignment; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertFalse; 022import static org.junit.Assert.assertNotEquals; 023 024import java.io.IOException; 025import java.io.InterruptedIOException; 026import java.io.UncheckedIOException; 027import java.net.SocketTimeoutException; 028import java.util.Arrays; 029import java.util.NavigableMap; 030import java.util.Set; 031import java.util.SortedSet; 032import java.util.concurrent.ConcurrentSkipListMap; 033import java.util.concurrent.ConcurrentSkipListSet; 034import java.util.concurrent.ExecutionException; 035import java.util.concurrent.Executors; 036import java.util.concurrent.Future; 037import java.util.concurrent.ScheduledExecutorService; 038import java.util.concurrent.ThreadLocalRandom; 039import java.util.concurrent.TimeUnit; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.CallQueueTooBigException; 042import org.apache.hadoop.hbase.HBaseTestingUtil; 043import org.apache.hadoop.hbase.NotServingRegionException; 044import org.apache.hadoop.hbase.ServerMetricsBuilder; 045import org.apache.hadoop.hbase.ServerName; 046import org.apache.hadoop.hbase.TableName; 047import org.apache.hadoop.hbase.YouAreDeadException; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.client.RegionInfoBuilder; 050import org.apache.hadoop.hbase.ipc.CallTimeoutException; 051import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 052import org.apache.hadoop.hbase.master.MasterServices; 053import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; 054import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 055import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait; 056import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher; 057import org.apache.hadoop.hbase.procedure2.Procedure; 058import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; 059import org.apache.hadoop.hbase.procedure2.ProcedureUtil; 060import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; 061import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException; 062import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; 063import org.apache.hadoop.hbase.util.Bytes; 064import org.apache.hadoop.hbase.util.CommonFSUtils; 065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 066import org.apache.hadoop.ipc.RemoteException; 067import org.junit.After; 068import org.junit.Before; 069import org.junit.Rule; 070import org.junit.rules.TestName; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 075 076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 077import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; 078import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; 080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; 081import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; 082import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; 083import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; 084import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; 085import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; 086import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; 087import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; 088 089/** 090 * Base class for AM test. 091 */ 092public abstract class TestAssignmentManagerBase { 093 094 private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class); 095 096 @Rule 097 public TestName name = new TestName(); 098 099 protected static final int PROC_NTHREADS = 64; 100 protected static final int NREGIONS = 1 * 1000; 101 protected static final int NSERVERS = Math.max(1, NREGIONS / 100); 102 103 protected HBaseTestingUtil util; 104 protected MockRSProcedureDispatcher rsDispatcher; 105 protected MockMasterServices master; 106 protected AssignmentManager am; 107 protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers = 108 new ConcurrentSkipListMap<ServerName, SortedSet<byte[]>>(); 109 // Simple executor to run some simple tasks. 110 protected ScheduledExecutorService executor; 111 112 protected ProcedureMetrics assignProcMetrics; 113 protected ProcedureMetrics unassignProcMetrics; 114 protected ProcedureMetrics moveProcMetrics; 115 protected ProcedureMetrics reopenProcMetrics; 116 protected ProcedureMetrics openProcMetrics; 117 protected ProcedureMetrics closeProcMetrics; 118 119 protected long assignSubmittedCount = 0; 120 protected long assignFailedCount = 0; 121 protected long unassignSubmittedCount = 0; 122 protected long unassignFailedCount = 0; 123 protected long moveSubmittedCount = 0; 124 protected long moveFailedCount = 0; 125 protected long reopenSubmittedCount = 0; 126 protected long reopenFailedCount = 0; 127 protected long openSubmittedCount = 0; 128 protected long openFailedCount = 0; 129 protected long closeSubmittedCount = 0; 130 protected long closeFailedCount = 0; 131 132 protected int newRsAdded; 133 134 protected int getAssignMaxAttempts() { 135 // Have many so we succeed eventually. 136 return 1000; 137 } 138 139 protected void setupConfiguration(Configuration conf) throws Exception { 140 CommonFSUtils.setRootDir(conf, util.getDataTestDir()); 141 conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false); 142 conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10); 143 conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS); 144 conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000); 145 conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts()); 146 // make retry for TRSP more frequent 147 conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10); 148 conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100); 149 conf.setInt("hbase.master.rs.remote.proc.fail.fast.limit", Integer.MAX_VALUE); 150 } 151 152 @Before 153 public void setUp() throws Exception { 154 util = new HBaseTestingUtil(); 155 this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder() 156 .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build()); 157 setupConfiguration(util.getConfiguration()); 158 master = new MockMasterServices(util.getConfiguration()); 159 rsDispatcher = new MockRSProcedureDispatcher(master); 160 master.start(NSERVERS, rsDispatcher); 161 newRsAdded = 0; 162 am = master.getAssignmentManager(); 163 assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics(); 164 unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics(); 165 moveProcMetrics = am.getAssignmentManagerMetrics().getMoveProcMetrics(); 166 reopenProcMetrics = am.getAssignmentManagerMetrics().getReopenProcMetrics(); 167 openProcMetrics = am.getAssignmentManagerMetrics().getOpenProcMetrics(); 168 closeProcMetrics = am.getAssignmentManagerMetrics().getCloseProcMetrics(); 169 setUpMeta(); 170 } 171 172 protected void setUpMeta() throws Exception { 173 rsDispatcher.setMockRsExecutor(new GoodRsExecutor()); 174 am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO); 175 am.wakeMetaLoadedEvent(); 176 } 177 178 @After 179 public void tearDown() throws Exception { 180 master.stop("tearDown"); 181 this.executor.shutdownNow(); 182 } 183 184 protected class NoopRsExecutor implements MockRSExecutor { 185 @Override 186 public ExecuteProceduresResponse sendRequest(ServerName server, 187 ExecuteProceduresRequest request) throws IOException { 188 if (request.getOpenRegionCount() > 0) { 189 for (OpenRegionRequest req : request.getOpenRegionList()) { 190 for (RegionOpenInfo openReq : req.getOpenInfoList()) { 191 execOpenRegion(server, openReq); 192 } 193 } 194 } 195 if (request.getCloseRegionCount() > 0) { 196 for (CloseRegionRequest req : request.getCloseRegionList()) { 197 execCloseRegion(server, req.getRegion().getValue().toByteArray()); 198 } 199 } 200 return ExecuteProceduresResponse.newBuilder().build(); 201 } 202 203 protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo) 204 throws IOException { 205 return null; 206 } 207 208 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 209 throws IOException { 210 return null; 211 } 212 } 213 214 protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) { 215 return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc); 216 } 217 218 protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception { 219 try { 220 return future.get(3, TimeUnit.MINUTES); 221 } catch (ExecutionException e) { 222 LOG.info("ExecutionException", e); 223 Exception ee = (Exception) e.getCause(); 224 if (ee instanceof InterruptedIOException) { 225 for (Procedure<?> p : this.master.getMasterProcedureExecutor().getProcedures()) { 226 LOG.info(p.toStringDetails()); 227 } 228 } 229 throw (Exception) e.getCause(); 230 } 231 } 232 233 // ============================================================================================ 234 // Helpers 235 // ============================================================================================ 236 protected void bulkSubmit(TransitRegionStateProcedure[] procs) throws Exception { 237 Thread[] threads = new Thread[PROC_NTHREADS]; 238 for (int i = 0; i < threads.length; ++i) { 239 final int threadId = i; 240 threads[i] = new Thread() { 241 @Override 242 public void run() { 243 TableName tableName = TableName.valueOf("table-" + threadId); 244 int n = (procs.length / threads.length); 245 int start = threadId * n; 246 int stop = start + n; 247 for (int j = start; j < stop; ++j) { 248 procs[j] = createAndSubmitAssign(tableName, j); 249 } 250 } 251 }; 252 threads[i].start(); 253 } 254 for (int i = 0; i < threads.length; ++i) { 255 threads[i].join(); 256 } 257 for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) { 258 procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i); 259 } 260 } 261 262 protected TransitRegionStateProcedure createAndSubmitAssign(TableName tableName, int regionId) { 263 RegionInfo hri = createRegionInfo(tableName, regionId); 264 TransitRegionStateProcedure proc = createAssignProcedure(hri); 265 master.getMasterProcedureExecutor().submitProcedure(proc); 266 return proc; 267 } 268 269 protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) { 270 return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId)) 271 .setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build(); 272 } 273 274 protected TransitRegionStateProcedure createAssignProcedure(RegionInfo hri) { 275 return am.createAssignProcedures(Arrays.asList(hri))[0]; 276 } 277 278 protected TransitRegionStateProcedure createUnassignProcedure(RegionInfo hri) { 279 RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(hri); 280 TransitRegionStateProcedure proc; 281 regionNode.lock(); 282 try { 283 assertFalse(regionNode.isInTransition()); 284 proc = TransitRegionStateProcedure 285 .unassign(master.getMasterProcedureExecutor().getEnvironment(), hri); 286 regionNode.setProcedure(proc); 287 } finally { 288 regionNode.unlock(); 289 } 290 return proc; 291 } 292 293 protected void sendTransitionReport(final ServerName serverName, 294 final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo, 295 final TransitionCode state, long seqId) throws IOException { 296 ReportRegionStateTransitionRequest.Builder req = 297 ReportRegionStateTransitionRequest.newBuilder(); 298 req.setServer(ProtobufUtil.toServerName(serverName)); 299 req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo) 300 .setTransitionCode(state).setOpenSeqNum(seqId).build()); 301 am.reportRegionStateTransition(req.build()); 302 } 303 304 protected void doCrash(final ServerName serverName) { 305 this.master.getServerManager().moveFromOnlineToDeadServers(serverName); 306 this.am.submitServerCrash(serverName, false/* No WALs here */, false); 307 // add a new server to avoid killing all the region servers which may hang the UTs 308 ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1); 309 newRsAdded++; 310 try { 311 this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder 312 .newBuilder(newSn).setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build()); 313 } catch (YouAreDeadException e) { 314 // should not happen 315 throw new UncheckedIOException(e); 316 } 317 } 318 319 protected void doRestart(final ServerName serverName) { 320 try { 321 this.master.restartRegionServer(serverName); 322 } catch (IOException e) { 323 LOG.warn("Can not restart RS with new startcode"); 324 } 325 } 326 327 protected class GoodRsExecutor extends NoopRsExecutor { 328 @Override 329 protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq) 330 throws IOException { 331 RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); 332 long previousOpenSeqNum = 333 am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum(); 334 sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED, 335 previousOpenSeqNum + 2); 336 // Concurrency? 337 // Now update the state of our cluster in regionsToRegionServers. 338 SortedSet<byte[]> regions = regionsToRegionServers.get(server); 339 if (regions == null) { 340 regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR); 341 regionsToRegionServers.put(server, regions); 342 } 343 if (regions.contains(hri.getRegionName())) { 344 throw new UnsupportedOperationException(hri.getRegionNameAsString()); 345 } 346 regions.add(hri.getRegionName()); 347 return RegionOpeningState.OPENED; 348 } 349 350 @Override 351 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 352 throws IOException { 353 RegionInfo hri = am.getRegionInfo(regionName); 354 sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1); 355 return CloseRegionResponse.newBuilder().setClosed(true).build(); 356 } 357 } 358 359 protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor { 360 @Override 361 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 362 throws IOException { 363 throw new ServerNotRunningYetException("wait on server startup"); 364 } 365 } 366 367 protected static class FaultyRsExecutor implements MockRSExecutor { 368 private final IOException exception; 369 370 public FaultyRsExecutor(final IOException exception) { 371 this.exception = exception; 372 } 373 374 @Override 375 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 376 throws IOException { 377 throw exception; 378 } 379 } 380 381 protected class SocketTimeoutRsExecutor extends GoodRsExecutor { 382 private final int timeoutTimes; 383 384 private ServerName lastServer; 385 private int retries; 386 387 public SocketTimeoutRsExecutor(int timeoutTimes) { 388 this.timeoutTimes = timeoutTimes; 389 } 390 391 @Override 392 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 393 throws IOException { 394 // SocketTimeoutException should be a temporary problem 395 // unless the server will be declared dead. 396 retries++; 397 if (retries == 1) { 398 lastServer = server; 399 } 400 if (retries <= timeoutTimes) { 401 LOG.debug("Socket timeout for server=" + server + " retries=" + retries); 402 // should not change the server if the server is not dead yet. 403 assertEquals(lastServer, server); 404 if (retries == timeoutTimes) { 405 LOG.info("Mark server=" + server + " as dead. retries=" + retries); 406 master.getServerManager().moveFromOnlineToDeadServers(server); 407 executor.schedule(new Runnable() { 408 @Override 409 public void run() { 410 LOG.info("Sending in CRASH of " + server); 411 doCrash(server); 412 } 413 }, 1, TimeUnit.SECONDS); 414 } 415 throw new SocketTimeoutException("simulate socket timeout"); 416 } else { 417 // should select another server 418 assertNotEquals(lastServer, server); 419 return super.sendRequest(server, req); 420 } 421 } 422 } 423 424 protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor { 425 426 private boolean invoked = false; 427 428 private ServerName lastServer; 429 430 @Override 431 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 432 throws IOException { 433 if (!invoked) { 434 lastServer = server; 435 invoked = true; 436 throw new CallQueueTooBigException("simulate queue full"); 437 } 438 // better select another server since the server is over loaded, but anyway, it is fine to 439 // still select the same server since it is not dead yet... 440 if (lastServer.equals(server)) { 441 LOG.warn("We still select the same server, which is not good."); 442 } 443 return super.sendRequest(server, req); 444 } 445 } 446 447 protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor { 448 449 private final int queueFullTimes; 450 451 private int retries; 452 453 private ServerName lastServer; 454 455 public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) { 456 this.queueFullTimes = queueFullTimes; 457 } 458 459 @Override 460 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 461 throws IOException { 462 retries++; 463 if (retries == 1) { 464 lastServer = server; 465 throw new CallTimeoutException("simulate call timeout"); 466 } 467 // should always retry on the same server 468 assertEquals(lastServer, server); 469 if (retries < queueFullTimes) { 470 throw new CallQueueTooBigException("simulate queue full"); 471 } 472 return super.sendRequest(server, req); 473 } 474 } 475 476 /** 477 * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so 478 * proc is stuck/suspended on the Master and won't wake up.). We then send in a crash for this 479 * server after a few seconds; crash is supposed to take care of the suspended procedures. 480 */ 481 protected class HangThenRSCrashExecutor extends GoodRsExecutor { 482 private int invocations; 483 484 @Override 485 protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) 486 throws IOException { 487 if (this.invocations++ > 0) { 488 // Return w/o problem the second time through here. 489 return super.execOpenRegion(server, openReq); 490 } 491 // The procedure on master will just hang forever because nothing comes back 492 // from the RS in this case. 493 LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); 494 executor.schedule(new Runnable() { 495 @Override 496 public void run() { 497 LOG.info("Sending in CRASH of " + server); 498 doCrash(server); 499 } 500 }, 1, TimeUnit.SECONDS); 501 return null; 502 } 503 } 504 505 /** 506 * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so 507 * proc is stuck/suspended on the Master and won't wake up.). Different with 508 * HangThenRSCrashExecutor, HangThenRSCrashExecutor will create ServerCrashProcedure to handle the 509 * server crash. However, this HangThenRSRestartExecutor will restart RS directly, situation for 510 * RS crashed when SCP is not enabled. 511 */ 512 protected class HangThenRSRestartExecutor extends GoodRsExecutor { 513 private int invocations; 514 515 @Override 516 protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) 517 throws IOException { 518 if (this.invocations++ > 0) { 519 // Return w/o problem the second time through here. 520 return super.execOpenRegion(server, openReq); 521 } 522 // The procedure on master will just hang forever because nothing comes back 523 // from the RS in this case. 524 LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout"); 525 executor.schedule(new Runnable() { 526 @Override 527 public void run() { 528 LOG.info("Restarting RS of " + server); 529 doRestart(server); 530 } 531 }, 1, TimeUnit.SECONDS); 532 return null; 533 } 534 } 535 536 protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor { 537 public static final int TYPES_OF_FAILURE = 6; 538 private int invocations; 539 540 @Override 541 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 542 throws IOException { 543 switch (this.invocations++) { 544 case 0: 545 throw new NotServingRegionException("Fake"); 546 case 1: 547 executor.schedule(new Runnable() { 548 @Override 549 public void run() { 550 LOG.info("Sending in CRASH of " + server); 551 doCrash(server); 552 } 553 }, 1, TimeUnit.SECONDS); 554 throw new RegionServerAbortedException("Fake!"); 555 case 2: 556 executor.schedule(new Runnable() { 557 @Override 558 public void run() { 559 LOG.info("Sending in CRASH of " + server); 560 doCrash(server); 561 } 562 }, 1, TimeUnit.SECONDS); 563 throw new RegionServerStoppedException("Fake!"); 564 case 3: 565 throw new ServerNotRunningYetException("Fake!"); 566 case 4: 567 LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server); 568 executor.schedule(new Runnable() { 569 @Override 570 public void run() { 571 LOG.info("Sending in CRASH of " + server); 572 doCrash(server); 573 } 574 }, 1, TimeUnit.SECONDS); 575 return null; 576 default: 577 return super.execCloseRegion(server, regionName); 578 } 579 } 580 } 581 582 protected class RandRsExecutor extends NoopRsExecutor { 583 @Override 584 public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 585 throws IOException { 586 switch (ThreadLocalRandom.current().nextInt(5)) { 587 case 0: 588 throw new ServerNotRunningYetException("wait on server startup"); 589 case 1: 590 throw new SocketTimeoutException("simulate socket timeout"); 591 case 2: 592 throw new RemoteException("java.io.IOException", "unexpected exception"); 593 default: 594 // fall out 595 } 596 return super.sendRequest(server, req); 597 } 598 599 @Override 600 protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq) 601 throws IOException { 602 RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion()); 603 long previousOpenSeqNum = 604 am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum(); 605 switch (ThreadLocalRandom.current().nextInt(3)) { 606 case 0: 607 LOG.info("Return OPENED response"); 608 sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED, 609 previousOpenSeqNum + 2); 610 return OpenRegionResponse.RegionOpeningState.OPENED; 611 case 1: 612 LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response"); 613 sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1); 614 return OpenRegionResponse.RegionOpeningState.FAILED_OPENING; 615 default: 616 // fall out 617 } 618 // The procedure on master will just hang forever because nothing comes back 619 // from the RS in this case. 620 LOG.info("Return null as response; means proc stuck so we send in a crash report after" 621 + " a few seconds..."); 622 executor.schedule(new Runnable() { 623 @Override 624 public void run() { 625 LOG.info("Delayed CRASHING of " + server); 626 doCrash(server); 627 } 628 }, 5, TimeUnit.SECONDS); 629 return null; 630 } 631 632 @Override 633 protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName) 634 throws IOException { 635 CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder(); 636 boolean closed = ThreadLocalRandom.current().nextBoolean(); 637 if (closed) { 638 RegionInfo hri = am.getRegionInfo(regionName); 639 sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1); 640 } 641 resp.setClosed(closed); 642 return resp.build(); 643 } 644 } 645 646 protected interface MockRSExecutor { 647 ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req) 648 throws IOException; 649 } 650 651 protected class MockRSProcedureDispatcher extends RSProcedureDispatcher { 652 private MockRSExecutor mockRsExec; 653 654 public MockRSProcedureDispatcher(final MasterServices master) { 655 super(master); 656 } 657 658 public void setMockRsExecutor(final MockRSExecutor mockRsExec) { 659 this.mockRsExec = mockRsExec; 660 } 661 662 @Override 663 protected void remoteDispatch(ServerName serverName, 664 @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) { 665 submitTask(new MockRemoteCall(serverName, remoteProcedures)); 666 } 667 668 private class MockRemoteCall extends ExecuteProceduresRemoteCall { 669 public MockRemoteCall(final ServerName serverName, 670 @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) { 671 super(serverName, operations); 672 } 673 674 @Override 675 protected ExecuteProceduresResponse sendRequest(final ServerName serverName, 676 final ExecuteProceduresRequest request) throws IOException { 677 return mockRsExec.sendRequest(serverName, request); 678 } 679 } 680 } 681 682 protected final void collectAssignmentManagerMetrics() { 683 assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount(); 684 assignFailedCount = assignProcMetrics.getFailedCounter().getCount(); 685 unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount(); 686 unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount(); 687 moveSubmittedCount = moveProcMetrics.getSubmittedCounter().getCount(); 688 moveFailedCount = moveProcMetrics.getFailedCounter().getCount(); 689 reopenSubmittedCount = reopenProcMetrics.getSubmittedCounter().getCount(); 690 reopenFailedCount = reopenProcMetrics.getFailedCounter().getCount(); 691 openSubmittedCount = openProcMetrics.getSubmittedCounter().getCount(); 692 openFailedCount = openProcMetrics.getFailedCounter().getCount(); 693 closeSubmittedCount = closeProcMetrics.getSubmittedCounter().getCount(); 694 closeFailedCount = closeProcMetrics.getFailedCounter().getCount(); 695 } 696}