/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.assignment;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.ExpectedException;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;

/**
 * Base class for AssignmentManager tests.
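 * <p>
 * A typical subclass test is expected to look roughly like the sketch below (the table name and
 * region id are arbitrary examples; only members defined in this base class are used):
 *
 * <pre>
 * {@code
 * rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
 * RegionInfo hri = createRegionInfo(TableName.valueOf("testAssign"), 1);
 * TransitRegionStateProcedure proc = createAssignProcedure(hri);
 * waitOnFuture(submitProcedure(proc));
 * }
 * </pre>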
 */
public abstract class TestAssignmentManagerBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class);

  @Rule
  public TestName name = new TestName();
  @Rule
  public final ExpectedException exception = ExpectedException.none();

  protected static final int PROC_NTHREADS = 64;
  protected static final int NREGIONS = 1 * 1000;
  protected static final int NSERVERS = Math.max(1, NREGIONS / 100);

  protected HBaseTestingUtility util;
  protected MockRSProcedureDispatcher rsDispatcher;
  protected MockMasterServices master;
  protected AssignmentManager am;
  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
    new ConcurrentSkipListMap<ServerName, SortedSet<byte[]>>();
  // Simple executor to run some simple tasks.
  protected ScheduledExecutorService executor;

  protected ProcedureMetrics assignProcMetrics;
  protected ProcedureMetrics unassignProcMetrics;
  protected ProcedureMetrics moveProcMetrics;
  protected ProcedureMetrics reopenProcMetrics;
  protected ProcedureMetrics openProcMetrics;
  protected ProcedureMetrics closeProcMetrics;

  protected long assignSubmittedCount = 0;
  protected long assignFailedCount = 0;
  protected long unassignSubmittedCount = 0;
  protected long unassignFailedCount = 0;
  protected long moveSubmittedCount = 0;
  protected long moveFailedCount = 0;
  protected long reopenSubmittedCount = 0;
  protected long reopenFailedCount = 0;
  protected long openSubmittedCount = 0;
  protected long openFailedCount = 0;
  protected long closeSubmittedCount = 0;
  protected long closeFailedCount = 0;

  protected int newRsAdded;

  protected int getAssignMaxAttempts() {
    // Have many so we succeed eventually.
    return 1000;
  }

  protected void setupConfiguration(Configuration conf) throws Exception {
    CommonFSUtils.setRootDir(conf, util.getDataTestDir());
    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts());
    // make retry for TRSP more frequent
    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10);
    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100);
  }
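
  // Subclasses that need different timing or retry characteristics can override
  // setupConfiguration. A hypothetical override (shown only as an illustration, not part of this
  // class) might look like:
  //
  //   @Override
  //   protected void setupConfiguration(Configuration conf) throws Exception {
  //     super.setupConfiguration(conf);
  //     conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 100);
  //   }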

  @Before
  public void setUp() throws Exception {
    util = new HBaseTestingUtility();
    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
      .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
    setupConfiguration(util.getConfiguration());
    master = new MockMasterServices(util.getConfiguration(), this.regionsToRegionServers);
    rsDispatcher = new MockRSProcedureDispatcher(master);
    master.start(NSERVERS, rsDispatcher);
    newRsAdded = 0;
    am = master.getAssignmentManager();
    assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
    unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
    moveProcMetrics = am.getAssignmentManagerMetrics().getMoveProcMetrics();
    reopenProcMetrics = am.getAssignmentManagerMetrics().getReopenProcMetrics();
    openProcMetrics = am.getAssignmentManagerMetrics().getOpenProcMetrics();
    closeProcMetrics = am.getAssignmentManagerMetrics().getCloseProcMetrics();
    setUpMeta();
  }

  protected void setUpMeta() throws Exception {
    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
    am.wakeMetaLoadedEvent();
  }

  @After
  public void tearDown() throws Exception {
    master.stop("tearDown");
    this.executor.shutdownNow();
  }

  protected class NoopRsExecutor implements MockRSExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server,
      ExecuteProceduresRequest request) throws IOException {
      if (request.getOpenRegionCount() > 0) {
        for (OpenRegionRequest req : request.getOpenRegionList()) {
          for (RegionOpenInfo openReq : req.getOpenInfoList()) {
            execOpenRegion(server, openReq);
          }
        }
      }
      if (request.getCloseRegionCount() > 0) {
        for (CloseRegionRequest req : request.getCloseRegionList()) {
          execCloseRegion(server, req.getRegion().getValue().toByteArray());
        }
      }
      return ExecuteProceduresResponse.newBuilder().build();
    }

    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo)
      throws IOException {
      return null;
    }

    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      return null;
    }
  }

  protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
    try {
      return future.get(3, TimeUnit.MINUTES);
    } catch (ExecutionException e) {
      LOG.info("ExecutionException", e);
      Exception ee = (Exception) e.getCause();
      if (ee instanceof InterruptedIOException) {
        for (Procedure<?> p : this.master.getMasterProcedureExecutor().getProcedures()) {
          LOG.info(p.toStringDetails());
        }
      }
      throw (Exception) e.getCause();
    }
  }

  // ============================================================================================
  // Helpers
  // ============================================================================================
  protected void bulkSubmit(TransitRegionStateProcedure[] procs) throws Exception {
    Thread[] threads = new Thread[PROC_NTHREADS];
    for (int i = 0; i < threads.length; ++i) {
      final int threadId = i;
      threads[i] = new Thread() {
        @Override
        public void run() {
          TableName tableName = TableName.valueOf("table-" + threadId);
          int n = (procs.length / threads.length);
          int start = threadId * n;
          int stop = start + n;
          for (int j = start; j < stop; ++j) {
            procs[j] = createAndSubmitAssign(tableName, j);
          }
        }
      };
      threads[i].start();
    }
    for (int i = 0; i < threads.length; ++i) {
      threads[i].join();
    }
    // Submit any trailing regions left null above because procs.length is not evenly divisible
    // by the number of threads.
    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
    }
  }

  protected TransitRegionStateProcedure createAndSubmitAssign(TableName tableName, int regionId) {
    RegionInfo hri = createRegionInfo(tableName, regionId);
    TransitRegionStateProcedure proc = createAssignProcedure(hri);
    master.getMasterProcedureExecutor().submitProcedure(proc);
    return proc;
  }

  protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
    return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId))
      .setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build();
  }

  protected TransitRegionStateProcedure createAssignProcedure(RegionInfo hri) {
    return am.createAssignProcedures(Arrays.asList(hri))[0];
  }

  protected TransitRegionStateProcedure createUnassignProcedure(RegionInfo hri) {
    RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(hri);
    TransitRegionStateProcedure proc;
    regionNode.lock();
    try {
      assertFalse(regionNode.isInTransition());
      proc = TransitRegionStateProcedure
        .unassign(master.getMasterProcedureExecutor().getEnvironment(), hri);
      regionNode.setProcedure(proc);
    } finally {
      regionNode.unlock();
    }
    return proc;
  }

  protected void sendTransitionReport(final ServerName serverName,
    final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
    final TransitionCode state, long seqId) throws IOException {
    ReportRegionStateTransitionRequest.Builder req =
      ReportRegionStateTransitionRequest.newBuilder();
    req.setServer(ProtobufUtil.toServerName(serverName));
    req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
      .setTransitionCode(state).setOpenSeqNum(seqId).build());
    am.reportRegionStateTransition(req.build());
  }

  protected void doCrash(final ServerName serverName) {
    this.master.getServerManager().moveFromOnlineToDeadServers(serverName);
    this.am.submitServerCrash(serverName, false/* No WALs here */, false);
    // add a new server to avoid killing all the region servers which may hang the UTs
    ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
    newRsAdded++;
    try {
      this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder
        .newBuilder(newSn).setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build());
    } catch (YouAreDeadException e) {
      // should not happen
      throw new UncheckedIOException(e);
    }
  }

  protected void doRestart(final ServerName serverName) {
    try {
      this.master.restartRegionServer(serverName);
    } catch (IOException e) {
      LOG.warn("Cannot restart RS with new startcode", e);
    }
  }

  protected class GoodRsExecutor extends NoopRsExecutor {
    @Override
    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
      throws IOException {
      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
      long previousOpenSeqNum =
        am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
      sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
        previousOpenSeqNum + 2);
      // Concurrency?
      // Now update the state of our cluster in regionsToRegionServers.
      SortedSet<byte[]> regions = regionsToRegionServers.get(server);
      if (regions == null) {
        regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
        regionsToRegionServers.put(server, regions);
      }
      if (regions.contains(hri.getRegionName())) {
        throw new UnsupportedOperationException(hri.getRegionNameAsString());
      }
      regions.add(hri.getRegionName());
      return RegionOpeningState.OPENED;
    }

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      RegionInfo hri = am.getRegionInfo(regionName);
      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
      return CloseRegionResponse.newBuilder().setClosed(true).build();
    }
  }

  protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      throw new ServerNotRunningYetException("wait on server startup");
    }
  }

  protected static class FaultyRsExecutor implements MockRSExecutor {
    private final IOException exception;

    public FaultyRsExecutor(final IOException exception) {
      this.exception = exception;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      throw exception;
    }
  }

  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
    private final int timeoutTimes;

    private ServerName lastServer;
    private int retries;

    public SocketTimeoutRsExecutor(int timeoutTimes) {
      this.timeoutTimes = timeoutTimes;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      // SocketTimeoutException should be a temporary problem, unless the server is going to be
      // declared dead.
      retries++;
      if (retries == 1) {
        lastServer = server;
      }
      if (retries <= timeoutTimes) {
        LOG.debug("Socket timeout for server=" + server + " retries=" + retries);
        // should not change the server if the server is not dead yet.
        assertEquals(lastServer, server);
        if (retries == timeoutTimes) {
          LOG.info("Mark server=" + server + " as dead. retries=" + retries);
          master.getServerManager().moveFromOnlineToDeadServers(server);
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
        }
        throw new SocketTimeoutException("simulate socket timeout");
      } else {
        // should select another server
        assertNotEquals(lastServer, server);
        return super.sendRequest(server, req);
      }
    }
  }

  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {

    private boolean invoked = false;

    private ServerName lastServer;

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      if (!invoked) {
        lastServer = server;
        invoked = true;
        throw new CallQueueTooBigException("simulate queue full");
      }
      // It would be better to select another server since this one is overloaded, but it is
      // still fine to select the same server because it is not dead yet...
      if (lastServer.equals(server)) {
        LOG.warn("We still select the same server, which is not good.");
      }
      return super.sendRequest(server, req);
    }
  }

  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {

    private final int queueFullTimes;

    private int retries;

    private ServerName lastServer;

    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
      this.queueFullTimes = queueFullTimes;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      retries++;
      if (retries == 1) {
        lastServer = server;
        throw new CallTimeoutException("simulate call timeout");
      }
      // should always retry on the same server
      assertEquals(lastServer, server);
      if (retries < queueFullTimes) {
        throw new CallQueueTooBigException("simulate queue full");
      }
      return super.sendRequest(server, req);
    }
  }

  /**
   * Takes an open request and then returns nothing, so it acts like a RS that went zombie: there
   * is no response, so the procedure is stuck/suspended on the Master and will not wake up on its
   * own. We then send in a crash for this server after a few seconds; the crash handling is
   * supposed to take care of the suspended procedures.
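   * <p>
   * A rough usage sketch in a subclass test (illustrative only; the table name and region id are
   * arbitrary examples):
   *
   * <pre>
   * {@code
   * rsDispatcher.setMockRsExecutor(new HangThenRSCrashExecutor());
   * RegionInfo hri = createRegionInfo(TableName.valueOf("testCrash"), 1);
   * waitOnFuture(submitProcedure(createAssignProcedure(hri)));
   * }
   * </pre>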
   */
  protected class HangThenRSCrashExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
      throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
      executor.schedule(new Runnable() {
        @Override
        public void run() {
          LOG.info("Sending in CRASH of " + server);
          doCrash(server);
        }
      }, 1, TimeUnit.SECONDS);
      return null;
    }
  }

  /**
   * Takes an open request and then returns nothing, so it acts like a RS that went zombie: there
   * is no response, so the procedure is stuck/suspended on the Master and will not wake up on its
   * own. Unlike HangThenRSCrashExecutor, which submits a ServerCrashProcedure to handle the server
   * crash, this HangThenRSRestartExecutor restarts the RS directly, simulating the case where a RS
   * crashed while SCP is not enabled.
   */
  protected class HangThenRSRestartExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
      throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
      executor.schedule(new Runnable() {
        @Override
        public void run() {
          LOG.info("Restarting RS of " + server);
          doRestart(server);
        }
      }, 1, TimeUnit.SECONDS);
      return null;
    }
  }

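  /**
   * Fails region close in a different way on each invocation: first NotServingRegionException,
   * then RegionServerAbortedException and RegionServerStoppedException (each with a delayed crash
   * of the server), then ServerNotRunningYetException, then a hang (null response) with a delayed
   * crash, and finally a normal close. TYPES_OF_FAILURE matches the number of branches in the
   * switch below.
   */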
  protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
    public static final int TYPES_OF_FAILURE = 6;
    private int invocations;

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      switch (this.invocations++) {
        case 0:
          throw new NotServingRegionException("Fake");
        case 1:
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
          throw new RegionServerAbortedException("Fake!");
        case 2:
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
          throw new RegionServerStoppedException("Fake!");
        case 3:
          throw new ServerNotRunningYetException("Fake!");
        case 4:
          LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
          return null;
        default:
          return super.execCloseRegion(server, regionName);
      }
    }
  }

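  /**
   * Randomly misbehaves to exercise the retry paths of the assignment procedures: sendRequest may
   * throw ServerNotRunningYetException, SocketTimeoutException or a RemoteException; region opens
   * may succeed, report FAILED_OPEN, or hang (null response) with a delayed crash; region closes
   * randomly report closed or not closed.
   */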
  protected class RandRsExecutor extends NoopRsExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      switch (ThreadLocalRandom.current().nextInt(5)) {
        case 0:
          throw new ServerNotRunningYetException("wait on server startup");
        case 1:
          throw new SocketTimeoutException("simulate socket timeout");
        case 2:
          throw new RemoteException("java.io.IOException", "unexpected exception");
        default:
          // fall out
      }
      return super.sendRequest(server, req);
    }

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
      throws IOException {
      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
      long previousOpenSeqNum =
        am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
      switch (ThreadLocalRandom.current().nextInt(3)) {
        case 0:
          LOG.info("Return OPENED response");
          sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
            previousOpenSeqNum + 2);
          return OpenRegionResponse.RegionOpeningState.OPENED;
        case 1:
          LOG.info("Send transition report FAILED_OPEN and return FAILED_OPENING response");
          sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1);
          return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
        default:
          // fall out
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null as response; means proc stuck so we send in a crash report after"
        + " a few seconds...");
      executor.schedule(new Runnable() {
        @Override
        public void run() {
          LOG.info("Delayed CRASHING of " + server);
          doCrash(server);
        }
      }, 5, TimeUnit.SECONDS);
      return null;
    }

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder();
      boolean closed = ThreadLocalRandom.current().nextBoolean();
      if (closed) {
        RegionInfo hri = am.getRegionInfo(regionName);
        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
      }
      resp.setClosed(closed);
      return resp.build();
    }
  }

  protected interface MockRSExecutor {
    ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException;
  }

  protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
    private MockRSExecutor mockRsExec;

    public MockRSProcedureDispatcher(final MasterServices master) {
      super(master);
    }

    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
      this.mockRsExec = mockRsExec;
    }

    @Override
    protected void remoteDispatch(ServerName serverName,
      @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
      submitTask(new MockRemoteCall(serverName, remoteProcedures));
    }

    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
      public MockRemoteCall(final ServerName serverName,
        @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
        super(serverName, operations);
      }

      @Override
      protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
        final ExecuteProceduresRequest request) throws IOException {
        return mockRsExec.sendRequest(serverName, request);
      }
    }
  }

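  /**
   * Snapshots the current AssignmentManager procedure metrics into the *Count fields above. A
   * test will typically call this before triggering assignments and then assert on the delta,
   * roughly like the following (illustrative sketch only; hri stands for some RegionInfo under
   * test):
   *
   * <pre>
   * {@code
   * collectAssignmentManagerMetrics();
   * waitOnFuture(submitProcedure(createAssignProcedure(hri)));
   * assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
   * }
   * </pre>
   */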
  protected final void collectAssignmentManagerMetrics() {
    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
    unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
    moveSubmittedCount = moveProcMetrics.getSubmittedCounter().getCount();
    moveFailedCount = moveProcMetrics.getFailedCounter().getCount();
    reopenSubmittedCount = reopenProcMetrics.getSubmittedCounter().getCount();
    reopenFailedCount = reopenProcMetrics.getFailedCounter().getCount();
    openSubmittedCount = openProcMetrics.getSubmittedCounter().getCount();
    openFailedCount = openProcMetrics.getFailedCounter().getCount();
    closeSubmittedCount = closeProcMetrics.getSubmittedCounter().getCount();
    closeFailedCount = closeProcMetrics.getFailedCounter().getCount();
  }
}