/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.assignment;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;

/**
 * Base class for AssignmentManager tests. Sets up a mocked master and a mock RS procedure
 * dispatcher, and provides a family of MockRSExecutor implementations that simulate
 * well-behaved, slow, and failing region servers.
 */
public class TestAssignmentManagerBase {
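  // How a subclass typically drives this harness (a sketch; "test-table" is just an
  // illustrative name): point the dispatcher at a mock executor, create and submit an
  // assign procedure, then wait for it to complete.
  //   rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
  //   RegionInfo hri = createRegionInfo(TableName.valueOf("test-table"), 1);
  //   waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));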
  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class);

  @Rule
  public TestName name = new TestName();
  @Rule
  public final ExpectedException exception = ExpectedException.none();

  private static final int PROC_NTHREADS = 64;
  protected static final int NREGIONS = 1 * 1000;
  protected static final int NSERVERS = Math.max(1, NREGIONS / 100);

  protected HBaseTestingUtility UTIL;
  protected MockRSProcedureDispatcher rsDispatcher;
  protected MockMasterServices master;
  protected AssignmentManager am;
  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
      new ConcurrentSkipListMap<>();
  // Simple executor used to schedule the delayed crash/restart tasks in the mock executors.
  protected ScheduledExecutorService executor;

  protected ProcedureMetrics assignProcMetrics;
  protected ProcedureMetrics unassignProcMetrics;

  protected long assignSubmittedCount = 0;
  protected long assignFailedCount = 0;
  protected long unassignSubmittedCount = 0;
  protected long unassignFailedCount = 0;

  protected void setupConfiguration(Configuration conf) throws Exception {
    FSUtils.setRootDir(conf, UTIL.getDataTestDir());
    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually.
  }

  @Before
  public void setUp() throws Exception {
    UTIL = new HBaseTestingUtility();
    this.executor = Executors.newSingleThreadScheduledExecutor();
    setupConfiguration(UTIL.getConfiguration());
    master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers);
    rsDispatcher = new MockRSProcedureDispatcher(master);
    master.start(NSERVERS, rsDispatcher);
    am = master.getAssignmentManager();
    assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
    unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
    setUpMeta();
  }

  private void setUpMeta() throws Exception {
    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
    am.wakeMetaLoadedEvent();
  }

  @After
  public void tearDown() throws Exception {
    master.stop("tearDown");
    this.executor.shutdownNow();
  }

  protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
    try {
      return future.get(5, TimeUnit.SECONDS);
    } catch (ExecutionException e) {
      LOG.info("ExecutionException", e);
      Exception ee = (Exception) e.getCause();
      if (ee instanceof InterruptedIOException) {
        for (Procedure<?> p : this.master.getMasterProcedureExecutor().getProcedures()) {
          LOG.info(p.toStringDetails());
        }
      }
      throw ee;
    }
  }

  // ============================================================================================
  //  Helpers
  // ============================================================================================
  protected void bulkSubmit(final AssignProcedure[] procs) throws Exception {
    final Thread[] threads = new Thread[PROC_NTHREADS];
    for (int i = 0; i < threads.length; ++i) {
      final int threadId = i;
      threads[i] = new Thread() {
        @Override
        public void run() {
          TableName tableName = TableName.valueOf("table-" + threadId);
          int n = (procs.length / threads.length);
          int start = threadId * n;
          int stop = start + n;
          for (int j = start; j < stop; ++j) {
            procs[j] = createAndSubmitAssign(tableName, j);
          }
        }
      };
      threads[i].start();
    }
    for (int i = 0; i < threads.length; ++i) {
      threads[i].join();
    }
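    // Each thread submitted procs.length / threads.length entries; if the division left a
    // remainder, the tail of the array is still null, so submit those synchronously here.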
    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
    }
  }

  private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) {
    RegionInfo hri = createRegionInfo(tableName, regionId);
    AssignProcedure proc = am.createAssignProcedure(hri);
    master.getMasterProcedureExecutor().submitProcedure(proc);
    return proc;
  }

  protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
    return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId))
        .setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build();
  }

  private void sendTransitionReport(final ServerName serverName,
      final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
      final RegionServerStatusProtos.RegionStateTransition.TransitionCode state)
      throws IOException {
    RegionServerStatusProtos.ReportRegionStateTransitionRequest.Builder req =
        RegionServerStatusProtos.ReportRegionStateTransitionRequest.newBuilder();
    req.setServer(ProtobufUtil.toServerName(serverName));
    req.addTransition(
        RegionServerStatusProtos.RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
            .setTransitionCode(state).setOpenSeqNum(1).build());
    am.reportRegionStateTransition(req.build());
  }

  private void doCrash(final ServerName serverName) {
    this.am.submitServerCrash(serverName, false/*No WALs here*/);
  }

  private void doRestart(final ServerName serverName) {
    try {
      this.master.restartRegionServer(serverName);
    } catch (IOException e) {
      LOG.warn("Can not restart RS with new startcode", e);
    }
  }

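  /**
   * Mock RS executor that unpacks each ExecuteProceduresRequest and routes the regions to
   * execOpenRegion/execCloseRegion, which are no-ops here; subclasses override them to
   * simulate specific region server behaviors.
   */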
  private class NoopRsExecutor implements MockRSExecutor {
    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest request) throws IOException {
      if (request.getOpenRegionCount() > 0) {
        for (AdminProtos.OpenRegionRequest req : request.getOpenRegionList()) {
          for (AdminProtos.OpenRegionRequest.RegionOpenInfo openReq : req.getOpenInfoList()) {
            execOpenRegion(server, openReq);
          }
        }
      }
      if (request.getCloseRegionCount() > 0) {
        for (AdminProtos.CloseRegionRequest req : request.getCloseRegionList()) {
          execCloseRegion(server, req.getRegion().getValue().toByteArray());
        }
      }
      return AdminProtos.ExecuteProceduresResponse.newBuilder().build();
    }

    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
        AdminProtos.OpenRegionRequest.RegionOpenInfo regionInfo) throws IOException {
      return null;
    }

    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      return null;
    }
  }

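  /**
   * Mock RS executor that behaves like a healthy region server: it reports OPENED/CLOSED
   * transitions back to the master and records which regions live on which server in
   * regionsToRegionServers.
   */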
  protected class GoodRsExecutor extends NoopRsExecutor {
    @Override
    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(ServerName server,
        AdminProtos.OpenRegionRequest.RegionOpenInfo openReq) throws IOException {
      sendTransitionReport(server, openReq.getRegion(),
          RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
      // Update our picture of the cluster in regionsToRegionServers. The map and sets are
      // concurrent, but the get-then-put below is not atomic; that is good enough for tests.
      SortedSet<byte[]> regions = regionsToRegionServers.get(server);
      if (regions == null) {
        regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
        regionsToRegionServers.put(server, regions);
      }
      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
      if (regions.contains(hri.getRegionName())) {
        throw new UnsupportedOperationException(hri.getRegionNameAsString());
      }
      regions.add(hri.getRegionName());
      return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED;
    }

    @Override
    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      RegionInfo hri = am.getRegionInfo(regionName);
      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
          RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED);
      return AdminProtos.CloseRegionResponse.newBuilder().setClosed(true).build();
    }
  }

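  /** Mock RS executor that always answers as if the region server were still starting up. */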
  protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      throw new ServerNotRunningYetException("wait on server startup");
    }
  }

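  /** Mock RS executor that fails every request with the exception supplied at construction. */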
  protected static class FaultyRsExecutor implements MockRSExecutor {
    private final IOException exception;

    public FaultyRsExecutor(final IOException exception) {
      this.exception = exception;
    }

    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      throw exception;
    }
  }

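  /**
   * Mock RS executor that throws SocketTimeoutException for the first maxSocketTimeoutRetries
   * requests, then marks the current server dead and repeats the cycle up to maxServerRetries
   * times before finally behaving like GoodRsExecutor.
   */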
  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
    private final int maxSocketTimeoutRetries;
    private final int maxServerRetries;

    private ServerName lastServer;
    private int sockTimeoutRetries;
    private int serverRetries;

    public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) {
      this.maxServerRetries = maxServerRetries;
      this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
    }

    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      // A SocketTimeoutException is a temporary problem, unless the server ends up being
      // declared dead.
      if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
        if (sockTimeoutRetries == 1) {
          assertNotEquals(lastServer, server);
        }
        lastServer = server;
        LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries);
        throw new SocketTimeoutException("simulate socket timeout");
      } else if (serverRetries++ < maxServerRetries) {
        LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries);
        master.getServerManager().moveFromOnlineToDeadServers(server);
        sockTimeoutRetries = 0;
        throw new SocketTimeoutException("simulate socket timeout");
      } else {
        return super.sendRequest(server, req);
      }
    }
  }

  /**
   * Takes an open request and then returns nothing, acting like a RS that went zombie: no
   * response comes back, so the procedure is stuck/suspended on the Master and won't wake up.
   * We then send in a crash for this server after a few seconds; the crash is supposed to
   * take care of the suspended procedures.
   */
  protected class HangThenRSCrashExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
        throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
      executor.schedule(new Runnable() {
        @Override
        public void run() {
          LOG.info("Sending in CRASH of " + server);
          doCrash(server);
        }
      }, 1, TimeUnit.SECONDS);
      return null;
    }
  }

  /**
   * Takes an open request and then returns nothing, acting like a RS that went zombie: no
   * response comes back, so the procedure is stuck/suspended on the Master and won't wake up.
   * Unlike HangThenRSCrashExecutor, which submits a ServerCrashProcedure to handle the crash,
   * this executor restarts the RS directly, simulating a RS crash while SCP is not enabled.
   */
  protected class HangThenRSRestartExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
        throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
      executor.schedule(new Runnable() {
        @Override
        public void run() {
          LOG.info("Restarting RS of " + server);
          doRestart(server);
        }
      }, 1, TimeUnit.SECONDS);
      return null;
    }
  }

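  /**
   * Mock RS executor whose close-region call fails differently on each invocation
   * (NotServingRegionException, aborted/stopped server plus a delayed crash report,
   * ServerNotRunningYetException, then a hang followed by a crash) before finally
   * closing the region like GoodRsExecutor.
   */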
  protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
    public static final int TYPES_OF_FAILURE = 6;
    private int invocations;

    @Override
    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      switch (this.invocations++) {
        case 0:
          throw new NotServingRegionException("Fake");
        case 1:
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
          throw new RegionServerAbortedException("Fake!");
        case 2:
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
          throw new RegionServerStoppedException("Fake!");
        case 3:
          throw new ServerNotRunningYetException("Fake!");
        case 4:
          LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
          return null;
        default:
          return super.execCloseRegion(server, regionName);
      }
    }
  }

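  /**
   * Mock RS executor that behaves randomly: requests may fail with startup, timeout or remote
   * exceptions, opens may report OPENED, ALREADY_OPENED, FAILED_OPENING or hang until a delayed
   * crash report, and closes succeed roughly half of the time.
   */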
  protected class RandRsExecutor extends NoopRsExecutor {
    private final Random rand = new Random();

    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      switch (rand.nextInt(5)) {
        case 0:
          throw new ServerNotRunningYetException("wait on server startup");
        case 1:
          throw new SocketTimeoutException("simulate socket timeout");
        case 2:
          throw new RemoteException("java.io.IOException", "unexpected exception");
        default:
          // fall out
      }
      return super.sendRequest(server, req);
    }

    @Override
    protected AdminProtos.OpenRegionResponse.RegionOpeningState execOpenRegion(
        final ServerName server, AdminProtos.OpenRegionRequest.RegionOpenInfo openReq)
        throws IOException {
      switch (rand.nextInt(6)) {
        case 0:
          LOG.info("Return OPENED response");
          sendTransitionReport(server, openReq.getRegion(),
              RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
          return AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED;
        case 1:
          LOG.info("Report OPENED transition, then return ALREADY_OPENED response");
          sendTransitionReport(server, openReq.getRegion(),
              RegionServerStatusProtos.RegionStateTransition.TransitionCode.OPENED);
          return AdminProtos.OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
        case 2:
          LOG.info("Report FAILED_OPEN transition, then return FAILED_OPENING response");
          sendTransitionReport(server, openReq.getRegion(),
              RegionServerStatusProtos.RegionStateTransition.TransitionCode.FAILED_OPEN);
          return AdminProtos.OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
        default:
          // fall out
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info(
          "Return null as response; means proc stuck so we send in a crash report after a few seconds...");
      executor.schedule(new Runnable() {
        @Override
        public void run() {
          LOG.info("Delayed CRASHING of " + server);
          doCrash(server);
        }
      }, 5, TimeUnit.SECONDS);
      return null;
    }

    @Override
    protected AdminProtos.CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
        throws IOException {
      AdminProtos.CloseRegionResponse.Builder resp = AdminProtos.CloseRegionResponse.newBuilder();
      boolean closed = rand.nextBoolean();
      if (closed) {
        RegionInfo hri = am.getRegionInfo(regionName);
        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri),
            RegionServerStatusProtos.RegionStateTransition.TransitionCode.CLOSED);
      }
      resp.setClosed(closed);
      return resp.build();
    }
  }

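  /**
   * Mock RS executor that rejects the first request with CallQueueTooBigException and then
   * behaves like GoodRsExecutor, logging a warning if the retry goes back to the same server.
   */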
  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {

    private boolean invoked = false;

    private ServerName lastServer;

    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      if (!invoked) {
        lastServer = server;
        invoked = true;
        throw new CallQueueTooBigException("simulate queue full");
      }
      // It would be better to select another server since this one is overloaded, but it is
      // still fine to pick the same server because it is not dead yet...
      if (lastServer.equals(server)) {
        LOG.warn("We still select the same server, which is not good.");
      }
      return super.sendRequest(server, req);
    }
  }

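  /**
   * Mock RS executor whose first request times out; subsequent requests (which should go to the
   * same server) are rejected with CallQueueTooBigException until queueFullTimes attempts have
   * been made, after which it behaves like GoodRsExecutor.
   */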
  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {

    private final int queueFullTimes;

    private int retries;

    private ServerName lastServer;

    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
      this.queueFullTimes = queueFullTimes;
    }

    @Override
    public AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException {
      retries++;
      if (retries == 1) {
        lastServer = server;
        throw new CallTimeoutException("simulate call timeout");
      }
      // should always retry on the same server
      assertEquals(lastServer, server);
      if (retries < queueFullTimes) {
        throw new CallQueueTooBigException("simulate queue full");
      }
      return super.sendRequest(server, req);
    }
  }

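  /**
   * Pluggable handler for the ExecuteProcedures RPCs that MockRSProcedureDispatcher would
   * otherwise send to a real region server.
   */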
  protected interface MockRSExecutor {
    AdminProtos.ExecuteProceduresResponse sendRequest(ServerName server,
        AdminProtos.ExecuteProceduresRequest req) throws IOException;
  }

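  /**
   * RSProcedureDispatcher that builds the usual open/close region requests but hands them to
   * the configured MockRSExecutor instead of making a real RPC to a region server.
   */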
  protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
    private MockRSExecutor mockRsExec;

    public MockRSProcedureDispatcher(final MasterServices master) {
      super(master);
    }

    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
      this.mockRsExec = mockRsExec;
    }

    @Override
    protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
      submitTask(new MockRemoteCall(serverName, remoteProcedures));
    }

    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
      public MockRemoteCall(final ServerName serverName, final Set<RemoteProcedure> operations) {
        super(serverName, operations);
      }

      @Override
      public void dispatchOpenRequests(MasterProcedureEnv env,
          List<RegionOpenOperation> operations) {
        request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
      }

      @Override
      public void dispatchCloseRequests(MasterProcedureEnv env,
          List<RegionCloseOperation> operations) {
        for (RegionCloseOperation op : operations) {
          request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
        }
      }

      @Override
      protected AdminProtos.ExecuteProceduresResponse sendRequest(final ServerName serverName,
          final AdminProtos.ExecuteProceduresRequest request) throws IOException {
        return mockRsExec.sendRequest(serverName, request);
      }
    }
  }

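  /**
   * Snapshots the current assign/unassign procedure metrics so tests can assert on the deltas
   * produced by the operations they run afterwards.
   */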
  protected void collectAssignmentManagerMetrics() {
    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
    unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
  }
}