/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.assignment;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;

/**
 * Base class for AM test.
 * <p>
 * Before every test it builds a {@link MockMasterServices} wired to a
 * {@link MockRSProcedureDispatcher}, starts it with {@link #NSERVERS} servers and assigns the
 * meta region. Remote calls never leave the JVM: the dispatcher hands each request to the
 * currently installed {@link MockRSExecutor}, and the nested executor classes below simulate
 * the different ways a region server may answer (or fail to answer).
 */
public class TestAssignmentManagerBase {
  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class);

  @Rule
  public TestName name = new TestName();
  @Rule
  public final ExpectedException exception = ExpectedException.none();

  /** Number of master procedure threads; also the number of threads used by bulkSubmit. */
  private static final int PROC_NTHREADS = 64;
  protected static final int NREGIONS = 1 * 1000;
  protected static final int NSERVERS = Math.max(1, NREGIONS / 100);

  protected HBaseTestingUtility UTIL;
  protected MockRSProcedureDispatcher rsDispatcher;
  protected MockMasterServices master;
  protected AssignmentManager am;
  /** Mock cluster state: the names of the regions each server has been asked to open. */
  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
      new ConcurrentSkipListMap<>();
  // Simple executor to run some simple tasks.
  protected ScheduledExecutorService executor;

  protected ProcedureMetrics assignProcMetrics;
  protected ProcedureMetrics unassignProcMetrics;

  // Snapshots of the metric counters, refreshed by collectAssignmentManagerMetrics().
  protected long assignSubmittedCount = 0;
  protected long assignFailedCount = 0;
  protected long unassignSubmittedCount = 0;
  protected long unassignFailedCount = 0;

  /**
   * Applies the settings used by these tests to {@code conf}: root dir under the test data
   * dir, procedure WAL without hsync and with a 10 ms sync wait, {@link #PROC_NTHREADS}
   * procedure threads, a 1000 ms RS startup wait and 100 assign attempts.
   *
   * @param conf the configuration to modify in place
   * @throws Exception if the root dir cannot be set
   */
  protected void setupConfiguration(Configuration conf) throws Exception {
    FSUtils.setRootDir(conf, UTIL.getDataTestDir());
    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually.
  }

  /**
   * Creates the testing utility, the helper executor, the mock master and its dispatcher,
   * caches the assignment manager and its procedure metrics, then assigns meta.
   */
  @Before
  public void setUp() throws Exception {
    UTIL = new HBaseTestingUtility();
    this.executor = Executors.newSingleThreadScheduledExecutor();
    setupConfiguration(UTIL.getConfiguration());
    master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers);
    rsDispatcher = new MockRSProcedureDispatcher(master);
    master.start(NSERVERS, rsDispatcher);
    am = master.getAssignmentManager();
    assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
    unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
    setUpMeta();
  }

  /** Installs a well-behaved executor, assigns the first meta region and wakes the meta event. */
  private void setUpMeta() throws Exception {
    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
    am.wakeMetaLoadedEvent();
  }

  /**
   * Stops the mock master and the helper executor.
   * <p>
   * The executor is shut down in a {@code finally} block so it is not leaked when stopping the
   * master throws, and both fields are null-checked because JUnit still runs {@code @After}
   * methods when {@link #setUp()} failed part-way through.
   */
  @After
  public void tearDown() throws Exception {
    try {
      if (master != null) {
        master.stop("tearDown");
      }
    } finally {
      if (this.executor != null) {
        this.executor.shutdownNow();
      }
    }
  }

  /**
   * Submits {@code proc} to the master procedure executor.
   *
   * @param proc the procedure to run
   * @return a future for the procedure's result
   */
  protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  /**
   * Waits up to five seconds for {@code future} and unwraps a failed execution.
   * <p>
   * When the failure cause is an {@link InterruptedIOException}, the details of every procedure
   * known to the master procedure executor are logged first to help diagnose the hang.
   *
   * @param future the future to wait on
   * @return the procedure result
   * @throws Exception the cause of the {@link ExecutionException} when it is an
   *         {@link Exception}; the {@link ExecutionException} itself when the cause is neither
   *         an {@link Exception} nor an {@link Error}; or whatever {@code Future.get} throws
   *         directly (timeout, interruption)
   */
  protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
    try {
      return future.get(5, TimeUnit.SECONDS);
    } catch (ExecutionException e) {
      LOG.info("ExecutionException", e);
      final Throwable cause = e.getCause();
      if (cause instanceof InterruptedIOException) {
        for (Procedure<?> p : this.master.getMasterProcedureExecutor().getProcedures()) {
          LOG.info(p.toStringDetails());
        }
      }
      // Rethrow by type instead of blindly casting: an Error cause (e.g. an AssertionError)
      // or a missing cause must not be masked by a ClassCastException/NullPointerException.
      if (cause instanceof Exception) {
        throw (Exception) cause;
      }
      if (cause instanceof Error) {
        throw (Error) cause;
      }
      throw e;
    }
  }

  // ============================================================================================
  //  Helpers
  // ============================================================================================
  /**
   * Fills {@code procs} with submitted assign procedures using {@link #PROC_NTHREADS} threads.
   * <p>
   * Thread {@code i} creates regions of table {@code "table-" + i} for an equally sized slice
   * of the array. Slots left over by the integer division are filled afterwards on the calling
   * thread, walking backwards from the end of the array, with regions of {@code "table-sync"}.
   *
   * @param procs output array; every slot is overwritten or filled
   * @throws Exception if the calling thread is interrupted while joining the workers
   */
  protected void bulkSubmit(final AssignProcedure[] procs) throws Exception {
    final Thread[] threads = new Thread[PROC_NTHREADS];
    // Slice size per worker; the remainder is handled after the join below.
    final int perThread = procs.length / threads.length;
    for (int i = 0; i < threads.length; ++i) {
      final int threadId = i;
      threads[i] = new Thread() {
        @Override
        public void run() {
          TableName tableName = TableName.valueOf("table-" + threadId);
          int start = threadId * perThread;
          int stop = start + perThread;
          for (int j = start; j < stop; ++j) {
            procs[j] = createAndSubmitAssign(tableName, j);
          }
        }
      };
      threads[i].start();
    }
    for (Thread thread : threads) {
      thread.join();
    }
    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
    }
  }

  /** Creates the region {@code regionId} of {@code tableName} and submits an assign for it. */
  private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) {
    RegionInfo hri = createRegionInfo(tableName, regionId);
    AssignProcedure proc = am.createAssignProcedure(hri);
    master.getMasterProcedureExecutor().submitProcedure(proc);
    return proc;
  }

  /**
   * Builds a non-split region of {@code tableName} covering the key range
   * {@code [regionId, regionId + 1)} (keys encoded with {@link Bytes#toBytes(long)}); the
   * region id of the built {@link RegionInfo} is always 0.
   *
   * @param tableName table the region belongs to
   * @param regionId value used to derive the start and end keys
   * @return the new region info
   */
  protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
    return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId))
        .setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build();
  }

  /**
   * Reports to the assignment manager, on behalf of {@code serverName}, that {@code regionInfo}
   * went through transition {@code state}. The open sequence number is always reported as 1.
   */
  private void sendTransitionReport(final ServerName serverName,
      final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
      final RegionServerStatusProtos.RegionStateTransition.TransitionCode state)
      throws IOException {
    RegionServerStatusProtos.ReportRegionStateTransitionRequest.Builder req =
        RegionServerStatusProtos.ReportRegionStateTransitionRequest.newBuilder();
    req.setServer(ProtobufUtil.toServerName(serverName));
    req.addTransition(
      RegionServerStatusProtos.RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
          .setTransitionCode(state).setOpenSeqNum(1).build());
    am.reportRegionStateTransition(req.build());
  }

  /** Submits a server crash for {@code serverName} to the assignment manager. */
  private void doCrash(final ServerName serverName) {
    this.am.submitServerCrash(serverName, false/*No WALs here*/);
  }

  /**
   * Schedules {@link #doCrash(ServerName)} for {@code server} on the helper executor.
   *
   * @param server the server to crash
   * @param delaySeconds delay before the crash is submitted
   * @param logFormat SLF4J format with a single {@code {}} placeholder for the server, logged
   *        at INFO right before the crash is submitted
   */
  private void scheduleCrash(final ServerName server, final long delaySeconds,
      final String logFormat) {
    executor.schedule(() -> {
      LOG.info(logFormat, server);
      doCrash(server);
    }, delaySeconds, TimeUnit.SECONDS);
  }

  /**
   * Asks the mock master to restart {@code serverName}. A failure is logged (with its cause)
   * and otherwise ignored.
   */
  private void doRestart(final ServerName serverName) {
    try {
      this.master.restartRegionServer(serverName);
    } catch (IOException e) {
      // Best effort, but keep the cause so a failed restart can be diagnosed from the log.
      LOG.warn("Can not restart RS with new startcode", e);
    }
  }

  /**
   * Executor skeleton: walks every open and close request of the batch, passes each one to
   * {@link #execOpenRegion} / {@link #execCloseRegion} and answers with an empty response.
   * The values returned by the two hooks are not used here. Both hooks do nothing by default.
   */
  private class NoopRsExecutor implements MockRSExecutor {
    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest request) throws IOException {
      if (request.getOpenRegionCount() > 0) {
        for (AdminProtos.OpenRegionRequest req : request.getOpenRegionList()) {
          for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : req.getOpenInfoList()) {
            execOpenRegion(server, openReq);
          }
        }
      }
      if (request.getCloseRegionCount() > 0) {
        for (AdminProtos.CloseRegionRequest req : request.getCloseRegionList()) {
          execCloseRegion(server, req.getRegion().getValue().toByteArray());
        }
      }
      return AdminProtos.ExecuteProceduresResponse.newBuilder().build();
    }

    /** Hook for a single region open; returns {@code null} in this base implementation. */
    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
        AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
      return null;
    }

    /** Hook for a single region close; returns {@code null} in this base implementation. */
    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      return null;
    }
  }

  /**
   * Well-behaved region server: reports OPENED / CLOSED back to the assignment manager for
   * every request and records opened regions in {@link #regionsToRegionServers}.
   */
  protected class GoodRsExecutor extends NoopRsExecutor {
    /**
     * Reports the region as OPENED and adds its name to the set kept for {@code server}.
     *
     * @throws UnsupportedOperationException if the region is already recorded for the server
     */
    @Override
    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
        AdminProtos.OpenRegionRequest.RegionOpenInfo openReq) throws IOException {
      sendTransitionReport(server, openReq.getRegion(),
        RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
      // Now update the state of our cluster in regionsToRegionServers.
      // putIfAbsent (rather than get-then-put) so that two opens racing on the first region of
      // a server end up sharing one set instead of one of them overwriting the other's.
      SortedSet<byte[]> regions = regionsToRegionServers.get(server);
      if (regions == null) {
        SortedSet<byte[]> created = new ConcurrentSkipListSet<>(Bytes.BYTES_COMPARATOR);
        SortedSet<byte[]> existing = regionsToRegionServers.putIfAbsent(server, created);
        regions = existing != null ? existing : created;
      }
      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
      // add() doubles as the duplicate check: it returns false when the name is already there.
      if (!regions.add(hri.getRegionName())) {
        throw new UnsupportedOperationException(hri.getRegionNameAsString());
      }
      return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED;
    }

    /** Reports the region as CLOSED and returns a response with {@code closed=true}. */
    @Override
    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      RegionInfo hri = am.getRegionInfo(regionName);
      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
        RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED);
      return AdminProtos.CloseRegionResponse.newBuilder().setClosed(true).build();
    }
  }

  /** Fails every request with a {@link ServerNotRunningYetException}. */
  protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      throw new ServerNotRunningYetException("wait on server startup");
    }
  }

  /** Fails every request with the {@link IOException} instance given at construction. */
  protected static class FaultyRsExecutor implements MockRSExecutor {
    private final IOException exception;

    public FaultyRsExecutor(final IOException exception) {
      this.exception = exception;
    }

    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      throw exception;
    }
  }

  /**
   * Simulates socket timeouts. The first {@code maxSocketTimeoutRetries} requests time out;
   * the next one moves the target server to the dead servers, resets the timeout counter and
   * times out as well. That cycle can repeat {@code maxServerRetries} times, after which
   * requests are handled like {@link GoodRsExecutor}. On the first timeout of each cycle the
   * target server is asserted to differ from the previously used one.
   */
  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
    private final int maxSocketTimeoutRetries;
    private final int maxServerRetries;

    private ServerName lastServer;
    private int sockTimeoutRetries;
    private int serverRetries;

    public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) {
      this.maxServerRetries = maxServerRetries;
      this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
    }

    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      // SocketTimeoutException should be a temporary problem
      // unless the server will be declared dead.
      if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
        if (sockTimeoutRetries == 1) {
          assertNotEquals(lastServer, server);
        }
        lastServer = server;
        LOG.debug("Socket timeout for server={} retries={}", server, sockTimeoutRetries);
        throw new SocketTimeoutException("simulate socket timeout");
      } else if (serverRetries++ < maxServerRetries) {
        LOG.info("Mark server={} as dead. serverRetries={}", server, serverRetries);
        master.getServerManager().moveFromOnlineToDeadServers(server);
        sockTimeoutRetries = 0;
        throw new SocketTimeoutException("simulate socket timeout");
      } else {
        return super.sendRequest(server, req);
      }
    }
  }

  /**
   * Takes open request and then returns nothing so acts like a RS that went zombie.
   * No response (so proc is stuck/suspended on the Master and won't wake up.). We
   * then send in a crash for this server after a second; crash is supposed to
   * take care of the suspended procedures.
   */
  protected class HangThenRSCrashExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
        throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName={}; means STUCK...TODO timeout", server);
      scheduleCrash(server, 1, "Sending in CRASH of {}");
      return null;
    }
  }

  /**
   * Takes open request and then returns nothing so acts like a RS that went zombie.
   * No response (so proc is stuck/suspended on the Master and won't wake up.).
   * Unlike HangThenRSCrashExecutor, which submits a server crash to handle the dead server,
   * this HangThenRSRestartExecutor restarts the RS directly; it covers the situation where
   * the RS crashed while SCP is not enabled.
   */
  protected class HangThenRSRestartExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
        throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName={}; means STUCK...TODO timeout", server);
      executor.schedule(() -> {
        LOG.info("Restarting RS of {}", server);
        doRestart(server);
      }, 1, TimeUnit.SECONDS);
      return null;
    }
  }

  /**
   * Fails region closes in a different way on each of the first five invocations, then behaves
   * like {@link GoodRsExecutor}:
   * <ol start="0">
   * <li>{@link NotServingRegionException}</li>
   * <li>{@link RegionServerAbortedException}, server crash submitted one second later</li>
   * <li>{@link RegionServerStoppedException}, server crash submitted one second later</li>
   * <li>{@link ServerNotRunningYetException}</li>
   * <li>no answer at all ({@code null}), server crash submitted one second later</li>
   * </ol>
   */
  protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
    // NOTE(review): presumably the number of invocations needed to walk every branch of the
    // switch below (five failures plus the final success) -- confirm against the callers.
    public static final int TYPES_OF_FAILURE = 6;
    private int invocations;

    @Override
    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      switch (this.invocations++) {
        case 0:
          throw new NotServingRegionException("Fake");
        case 1:
          scheduleCrash(server, 1, "Sending in CRASH of {}");
          throw new RegionServerAbortedException("Fake!");
        case 2:
          scheduleCrash(server, 1, "Sending in CRASH of {}");
          throw new RegionServerStoppedException("Fake!");
        case 3:
          throw new ServerNotRunningYetException("Fake!");
        case 4:
          LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
          scheduleCrash(server, 1, "Sending in CRASH of {}");
          return null;
        default:
          return super.execCloseRegion(server, regionName);
      }
    }
  }

  /**
   * Region server that picks its behaviour at random on every request, open and close.
   */
  protected class RandRsExecutor extends NoopRsExecutor {
    private final Random rand = new Random();

    /**
     * Three times out of five (on average) fails the whole request with one of three
     * exceptions; otherwise processes the individual opens and closes.
     */
    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      switch (rand.nextInt(5)) {
        case 0:
          throw new ServerNotRunningYetException("wait on server startup");
        case 1:
          throw new SocketTimeoutException("simulate socket timeout");
        case 2:
          throw new RemoteException("java.io.IOException", "unexpected exception");
        default:
          // fall out
      }
      return super.sendRequest(server, req);
    }

    /**
     * Picks one of six outcomes: OPENED, ALREADY_OPENED (both reported as OPENED), FAILED_OPEN,
     * or, for the remaining three, no report at all followed by a server crash five seconds
     * later.
     */
    @Override
    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
        throws IOException {
      switch (rand.nextInt(6)) {
        case 0:
          LOG.info("Return OPENED response");
          sendTransitionReport(server, openReq.getRegion(),
            RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
          return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED;
        case 1:
          LOG.info("Return transition report that OPENED/ALREADY_OPENED response");
          sendTransitionReport(server, openReq.getRegion(),
            RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
          return AdminProtos.OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
        case 2:
          LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
          sendTransitionReport(server, openReq.getRegion(),
            RegionServerStatusProtos.RegionStateTransition.TransitionCode.FAILED_OPEN);
          return AdminProtos.OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
        default:
          // fall out
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info(
        "Return null as response; means proc stuck so we send in a crash report after a few seconds...");
      scheduleCrash(server, 5, "Delayed CRASHING of {}");
      return null;
    }

    /**
     * Randomly either closes the region (reporting CLOSED) or answers {@code closed=false}
     * without reporting anything.
     */
    @Override
    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      AdminProtos.CloseRegionResponse.Builder resp = AdminProtos.CloseRegionResponse.newBuilder();
      boolean closed = rand.nextBoolean();
      if (closed) {
        RegionInfo hri = am.getRegionInfo(regionName);
        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
          RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED);
      }
      resp.setClosed(closed);
      return resp.build();
    }
  }

  /**
   * Rejects the very first request with a {@link CallQueueTooBigException} and handles every
   * later one like {@link GoodRsExecutor}, warning when a later request targets the server
   * that was rejected first.
   */
  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {

    private boolean invoked = false;

    private ServerName lastServer;

    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      if (!invoked) {
        lastServer = server;
        invoked = true;
        throw new CallQueueTooBigException("simulate queue full");
      }
      // better select another server since the server is over loaded, but anyway, it is fine to
      // still select the same server since it is not dead yet...
      if (lastServer.equals(server)) {
        LOG.warn("We still select the same server, which is not good.");
      }
      return super.sendRequest(server, req);
    }
  }

  /**
   * The first request fails with a {@link CallTimeoutException}. Every later request must
   * target the same server (asserted) and fails with a {@link CallQueueTooBigException} while
   * the running request count is below {@code queueFullTimes}; after that requests are handled
   * like {@link GoodRsExecutor}.
   */
  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {

    private final int queueFullTimes;

    private int retries;

    private ServerName lastServer;

    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
      this.queueFullTimes = queueFullTimes;
    }

    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      retries++;
      if (retries == 1) {
        lastServer = server;
        throw new CallTimeoutException("simulate call timeout");
      }
      // should always retry on the same server
      assertEquals(lastServer, server);
      if (retries < queueFullTimes) {
        throw new CallQueueTooBigException("simulate queue full");
      }
      return super.sendRequest(server, req);
    }
  }

  /** Stand-in for a region server: receives the batched request the master would have sent. */
  protected interface MockRSExecutor {
    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException;
  }

  /**
   * Dispatcher that, instead of calling a real region server, forwards every request to the
   * {@link MockRSExecutor} installed through {@link #setMockRsExecutor(MockRSExecutor)}.
   */
  protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
    private MockRSExecutor mockRsExec;

    public MockRSProcedureDispatcher(final MasterServices master) {
      super(master);
    }

    /** Replaces the executor that answers subsequent requests. */
    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
      this.mockRsExec = mockRsExec;
    }

    @Override
    protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
      submitTask(new MockRemoteCall(serverName, remoteProcedures));
    }

    /** Remote call that builds the request as usual but sends it to the mock executor. */
    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
      public MockRemoteCall(final ServerName serverName, final Set<RemoteProcedure> operations) {
        super(serverName, operations);
      }

      @Override
      public void dispatchOpenRequests(MasterProcedureEnv env,
          List<RegionOpenOperation> operations) {
        request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
      }

      @Override
      public void dispatchCloseRequests(MasterProcedureEnv env,
          List<RegionCloseOperation> operations) {
        for (RegionCloseOperation op : operations) {
          request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
        }
      }

      @Override
      protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
          final AdminProtos.ExecuteProceduresRequest request) throws IOException {
        return mockRsExec.sendRequest(serverName, request);
      }
    }
  }

  /**
   * Copies the current submitted/failed counters of the assign and unassign procedure metrics
   * into the corresponding {@code *Count} fields.
   */
  protected void collectAssignmentManagerMetrics() {
    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
    unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
  }
}