/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.assignment;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;

/**
 * Base class for AssignmentManager tests.
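 * <p>
 * The sketch below is illustrative documentation only (not part of the original class contract);
 * the table name is hypothetical. A subclass test typically plugs a mock RS executor, then
 * creates, submits and waits on a TransitRegionStateProcedure:
 *
 * <pre>
 * rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
 * RegionInfo hri = createRegionInfo(TableName.valueOf("example-table"), 1);
 * TransitRegionStateProcedure assign = createAssignProcedure(hri);
 * waitOnFuture(submitProcedure(assign));
 * </pre>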
 */
public abstract class TestAssignmentManagerBase {

  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManagerBase.class);

  @Rule
  public TestName name = new TestName();

  protected static final int PROC_NTHREADS = 64;
  protected static final int NREGIONS = 1 * 1000;
  protected static final int NSERVERS = Math.max(1, NREGIONS / 100);

  protected HBaseTestingUtil util;
  protected MockRSProcedureDispatcher rsDispatcher;
  protected MockMasterServices master;
  protected AssignmentManager am;
  protected NavigableMap<ServerName, SortedSet<byte[]>> regionsToRegionServers =
    new ConcurrentSkipListMap<ServerName, SortedSet<byte[]>>();
  // Executor used to schedule lightweight delayed tasks, e.g. mock region server crashes/restarts.
  protected ScheduledExecutorService executor;

  protected ProcedureMetrics assignProcMetrics;
  protected ProcedureMetrics unassignProcMetrics;
  protected ProcedureMetrics moveProcMetrics;
  protected ProcedureMetrics reopenProcMetrics;
  protected ProcedureMetrics openProcMetrics;
  protected ProcedureMetrics closeProcMetrics;

  protected long assignSubmittedCount = 0;
  protected long assignFailedCount = 0;
  protected long unassignSubmittedCount = 0;
  protected long unassignFailedCount = 0;
  protected long moveSubmittedCount = 0;
  protected long moveFailedCount = 0;
  protected long reopenSubmittedCount = 0;
  protected long reopenFailedCount = 0;
  protected long openSubmittedCount = 0;
  protected long openFailedCount = 0;
  protected long closeSubmittedCount = 0;
  protected long closeFailedCount = 0;

  protected int newRsAdded;

  protected int getAssignMaxAttempts() {
    // Have many so we succeed eventually.
    return 1000;
  }
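  /*
   * Illustrative only: a subclass that wants to exercise the assign failure path could override
   * the hook above with a small value (the number 3 here is hypothetical); the value is wired into
   * AssignmentManager.ASSIGN_MAX_ATTEMPTS by setupConfiguration() below:
   *
   *   @Override
   *   protected int getAssignMaxAttempts() {
   *     return 3;
   *   }
   */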

  protected void setupConfiguration(Configuration conf) throws Exception {
    CommonFSUtils.setRootDir(conf, util.getDataTestDir());
    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, getAssignMaxAttempts());
    // make retry for TRSP more frequent
    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_SLEEP_INTERVAL_MS, 10);
    conf.setLong(ProcedureUtil.PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, 100);
    conf.setInt("hbase.master.rs.remote.proc.fail.fast.limit", Integer.MAX_VALUE);
  }

  @Before
  public void setUp() throws Exception {
    util = new HBaseTestingUtil();
    this.executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
      .setUncaughtExceptionHandler((t, e) -> LOG.warn("Uncaught: ", e)).build());
    setupConfiguration(util.getConfiguration());
    master = new MockMasterServices(util.getConfiguration());
    rsDispatcher = new MockRSProcedureDispatcher(master);
    master.start(NSERVERS, rsDispatcher);
    newRsAdded = 0;
    am = master.getAssignmentManager();
    assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
    unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
    moveProcMetrics = am.getAssignmentManagerMetrics().getMoveProcMetrics();
    reopenProcMetrics = am.getAssignmentManagerMetrics().getReopenProcMetrics();
    openProcMetrics = am.getAssignmentManagerMetrics().getOpenProcMetrics();
    closeProcMetrics = am.getAssignmentManagerMetrics().getCloseProcMetrics();
    setUpMeta();
  }

  protected void setUpMeta() throws Exception {
    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
    am.wakeMetaLoadedEvent();
  }

  @After
  public void tearDown() throws Exception {
    master.stop("tearDown");
    this.executor.shutdownNow();
  }

  protected class NoopRsExecutor implements MockRSExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server,
      ExecuteProceduresRequest request) throws IOException {
      if (request.getOpenRegionCount() > 0) {
        for (OpenRegionRequest req : request.getOpenRegionList()) {
          for (RegionOpenInfo openReq : req.getOpenInfoList()) {
            execOpenRegion(server, openReq);
          }
        }
      }
      if (request.getCloseRegionCount() > 0) {
        for (CloseRegionRequest req : request.getCloseRegionList()) {
          execCloseRegion(server, req.getRegion().getValue().toByteArray());
        }
      }
      return ExecuteProceduresResponse.newBuilder().build();
    }

    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo)
      throws IOException {
      return null;
    }

    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      return null;
    }
  }
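  /*
   * Illustrative only (not used by the existing tests): NoopRsExecutor is the extension point for
   * simulating region server behaviour; a test can override execOpenRegion/execCloseRegion, e.g.
   *
   *   rsDispatcher.setMockRsExecutor(new NoopRsExecutor() {
   *     @Override
   *     protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq) {
   *       return RegionOpeningState.FAILED_OPENING; // hypothetical behaviour
   *     }
   *   });
   */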

  protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
  }

  protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
    try {
      return future.get(3, TimeUnit.MINUTES);
    } catch (ExecutionException e) {
      LOG.info("ExecutionException", e);
      Exception ee = (Exception) e.getCause();
      if (ee instanceof InterruptedIOException) {
        for (Procedure<?> p : this.master.getMasterProcedureExecutor().getProcedures()) {
          LOG.info(p.toStringDetails());
        }
      }
      throw (Exception) e.getCause();
    }
  }

  // ============================================================================================
  //  Helpers
  // ============================================================================================
  protected void bulkSubmit(TransitRegionStateProcedure[] procs) throws Exception {
    Thread[] threads = new Thread[PROC_NTHREADS];
    for (int i = 0; i < threads.length; ++i) {
      final int threadId = i;
      threads[i] = new Thread() {
        @Override
        public void run() {
          TableName tableName = TableName.valueOf("table-" + threadId);
          int n = (procs.length / threads.length);
          int start = threadId * n;
          int stop = start + n;
          for (int j = start; j < stop; ++j) {
            procs[j] = createAndSubmitAssign(tableName, j);
          }
        }
      };
      threads[i].start();
    }
    for (int i = 0; i < threads.length; ++i) {
      threads[i].join();
    }
    // Fill and submit any remaining slots (left null when procs.length is not evenly divisible by
    // the number of threads).
    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
    }
  }
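  /*
   * Illustrative only: a bulk-assign style test would typically size the array from NREGIONS, let
   * bulkSubmit() fill and submit it, and then wait for the procedures to finish via the master
   * procedure executor (the waiting loop below is a hypothetical sketch):
   *
   *   TransitRegionStateProcedure[] procs = new TransitRegionStateProcedure[NREGIONS];
   *   bulkSubmit(procs);
   *   for (TransitRegionStateProcedure proc : procs) {
   *     while (!master.getMasterProcedureExecutor().isFinished(proc.getProcId())) {
   *       Thread.sleep(100);
   *     }
   *   }
   */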

  protected TransitRegionStateProcedure createAndSubmitAssign(TableName tableName, int regionId) {
    RegionInfo hri = createRegionInfo(tableName, regionId);
    TransitRegionStateProcedure proc = createAssignProcedure(hri);
    master.getMasterProcedureExecutor().submitProcedure(proc);
    return proc;
  }

  protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
    return RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes(regionId))
      .setEndKey(Bytes.toBytes(regionId + 1)).setSplit(false).setRegionId(0).build();
  }

  protected TransitRegionStateProcedure createAssignProcedure(RegionInfo hri) {
    return am.createAssignProcedures(Arrays.asList(hri))[0];
  }

  protected TransitRegionStateProcedure createUnassignProcedure(RegionInfo hri) {
    RegionStateNode regionNode = am.getRegionStates().getRegionStateNode(hri);
    TransitRegionStateProcedure proc;
    regionNode.lock();
    try {
      assertFalse(regionNode.isInTransition());
      proc = TransitRegionStateProcedure
        .unassign(master.getMasterProcedureExecutor().getEnvironment(), hri);
      regionNode.setProcedure(proc);
    } finally {
      regionNode.unlock();
    }
    return proc;
  }

  protected void sendTransitionReport(final ServerName serverName,
    final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
    final TransitionCode state, long seqId) throws IOException {
    ReportRegionStateTransitionRequest.Builder req =
      ReportRegionStateTransitionRequest.newBuilder();
    req.setServer(ProtobufUtil.toServerName(serverName));
    req.addTransition(RegionStateTransition.newBuilder().addRegionInfo(regionInfo)
      .setTransitionCode(state).setOpenSeqNum(seqId).build());
    am.reportRegionStateTransition(req.build());
  }

  protected void doCrash(final ServerName serverName) {
    this.master.getServerManager().moveFromOnlineToDeadServers(serverName);
    this.am.submitServerCrash(serverName, false/* No WALs here */, false);
    // Add a new server so that we do not end up killing all the region servers, which could hang
    // the UTs.
    ServerName newSn = ServerName.valueOf("localhost", 10000 + newRsAdded, 1);
    newRsAdded++;
    try {
      this.master.getServerManager().regionServerReport(newSn, ServerMetricsBuilder
        .newBuilder(newSn).setLastReportTimestamp(EnvironmentEdgeManager.currentTime()).build());
    } catch (YouAreDeadException e) {
      // should not happen
      throw new UncheckedIOException(e);
    }
  }

  protected void doRestart(final ServerName serverName) {
    try {
      this.master.restartRegionServer(serverName);
    } catch (IOException e) {
      LOG.warn("Cannot restart RS with a new startcode", e);
    }
  }

  protected class GoodRsExecutor extends NoopRsExecutor {
    @Override
    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
      throws IOException {
      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
      long previousOpenSeqNum =
        am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
      sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
        previousOpenSeqNum + 2);
      // Update our view of the cluster in regionsToRegionServers; use computeIfAbsent so that
      // concurrent open requests for the same server do not race on a get-then-put.
      SortedSet<byte[]> regions = regionsToRegionServers.computeIfAbsent(server,
        k -> new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR));
      if (regions.contains(hri.getRegionName())) {
        throw new UnsupportedOperationException(hri.getRegionNameAsString());
      }
      regions.add(hri.getRegionName());
      return RegionOpeningState.OPENED;
    }

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      RegionInfo hri = am.getRegionInfo(regionName);
      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
      return CloseRegionResponse.newBuilder().setClosed(true).build();
    }
  }

  protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      throw new ServerNotRunningYetException("wait on server startup");
    }
  }

  protected static class FaultyRsExecutor implements MockRSExecutor {
    private final IOException exception;

    public FaultyRsExecutor(final IOException exception) {
      this.exception = exception;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      throw exception;
    }
  }

  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
    private final int timeoutTimes;

    private ServerName lastServer;
    private int retries;

    public SocketTimeoutRsExecutor(int timeoutTimes) {
      this.timeoutTimes = timeoutTimes;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      // A SocketTimeoutException should be treated as a temporary problem, unless the server ends
      // up being declared dead.
      retries++;
      if (retries == 1) {
        lastServer = server;
      }
      if (retries <= timeoutTimes) {
        LOG.debug("Socket timeout for server=" + server + " retries=" + retries);
        // should not change the server if the server is not dead yet.
        assertEquals(lastServer, server);
        if (retries == timeoutTimes) {
          LOG.info("Mark server=" + server + " as dead. retries=" + retries);
          master.getServerManager().moveFromOnlineToDeadServers(server);
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
        }
        throw new SocketTimeoutException("simulate socket timeout");
      } else {
        // should select another server
        assertNotEquals(lastServer, server);
        return super.sendRequest(server, req);
      }
    }
  }
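  /*
   * Illustrative only: a test exercising SocketTimeoutRsExecutor would time out the first target
   * server a few times, let the scheduled crash run, and expect the assign to finish on a
   * different server (the retry count and table name here are hypothetical):
   *
   *   rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(3));
   *   RegionInfo hri = createRegionInfo(TableName.valueOf("example-timeout"), 1);
   *   waitOnFuture(submitProcedure(createAssignProcedure(hri)));
   */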

  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {

    private boolean invoked = false;

    private ServerName lastServer;

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      if (!invoked) {
        lastServer = server;
        invoked = true;
        throw new CallQueueTooBigException("simulate queue full");
      }
      // It would be better to select another server since this one is overloaded, but it is still
      // acceptable to pick the same server because it is not dead yet...
      if (lastServer.equals(server)) {
        LOG.warn("We still selected the same server, which is not ideal.");
      }
      return super.sendRequest(server, req);
    }
  }

  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {

    private final int queueFullTimes;

    private int retries;

    private ServerName lastServer;

    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
      this.queueFullTimes = queueFullTimes;
    }

    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      retries++;
      if (retries == 1) {
        lastServer = server;
        throw new CallTimeoutException("simulate call timeout");
      }
      // should always retry on the same server
      assertEquals(lastServer, server);
      if (retries < queueFullTimes) {
        throw new CallQueueTooBigException("simulate queue full");
      }
      return super.sendRequest(server, req);
    }
  }

  /**
   * Takes an open request and then returns nothing, so it acts like a region server that went
   * zombie: no response comes back, so the procedure is stuck/suspended on the master and will not
   * wake up on its own. We then send in a crash for this server after a few seconds; the crash is
   * expected to take care of the suspended procedures. (A usage sketch follows the class below.)
   */
  protected class HangThenRSCrashExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
      throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
      executor.schedule(new Runnable() {
        @Override
        public void run() {
          LOG.info("Sending in CRASH of " + server);
          doCrash(server);
        }
      }, 1, TimeUnit.SECONDS);
      return null;
    }
  }
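  /*
   * Illustrative only: a test using HangThenRSCrashExecutor plugs it in and drives an assign; the
   * first open hangs, the scheduled doCrash() submits a ServerCrashProcedure, and the stuck
   * TransitRegionStateProcedure is expected to be woken up and to finish (the table name here is
   * hypothetical):
   *
   *   rsDispatcher.setMockRsExecutor(new HangThenRSCrashExecutor());
   *   RegionInfo hri = createRegionInfo(TableName.valueOf("example-hang"), 1);
   *   waitOnFuture(submitProcedure(createAssignProcedure(hri)));
   */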

  /**
   * Takes an open request and then returns nothing, so it acts like a region server that went
   * zombie: no response comes back, so the procedure is stuck/suspended on the master and will not
   * wake up on its own. Unlike HangThenRSCrashExecutor, which creates a ServerCrashProcedure to
   * handle the server crash, this executor restarts the RS directly, simulating the situation
   * where an RS crashes while SCP is not enabled. (A usage sketch follows the class below.)
   */
  protected class HangThenRSRestartExecutor extends GoodRsExecutor {
    private int invocations;

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
      throws IOException {
      if (this.invocations++ > 0) {
        // Return w/o problem the second time through here.
        return super.execOpenRegion(server, openReq);
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
      executor.schedule(new Runnable() {
        @Override
        public void run() {
          LOG.info("Restarting RS of " + server);
          doRestart(server);
        }
      }, 1, TimeUnit.SECONDS);
      return null;
    }
  }
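  /*
   * Illustrative only: used like HangThenRSCrashExecutor, except that the hung server is restarted
   * with a new startcode instead of being crashed, so no ServerCrashProcedure is scheduled (the
   * table name here is hypothetical):
   *
   *   rsDispatcher.setMockRsExecutor(new HangThenRSRestartExecutor());
   *   RegionInfo hri = createRegionInfo(TableName.valueOf("example-restart"), 1);
   *   waitOnFuture(submitProcedure(createAssignProcedure(hri)));
   */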

  protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
    public static final int TYPES_OF_FAILURE = 6;
    private int invocations;

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      switch (this.invocations++) {
        case 0:
          throw new NotServingRegionException("Fake");
        case 1:
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
          throw new RegionServerAbortedException("Fake!");
        case 2:
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
          throw new RegionServerStoppedException("Fake!");
        case 3:
          throw new ServerNotRunningYetException("Fake!");
        case 4:
          LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
          executor.schedule(new Runnable() {
            @Override
            public void run() {
              LOG.info("Sending in CRASH of " + server);
              doCrash(server);
            }
          }, 1, TimeUnit.SECONDS);
          return null;
        default:
          return super.execCloseRegion(server, regionName);
      }
    }
  }
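  /*
   * Illustrative only: HangOnCloseThenRSCrashExecutor targets the close/unassign path; a test
   * would assign a region with a GoodRsExecutor first, switch to this executor, and then expect
   * the unassign to survive the simulated close failures (the hri variable is hypothetical and
   * refers to the previously assigned region):
   *
   *   rsDispatcher.setMockRsExecutor(new HangOnCloseThenRSCrashExecutor());
   *   waitOnFuture(submitProcedure(createUnassignProcedure(hri)));
   */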

  protected class RandRsExecutor extends NoopRsExecutor {
    @Override
    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException {
      switch (ThreadLocalRandom.current().nextInt(5)) {
        case 0:
          throw new ServerNotRunningYetException("wait on server startup");
        case 1:
          throw new SocketTimeoutException("simulate socket timeout");
        case 2:
          throw new RemoteException("java.io.IOException", "unexpected exception");
        default:
          // fall out
      }
      return super.sendRequest(server, req);
    }

    @Override
    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
      throws IOException {
      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
      long previousOpenSeqNum =
        am.getRegionStates().getOrCreateRegionStateNode(hri).getOpenSeqNum();
      switch (ThreadLocalRandom.current().nextInt(3)) {
        case 0:
          LOG.info("Return OPENED response");
          sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED,
            previousOpenSeqNum + 2);
          return OpenRegionResponse.RegionOpeningState.OPENED;
        case 1:
          LOG.info("Return FAILED_OPENING response and report a FAILED_OPEN transition");
          sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN, -1);
          return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
        default:
          // fall out
      }
      // The procedure on master will just hang forever because nothing comes back
      // from the RS in this case.
      LOG.info("Return null as response; means proc stuck so we send in a crash report after"
        + " a few seconds...");
      executor.schedule(new Runnable() {
        @Override
        public void run() {
          LOG.info("Delayed CRASHING of " + server);
          doCrash(server);
        }
      }, 5, TimeUnit.SECONDS);
      return null;
    }

    @Override
    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
      throws IOException {
      CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder();
      boolean closed = ThreadLocalRandom.current().nextBoolean();
      if (closed) {
        RegionInfo hri = am.getRegionInfo(regionName);
        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED, -1);
      }
      resp.setClosed(closed);
      return resp.build();
    }
  }

  protected interface MockRSExecutor {
    ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
      throws IOException;
  }

  protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
    private MockRSExecutor mockRsExec;

    public MockRSProcedureDispatcher(final MasterServices master) {
      super(master);
    }

    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
      this.mockRsExec = mockRsExec;
    }

    @Override
    protected void remoteDispatch(ServerName serverName,
      @SuppressWarnings("rawtypes") Set<RemoteProcedure> remoteProcedures) {
      submitTask(new MockRemoteCall(serverName, remoteProcedures));
    }

    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
      public MockRemoteCall(final ServerName serverName,
        @SuppressWarnings("rawtypes") final Set<RemoteProcedure> operations) {
        super(serverName, operations);
      }

      @Override
      protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
        final ExecuteProceduresRequest request) throws IOException {
        return mockRsExec.sendRequest(serverName, request);
      }
    }
  }

  protected final void collectAssignmentManagerMetrics() {
    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
    unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
    moveSubmittedCount = moveProcMetrics.getSubmittedCounter().getCount();
    moveFailedCount = moveProcMetrics.getFailedCounter().getCount();
    reopenSubmittedCount = reopenProcMetrics.getSubmittedCounter().getCount();
    reopenFailedCount = reopenProcMetrics.getFailedCounter().getCount();
    openSubmittedCount = openProcMetrics.getSubmittedCounter().getCount();
    openFailedCount = openProcMetrics.getFailedCounter().getCount();
    closeSubmittedCount = closeProcMetrics.getSubmittedCounter().getCount();
    closeFailedCount = closeProcMetrics.getFailedCounter().getCount();
  }
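
  /*
   * Illustrative only: collectAssignmentManagerMetrics() snapshots the counters so a test can
   * assert on the delta afterwards, roughly like this (the +1 expectation is hypothetical and
   * depends on what the test submits):
   *
   *   collectAssignmentManagerMetrics();
   *   waitOnFuture(submitProcedure(createAssignProcedure(hri)));
   *   assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
   *   assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
   */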
}