001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertNotEquals;
023
024import java.io.IOException;
025import java.io.InterruptedIOException;
026import java.io.UncheckedIOException;
027import java.net.SocketTimeoutException;
028import java.util.Arrays;
029import java.util.NavigableMap;
030import java.util.Set;
031import java.util.SortedSet;
032import java.util.concurrent.ConcurrentSkipListMap;
033import java.util.concurrent.ConcurrentSkipListSet;
034import java.util.concurrent.ExecutionException;
035import java.util.concurrent.Executors;
036import java.util.concurrent.Future;
037import java.util.concurrent.ScheduledExecutorService;
038import java.util.concurrent.ThreadLocalRandom;
039import java.util.concurrent.TimeUnit;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.CallQueueTooBigException;
042import org.apache.hadoop.hbase.HBaseTestingUtil;
043import org.apache.hadoop.hbase.NotServingRegionException;
044import org.apache.hadoop.hbase.ServerMetricsBuilder;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.YouAreDeadException;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.ipc.CallTimeoutException;
051import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
052import org.apache.hadoop.hbase.master.MasterServices;
053import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
054import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
055import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
056import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
057import org.apache.hadoop.hbase.procedure2.Procedure;
058import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
059import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
060import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
061import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
062import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
063import org.apache.hadoop.hbase.util.Bytes;
064import org.apache.hadoop.hbase.util.CommonFSUtils;
065import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
066import org.apache.hadoop.ipc.RemoteException;
067import org.junit.After;
068import org.junit.Before;
069import org.junit.Rule;
070import org.junit.rules.TestName;
071import org.slf4j.Logger;
072import org.slf4j.LoggerFactory;
073
074import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
075
076import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
078import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
080import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
081import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
082import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
083import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
084import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
088
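/**
 * Base class for AssignmentManager tests. It starts a {@code MockMasterServices} wired to a mock
 * region server procedure dispatcher and provides helper methods plus a set of mock region server
 * executors that subclasses can plug in to simulate different region server behaviors.
 */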
092public abstract class TestAssignmentManagerBase {
093
094  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class);
095
096  @Rule
097  public TestName name = new TestName();
098
099  protected static final int PROC_NTHREADS = 64;
100  protected static final int NREGIONS = 1 * 1000;
101  protected static final int NSERVERS = Math.max(1, NREGIONS / 100);
102
103  protected HBaseTestingUtil util;
104  protected MockRSProcedureDispatcher rsDispatcher;
105  protected MockMasterServices master;
106  protected AssignmentManager am;
107  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
108    new ConcurrentSkipListMap<ServerName, SortedSet<byte[]>>();
109  // Simple executor to run some simple tasks.
110  protected ScheduledExecutorService executor;
111
112  protected ProcedureMetrics assignProcMetrics;
113  protected ProcedureMetrics unassignProcMetrics;
114  protected ProcedureMetrics moveProcMetrics;
115  protected ProcedureMetrics reopenProcMetrics;
116  protected ProcedureMetrics openProcMetrics;
117  protected ProcedureMetrics closeProcMetrics;
118
119  protected long assignSubmittedCount = 0;
120  protected long assignFailedCount = 0;
121  protected long unassignSubmittedCount = 0;
122  protected long unassignFailedCount = 0;
123  protected long moveSubmittedCount = 0;
124  protected long moveFailedCount = 0;
125  protected long reopenSubmittedCount = 0;
126  protected long reopenFailedCount = 0;
127  protected long openSubmittedCount = 0;
128  protected long openFailedCount = 0;
129  protected long closeSubmittedCount = 0;
130  protected long closeFailedCount = 0;
131
132  protected int newRsAdded;
133
134  protected int getAssignMaxAttempts() {
135    // Have many so we succeed eventually.
136    return 1000;
137  }
138
139  protected void setupConfiguration(Configuration conf) throws Exception {
140    CommonFSUtils.setRootDir(conf, util.getDataTestDir());
141    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
142    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
143    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
144    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
145    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts());
146    // make retry for TRSP more frequent
147    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10);
148    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100);
149  }
150
151  @Before
152  public void setUp() throws Exception {
153    util = new HBaseTestingUtil();
154    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
155      .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
156    setupConfiguration(util.getConfiguration());
157    master = new MockMasterServices(util.getConfiguration());
158    rsDispatcher = new MockRSProcedureDispatcher(master);
159    master.start(NSERVERS, rsDispatcher);
160    newRsAdded = 0;
161    am = master.getAssignmentManager();
162    assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
163    unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
164    moveProcMetrics = am.getAssignmentManagerMetrics().getMoveProcMetrics();
165    reopenProcMetrics = am.getAssignmentManagerMetrics().getReopenProcMetrics();
166    openProcMetrics = am.getAssignmentManagerMetrics().getOpenProcMetrics();
167    closeProcMetrics = am.getAssignmentManagerMetrics().getCloseProcMetrics();
168    setUpMeta();
169  }
170
171  protected void setUpMeta() throws Exception {
172    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
173    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
174    am.wakeMetaLoadedEvent();
175  }
176
177  @After
178  public void tearDown() throws Exception {
179    master.stop("tearDown");
180    this.executor.shutdownNow();
181  }
182
183  protected class NoopRsExecutor implements MockRSExecutor {
184    @Override
185    public ExecuteProceduresResponse sendRequest(ServerName server,
186      ExecuteProceduresRequest request) throws IOException {
187      if (request.getOpenRegionCount() > 0) {
188        for (OpenRegionRequest req : request.getOpenRegionList()) {
189          for (RegionOpenInfo openReq : req.getOpenInfoList()) {
190            execOpenRegion(server, openReq);
191          }
192        }
193      }
194      if (request.getCloseRegionCount() > 0) {
195        for (CloseRegionRequest req : request.getCloseRegionList()) {
196          execCloseRegion(server, req.getRegion().getValue().toByteArray());
197        }
198      }
199      return ExecuteProceduresResponse.newBuilder().build();
200    }
201
202    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo)
203      throws IOException {
204      return null;
205    }
206
207    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
208      throws IOException {
209      return null;
210    }
211  }
212
213  protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
214    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
215  }
216
217  protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
218    try {
219      return future.get(3, TimeUnit.MINUTES);
220    } catch (ExecutionException e) {
221      LOG.info("ExecutionException", e);
222      Exception ee = (Exception) e.getCause();
223      if (ee instanceof InterruptedIOException) {
224        for (Procedure<?> p : this.master.getMasterProcedureExecutor().getProcedures()) {
225          LOG.info(p.toStringDetails());
226        }
227      }
228      throw (Exception) e.getCause();
229    }
230  }
231
232  // ============================================================================================
233  // Helpers
234  // ============================================================================================
235  protected void bulkSubmit(TransitRegionStateProcedure[] procs) throws Exception {
236    Thread[] threads = new Thread[PROC_NTHREADS];
237    for (int i = 0; i < threads.length; ++i) {
238      final int threadId = i;
239      threads[i] = new Thread() {
240        @Override
241        public void run() {
242          TableName tableName = TableName.valueOf("table-" + threadId);
243          int n = (procs.length / threads.length);
244          int start = threadId * n;
245          int stop = start + n;
246          for (int j = start; j < stop; ++j) {
247            procs[j] = createAndSubmitAssign(tableName, j);
248          }
249        }
250      };
251      threads[i].start();
252    }
253    for (int i = 0; i < threads.length; ++i) {
254      threads[i].join();
255    }
256    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
257      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
258    }
259  }
260
261  protected TransitRegionStateProcedure createAndSubmitAssign(TableName tableName, int regionId) {
262    RegionInfo hri = createRegionInfo(tableName, regionId);
263    TransitRegionStateProcedure proc = createAssignProcedure(hri);
264    master.getMasterProcedureExecutor().submitProcedure(proc);
265    return proc;
266  }
267
268  protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
269    return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId))
270      .setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build();
271  }
272
273  protected TransitRegionStateProcedure createAssignProcedure(RegionInfo hri) {
274    return am.createAssignProcedures(Arrays.asList(hri))[0];
275  }
276
277  protected TransitRegionStateProcedure createUnassignProcedure(RegionInfo hri) {
278    RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(hri);
279    TransitRegionStateProcedure proc;
280    regionNode.lock();
281    try {
282      assertFalse(regionNode.isInTransition());
283      proc = TransitRegionStateProcedure
284        .unassign(master.getMasterProcedureExecutor().getEnvironment(), hri);
285      regionNode.setProcedure(proc);
286    } finally {
287      regionNode.unlock();
288    }
289    return proc;
290  }
291
292  protected void sendTransitionReport(final ServerName serverName,
293    final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
294    final TransitionCode state, long seqId) throws IOException {
295    ReportRegionStateTransitionRequest.Builder req =
296      ReportRegionStateTransitionRequest.newBuilder();
297    req.setServer(ProtobufUtil.toServerName(serverName));
298    req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
299      .setTransitionCode(state).setOpenSeqNum(seqId).build());
300    am.reportRegionStateTransition(req.build());
301  }
302
303  protected void doCrash(final ServerName serverName) {
304    this.master.getServerManager().moveFromOnlineToDeadServers(serverName);
305    this.am.submitServerCrash(serverName, false/* No WALs here */, false);
306    // add a new server to avoid killing all the region servers which may hang the UTs
307    ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
308    newRsAdded++;
309    try {
310      this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder
311        .newBuilder(newSn).setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build());
312    } catch (YouAreDeadException e) {
313      // should not happen
314      throw new UncheckedIOException(e);
315    }
316  }
317
318  protected void doRestart(final ServerName serverName) {
319    try {
320      this.master.restartRegionServer(serverName);
321    } catch (IOException e) {
322      LOG.warn("Can not restart RS with new startcode");
323    }
324  }
325
326  protected class GoodRsExecutor extends NoopRsExecutor {
327    @Override
328    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
329      throws IOException {
330      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
331      long previousOpenSeqNum =
332        am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
333      sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
334        previousOpenSeqNum + 2);
335      // Concurrency?
336      // Now update the state of our cluster in regionsToRegionServers.
337      SortedSet<byte[]> regions = regionsToRegionServers.get(server);
338      if (regions == null) {
339        regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
340        regionsToRegionServers.put(server, regions);
341      }
342      if (regions.contains(hri.getRegionName())) {
343        throw new UnsupportedOperationException(hri.getRegionNameAsString());
344      }
345      regions.add(hri.getRegionName());
346      return RegionOpeningState.OPENED;
347    }
348
349    @Override
350    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
351      throws IOException {
352      RegionInfo hri = am.getRegionInfo(regionName);
353      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
354      return CloseRegionResponse.newBuilder().setClosed(true).build();
355    }
356  }
357
358  protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
359    @Override
360    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
361      throws IOException {
362      throw new ServerNotRunningYetException("wait on server startup");
363    }
364  }
365
366  protected static class FaultyRsExecutor implements MockRSExecutor {
367    private final IOException exception;
368
369    public FaultyRsExecutor(final IOException exception) {
370      this.exception = exception;
371    }
372
373    @Override
374    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
375      throws IOException {
376      throw exception;
377    }
378  }
379
380  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
381    private final int timeoutTimes;
382
383    private ServerName lastServer;
384    private int retries;
385
386    public SocketTimeoutRsExecutor(int timeoutTimes) {
387      this.timeoutTimes = timeoutTimes;
388    }
389
390    @Override
391    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
392      throws IOException {
393      // SocketTimeoutException should be a temporary problem
394      // unless the server will be declared dead.
395      retries++;
396      if (retries == 1) {
397        lastServer = server;
398      }
399      if (retries <= timeoutTimes) {
400        LOG.debug("Socket timeout for server=" + server + " retries=" + retries);
401        // should not change the server if the server is not dead yet.
402        assertEquals(lastServer, server);
403        if (retries == timeoutTimes) {
404          LOG.info("Mark server=" + server + " as dead. retries=" + retries);
405          master.getServerManager().moveFromOnlineToDeadServers(server);
406          executor.schedule(new Runnable() {
407            @Override
408            public void run() {
409              LOG.info("Sending in CRASH of " + server);
410              doCrash(server);
411            }
412          }, 1, TimeUnit.SECONDS);
413        }
414        throw new SocketTimeoutException("simulate socket timeout");
415      } else {
416        // should select another server
417        assertNotEquals(lastServer, server);
418        return super.sendRequest(server, req);
419      }
420    }
421  }
422
423  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
424
425    private boolean invoked = false;
426
427    private ServerName lastServer;
428
429    @Override
430    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
431      throws IOException {
432      if (!invoked) {
433        lastServer = server;
434        invoked = true;
435        throw new CallQueueTooBigException("simulate queue full");
436      }
437      // better select another server since the server is over loaded, but anyway, it is fine to
438      // still select the same server since it is not dead yet...
439      if (lastServer.equals(server)) {
440        LOG.warn("We still select the same server, which is not good.");
441      }
442      return super.sendRequest(server, req);
443    }
444  }
445
446  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
447
448    private final int queueFullTimes;
449
450    private int retries;
451
452    private ServerName lastServer;
453
454    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
455      this.queueFullTimes = queueFullTimes;
456    }
457
458    @Override
459    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
460      throws IOException {
461      retries++;
462      if (retries == 1) {
463        lastServer = server;
464        throw new CallTimeoutException("simulate call timeout");
465      }
466      // should always retry on the same server
467      assertEquals(lastServer, server);
468      if (retries < queueFullTimes) {
469        throw new CallQueueTooBigException("simulate queue full");
470      }
471      return super.sendRequest(server, req);
472    }
473  }
474
475  /**
476   * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so
477   * proc is stuck/suspended on the Master and won't wake up.). We then send in a crash for this
478   * server after a few seconds; crash is supposed to take care of the suspended procedures.
479   */
480  protected class HangThenRSCrashExecutor extends GoodRsExecutor {
481    private int invocations;
482
483    @Override
484    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
485      throws IOException {
486      if (this.invocations++ > 0) {
487        // Return w/o problem the second time through here.
488        return super.execOpenRegion(server, openReq);
489      }
490      // The procedure on master will just hang forever because nothing comes back
491      // from the RS in this case.
492      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
493      executor.schedule(new Runnable() {
494        @Override
495        public void run() {
496          LOG.info("Sending in CRASH of " + server);
497          doCrash(server);
498        }
499      }, 1, TimeUnit.SECONDS);
500      return null;
501    }
502  }
503
504  /**
505   * Takes open request and then returns nothing so acts like a RS that went zombie. No response (so
506   * proc is stuck/suspended on the Master and won't wake up.). Different with
507   * HangThenRSCrashExecutor, HangThenRSCrashExecutor will create ServerCrashProcedure to handle the
508   * server crash. However, this HangThenRSRestartExecutor will restart RS directly, situation for
509   * RS crashed when SCP is not enabled.
510   */
511  protected class HangThenRSRestartExecutor extends GoodRsExecutor {
512    private int invocations;
513
514    @Override
515    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
516      throws IOException {
517      if (this.invocations++ > 0) {
518        // Return w/o problem the second time through here.
519        return super.execOpenRegion(server, openReq);
520      }
521      // The procedure on master will just hang forever because nothing comes back
522      // from the RS in this case.
523      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
524      executor.schedule(new Runnable() {
525        @Override
526        public void run() {
527          LOG.info("Restarting RS of " + server);
528          doRestart(server);
529        }
530      }, 1, TimeUnit.SECONDS);
531      return null;
532    }
533  }
534
535  protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
536    public static final int TYPES_OF_FAILURE = 6;
537    private int invocations;
538
539    @Override
540    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
541      throws IOException {
542      switch (this.invocations++) {
543        case 0:
544          throw new NotServingRegionException("Fake");
545        case 1:
546          executor.schedule(new Runnable() {
547            @Override
548            public void run() {
549              LOG.info("Sending in CRASH of " + server);
550              doCrash(server);
551            }
552          }, 1, TimeUnit.SECONDS);
553          throw new RegionServerAbortedException("Fake!");
554        case 2:
555          executor.schedule(new Runnable() {
556            @Override
557            public void run() {
558              LOG.info("Sending in CRASH of " + server);
559              doCrash(server);
560            }
561          }, 1, TimeUnit.SECONDS);
562          throw new RegionServerStoppedException("Fake!");
563        case 3:
564          throw new ServerNotRunningYetException("Fake!");
565        case 4:
566          LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
567          executor.schedule(new Runnable() {
568            @Override
569            public void run() {
570              LOG.info("Sending in CRASH of " + server);
571              doCrash(server);
572            }
573          }, 1, TimeUnit.SECONDS);
574          return null;
575        default:
576          return super.execCloseRegion(server, regionName);
577      }
578    }
579  }
580
581  protected class RandRsExecutor extends NoopRsExecutor {
582    @Override
583    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
584      throws IOException {
585      switch (ThreadLocalRandom.current().nextInt(5)) {
586        case 0:
587          throw new ServerNotRunningYetException("wait on server startup");
588        case 1:
589          throw new SocketTimeoutException("simulate socket timeout");
590        case 2:
591          throw new RemoteException("java.io.IOException", "unexpected exception");
592        default:
593          // fall out
594      }
595      return super.sendRequest(server, req);
596    }
597
598    @Override
599    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
600      throws IOException {
601      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
602      long previousOpenSeqNum =
603        am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
604      switch (ThreadLocalRandom.current().nextInt(3)) {
605        case 0:
606          LOG.info("Return OPENED response");
607          sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
608            previousOpenSeqNum + 2);
609          return OpenRegionResponse.RegionOpeningState.OPENED;
610        case 1:
611          LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
612          sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1);
613          return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
614        default:
615          // fall out
616      }
617      // The procedure on master will just hang forever because nothing comes back
618      // from the RS in this case.
619      LOG.info("Return null as response; means proc stuck so we send in a crash report after"
620        + " a few seconds...");
621      executor.schedule(new Runnable() {
622        @Override
623        public void run() {
624          LOG.info("Delayed CRASHING of " + server);
625          doCrash(server);
626        }
627      }, 5, TimeUnit.SECONDS);
628      return null;
629    }
630
631    @Override
632    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
633      throws IOException {
634      CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder();
635      boolean closed = ThreadLocalRandom.current().nextBoolean();
636      if (closed) {
637        RegionInfo hri = am.getRegionInfo(regionName);
638        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
639      }
640      resp.setClosed(closed);
641      return resp.build();
642    }
643  }
644
645  protected interface MockRSExecutor {
646    ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
647      throws IOException;
648  }
649
650  protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
651    private MockRSExecutor mockRsExec;
652
653    public MockRSProcedureDispatcher(final MasterServices master) {
654      super(master);
655    }
656
657    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
658      this.mockRsExec = mockRsExec;
659    }
660
661    @Override
662    protected void remoteDispatch(ServerName serverName,
663      @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
664      submitTask(new MockRemoteCall(serverName, remoteProcedures));
665    }
666
667    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
668      public MockRemoteCall(final ServerName serverName,
669        @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
670        super(serverName, operations);
671      }
672
673      @Override
674      protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
675        final ExecuteProceduresRequest request) throws IOException {
676        return mockRsExec.sendRequest(serverName, request);
677      }
678    }
679  }
680
681  protected final void collectAssignmentManagerMetrics() {
682    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
683    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
684    unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
685    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
686    moveSubmittedCount = moveProcMetrics.getSubmittedCounter().getCount();
687    moveFailedCount = moveProcMetrics.getFailedCounter().getCount();
688    reopenSubmittedCount = reopenProcMetrics.getSubmittedCounter().getCount();
689    reopenFailedCount = reopenProcMetrics.getFailedCounter().getCount();
690    openSubmittedCount = openProcMetrics.getSubmittedCounter().getCount();
691    openFailedCount = openProcMetrics.getFailedCounter().getCount();
692    closeSubmittedCount = closeProcMetrics.getSubmittedCounter().getCount();
693    closeFailedCount = closeProcMetrics.getFailedCounter().getCount();
694  }
695}