001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.net.SocketTimeoutException;
028import java.util.List;
029import java.util.NavigableMap;
030import java.util.Random;
031import java.util.Set;
032import java.util.SortedSet;
033import java.util.concurrent.ConcurrentSkipListMap;
034import java.util.concurrent.ConcurrentSkipListSet;
035import java.util.concurrent.ExecutionException;
036import java.util.concurrent.Executors;
037import java.util.concurrent.Future;
038import java.util.concurrent.ScheduledExecutorService;
039import java.util.concurrent.TimeUnit;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.hbase.CallQueueTooBigException;
042import org.apache.hadoop.hbase.DoNotRetryIOException;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.HBaseTestingUtility;
045import org.apache.hadoop.hbase.NotServingRegionException;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.client.RetriesExhaustedException;
051import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
052import org.apache.hadoop.hbase.ipc.CallTimeoutException;
053import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
054import org.apache.hadoop.hbase.master.MasterServices;
055import org.apache.hadoop.hbase.master.RegionState.State;
056import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
057import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
058import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
059import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
060import org.apache.hadoop.hbase.procedure2.Procedure;
061import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
062import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
063import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
064import org.apache.hadoop.hbase.procedure2.util.StringUtils;
065import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
066import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
067import org.apache.hadoop.hbase.testclassification.LargeTests;
068import org.apache.hadoop.hbase.testclassification.MasterTests;
069import org.apache.hadoop.hbase.util.Bytes;
070import org.apache.hadoop.hbase.util.FSUtils;
071import org.apache.hadoop.ipc.RemoteException;
072import org.junit.After;
073import org.junit.Before;
074import org.junit.ClassRule;
075import org.junit.Ignore;
076import org.junit.Rule;
077import org.junit.Test;
078import org.junit.experimental.categories.Category;
079import org.junit.rules.ExpectedException;
080import org.junit.rules.TestName;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
085import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
086import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
087import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
088import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
093import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
094import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
095import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
096
097@Category({MasterTests.class, LargeTests.class})
098public class TestAssignmentManager {
099
  // HBase class-level test rule for this test class (presumably applies the category-based
  // test timeout -- confirm against HBaseClassTestRule).
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestAssignmentManager.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManager.class);

  // Exposes the running test method's name; several tests derive their table name from it.
  @Rule public TestName name = new TestName();
  // Within the visible code this rule is only referenced from a commented-out line.
  @Rule public final ExpectedException exception = ExpectedException.none();

  // Master procedure-executor thread count; bulkSubmit also uses this many submitter threads.
  private static final int PROC_NTHREADS = 64;
  // Number of regions assigned by the bulk testAssign(executor) run.
  private static final int NREGIONS = 1 * 1000;
  // Number of mock region servers started: one per 100 regions, never fewer than one.
  private static final int NSERVERS = Math.max(1, NREGIONS / 100);

  // Recreated by setUp() (and by testAssignMetaAndCrashBeforeResponse).
  private HBaseTestingUtility UTIL;
  private MockRSProcedureDispatcher rsDispatcher;
  private MockMasterServices master;
  private AssignmentManager am;
  // Mock cluster layout: server -> names of the regions opened on it. Filled in by
  // GoodRsExecutor and handed to MockMasterServices in setUp().
  private NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers =
      new ConcurrentSkipListMap<ServerName, SortedSet<byte []>>();
  // Simple executor to run some simple tasks (the delayed crash/restart of a hung mock RS).
  private ScheduledExecutorService executor;

  // Assign/unassign procedure metrics, fetched from the AssignmentManager in setUp().
  private ProcedureMetrics assignProcMetrics;
  private ProcedureMetrics unassignProcMetrics;

  // Baseline counter values. Tests assert on deltas against these. Presumably populated by
  // collectAssignmentManagerMetrics(), which is defined later in the file -- confirm.
  private long assignSubmittedCount = 0;
  private long assignFailedCount = 0;
  private long unassignSubmittedCount = 0;
  private long unassignFailedCount = 0;
129
130  private void setupConfiguration(Configuration conf) throws Exception {
131    FSUtils.setRootDir(conf, UTIL.getDataTestDir());
132    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
133    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
134    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
135    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
136    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually.
137  }
138
139  @Before
140  public void setUp() throws Exception {
141    UTIL = new HBaseTestingUtility();
142    this.executor = Executors.newSingleThreadScheduledExecutor();
143    setupConfiguration(UTIL.getConfiguration());
144    master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers);
145    rsDispatcher = new MockRSProcedureDispatcher(master);
146    master.start(NSERVERS, rsDispatcher);
147    am = master.getAssignmentManager();
148    assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
149    unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
150    setUpMeta();
151  }
152
153  private void setUpMeta() throws Exception {
154    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
155    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
156    am.wakeMetaLoadedEvent();
157  }
158
159  @After
160  public void tearDown() throws Exception {
161    master.stop("tearDown");
162    this.executor.shutdownNow();
163  }
164
165  @Test (expected=NullPointerException.class)
166  public void testWaitServerReportEventWithNullServer() throws UnexpectedStateException {
167    // Test what happens if we pass in null server. I'd expect it throws NPE.
168    if (this.am.waitServerReportEvent(null, null)) throw new UnexpectedStateException();
169  }
170
171  @Test
172  public void testAssignWithGoodExec() throws Exception {
173    // collect AM metrics before test
174    collectAssignmentManagerMetrics();
175
176    testAssign(new GoodRsExecutor());
177
178    assertEquals(assignSubmittedCount + NREGIONS,
179        assignProcMetrics.getSubmittedCounter().getCount());
180    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
181  }
182
183  @Test
184  public void testAssignAndCrashBeforeResponse() throws Exception {
185    final TableName tableName = TableName.valueOf("testAssignAndCrashBeforeResponse");
186    final RegionInfo hri = createRegionInfo(tableName, 1);
187    rsDispatcher.setMockRsExecutor(new HangThenRSCrashExecutor());
188    AssignProcedure proc = am.createAssignProcedure(hri);
189    waitOnFuture(submitProcedure(proc));
190  }
191
192  @Test
193  public void testUnassignAndCrashBeforeResponse() throws Exception {
194    final TableName tableName = TableName.valueOf("testAssignAndCrashBeforeResponse");
195    final RegionInfo hri = createRegionInfo(tableName, 1);
196    rsDispatcher.setMockRsExecutor(new HangOnCloseThenRSCrashExecutor());
197    for (int i = 0; i < HangOnCloseThenRSCrashExecutor.TYPES_OF_FAILURE; i++) {
198      AssignProcedure assign = am.createAssignProcedure(hri);
199      waitOnFuture(submitProcedure(assign));
200      UnassignProcedure unassign = am.createUnassignProcedure(hri,
201          am.getRegionStates().getRegionServerOfRegion(hri), false);
202      waitOnFuture(submitProcedure(unassign));
203    }
204  }
205
206  @Test
207  public void testAssignWithRandExec() throws Exception {
208    final TableName tableName = TableName.valueOf("testAssignWithRandExec");
209    final RegionInfo hri = createRegionInfo(tableName, 1);
210
211    rsDispatcher.setMockRsExecutor(new RandRsExecutor());
212    // Loop a bunch of times so we hit various combos of exceptions.
213    for (int i = 0; i < 10; i++) {
214      LOG.info("ROUND=" + i);
215      AssignProcedure proc = am.createAssignProcedure(hri);
216      waitOnFuture(submitProcedure(proc));
217    }
218  }
219
220  @Ignore @Test // Disabled for now. Since HBASE-18551, this mock is insufficient.
221  public void testSocketTimeout() throws Exception {
222    final TableName tableName = TableName.valueOf(this.name.getMethodName());
223    final RegionInfo hri = createRegionInfo(tableName, 1);
224
225    // collect AM metrics before test
226    collectAssignmentManagerMetrics();
227
228    rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 3));
229    waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
230
231    rsDispatcher.setMockRsExecutor(new SocketTimeoutRsExecutor(20, 1));
232    // exception.expect(ServerCrashException.class);
233    waitOnFuture(submitProcedure(am.createUnassignProcedure(hri, null, false)));
234
235    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
236    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
237    assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
238    assertEquals(unassignFailedCount + 1, unassignProcMetrics.getFailedCounter().getCount());
239  }
240
241  @Test
242  public void testServerNotYetRunning() throws Exception {
243    testRetriesExhaustedFailure(TableName.valueOf(this.name.getMethodName()),
244      new ServerNotYetRunningRsExecutor());
245  }
246
247  private void testRetriesExhaustedFailure(final TableName tableName,
248      final MockRSExecutor executor) throws Exception {
249    final RegionInfo hri = createRegionInfo(tableName, 1);
250
251    // collect AM metrics before test
252    collectAssignmentManagerMetrics();
253
254    // Test Assign operation failure
255    rsDispatcher.setMockRsExecutor(executor);
256    try {
257      waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
258      fail("unexpected assign completion");
259    } catch (RetriesExhaustedException e) {
260      // expected exception
261      LOG.info("expected exception from assign operation: " + e.getMessage(), e);
262    }
263
264    // Assign the region (without problems)
265    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
266    waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
267
268    // TODO: Currently unassign just keeps trying until it sees a server crash.
269    // There is no count on unassign.
270    /*
271    // Test Unassign operation failure
272    rsDispatcher.setMockRsExecutor(executor);
273    waitOnFuture(submitProcedure(am.createUnassignProcedure(hri, null, false)));
274
275    assertEquals(assignSubmittedCount + 2, assignProcMetrics.getSubmittedCounter().getCount());
276    assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
277    assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
278
279    // TODO: We supposed to have 1 failed assign, 1 successful assign and a failed unassign
280    // operation. But ProcV2 framework marks aborted unassign operation as success. Fix it!
281    assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount());
282    */
283  }
284
285
286  @Test
287  public void testIOExceptionOnAssignment() throws Exception {
288    // collect AM metrics before test
289    collectAssignmentManagerMetrics();
290
291    testFailedOpen(TableName.valueOf("testExceptionOnAssignment"),
292      new FaultyRsExecutor(new IOException("test fault")));
293
294    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
295    assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
296  }
297
298  @Test
299  public void testDoNotRetryExceptionOnAssignment() throws Exception {
300    // collect AM metrics before test
301    collectAssignmentManagerMetrics();
302
303    testFailedOpen(TableName.valueOf("testDoNotRetryExceptionOnAssignment"),
304      new FaultyRsExecutor(new DoNotRetryIOException("test do not retry fault")));
305
306    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
307    assertEquals(assignFailedCount + 1, assignProcMetrics.getFailedCounter().getCount());
308  }
309
310  private void testFailedOpen(final TableName tableName,
311      final MockRSExecutor executor) throws Exception {
312    final RegionInfo hri = createRegionInfo(tableName, 1);
313
314    // Test Assign operation failure
315    rsDispatcher.setMockRsExecutor(executor);
316    try {
317      waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
318      fail("unexpected assign completion");
319    } catch (RetriesExhaustedException e) {
320      // expected exception
321      LOG.info("REGION STATE " + am.getRegionStates().getRegionStateNode(hri));
322      LOG.info("expected exception from assign operation: " + e.getMessage(), e);
323      assertEquals(true, am.getRegionStates().getRegionState(hri).isFailedOpen());
324    }
325  }
326
327  private void testAssign(final MockRSExecutor executor) throws Exception {
328    testAssign(executor, NREGIONS);
329  }
330
331  private void testAssign(final MockRSExecutor executor, final int nregions) throws Exception {
332    rsDispatcher.setMockRsExecutor(executor);
333
334    AssignProcedure[] assignments = new AssignProcedure[nregions];
335
336    long st = System.currentTimeMillis();
337    bulkSubmit(assignments);
338
339    for (int i = 0; i < assignments.length; ++i) {
340      ProcedureTestingUtility.waitProcedure(
341        master.getMasterProcedureExecutor(), assignments[i]);
342      assertTrue(assignments[i].toString(), assignments[i].isSuccess());
343    }
344    long et = System.currentTimeMillis();
345    float sec = ((et - st) / 1000.0f);
346    LOG.info(String.format("[T] Assigning %dprocs in %s (%.2fproc/sec)",
347        assignments.length, StringUtils.humanTimeDiff(et - st), assignments.length / sec));
348  }
349
350  @Test
351  public void testAssignAnAssignedRegion() throws Exception {
352    final TableName tableName = TableName.valueOf("testAssignAnAssignedRegion");
353    final RegionInfo hri = createRegionInfo(tableName, 1);
354
355    // collect AM metrics before test
356    collectAssignmentManagerMetrics();
357
358    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
359
360    final Future<byte[]> futureA = submitProcedure(am.createAssignProcedure(hri));
361
362    // wait first assign
363    waitOnFuture(futureA);
364    am.getRegionStates().isRegionInState(hri, State.OPEN);
365    // Second should be a noop. We should recognize region is already OPEN internally
366    // and skip out doing nothing.
367    // wait second assign
368    final Future<byte[]> futureB = submitProcedure(am.createAssignProcedure(hri));
369    waitOnFuture(futureB);
370    am.getRegionStates().isRegionInState(hri, State.OPEN);
371    // TODO: What else can we do to ensure just a noop.
372
373    // TODO: Though second assign is noop, it's considered success, can noop be handled in a
374    // better way?
375    assertEquals(assignSubmittedCount + 2, assignProcMetrics.getSubmittedCounter().getCount());
376    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
377
378  }
379
380  @Test
381  public void testUnassignAnUnassignedRegion() throws Exception {
382    final TableName tableName = TableName.valueOf("testUnassignAnUnassignedRegion");
383    final RegionInfo hri = createRegionInfo(tableName, 1);
384
385    // collect AM metrics before test
386    collectAssignmentManagerMetrics();
387
388    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
389
390    // assign the region first
391    waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
392
393    final Future<byte[]> futureA = submitProcedure(am.createUnassignProcedure(hri, null, false));
394
395    // Wait first unassign.
396    waitOnFuture(futureA);
397    am.getRegionStates().isRegionInState(hri, State.CLOSED);
398    // Second should be a noop. We should recognize region is already CLOSED internally
399    // and skip out doing nothing.
400    final Future<byte[]> futureB =
401        submitProcedure(am.createUnassignProcedure(hri,
402            ServerName.valueOf("example.org,1234,1"), false));
403    waitOnFuture(futureB);
404    // Ensure we are still CLOSED.
405    am.getRegionStates().isRegionInState(hri, State.CLOSED);
406    // TODO: What else can we do to ensure just a noop.
407
408    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
409    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
410    // TODO: Though second unassign is noop, it's considered success, can noop be handled in a
411    // better way?
412    assertEquals(unassignSubmittedCount + 2, unassignProcMetrics.getSubmittedCounter().getCount());
413    assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount());
414  }
415
416  /**
417   * It is possible that when AM send assign meta request to a RS successfully,
418   * but RS can not send back any response, which cause master startup hangs forever
419   */
420  @Test
421  public void testAssignMetaAndCrashBeforeResponse() throws Exception {
422    tearDown();
423    // See setUp(), start HBase until set up meta
424    UTIL = new HBaseTestingUtility();
425    this.executor = Executors.newSingleThreadScheduledExecutor();
426    setupConfiguration(UTIL.getConfiguration());
427    master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers);
428    rsDispatcher = new MockRSProcedureDispatcher(master);
429    master.start(NSERVERS, rsDispatcher);
430    am = master.getAssignmentManager();
431
432    // Assign meta
433    rsDispatcher.setMockRsExecutor(new HangThenRSRestartExecutor());
434    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
435    assertEquals(true, am.isMetaAssigned());
436
437    // set it back as default, see setUpMeta()
438    am.wakeMetaLoadedEvent();
439  }
440
441  @Test
442  public void testAssignQueueFullOnce() throws Exception {
443    TableName tableName = TableName.valueOf(this.name.getMethodName());
444    RegionInfo hri = createRegionInfo(tableName, 1);
445
446    // collect AM metrics before test
447    collectAssignmentManagerMetrics();
448
449    rsDispatcher.setMockRsExecutor(new CallQueueTooBigOnceRsExecutor());
450    waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
451
452    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
453    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
454  }
455
456  @Test
457  public void testTimeoutThenQueueFull() throws Exception {
458    TableName tableName = TableName.valueOf(this.name.getMethodName());
459    RegionInfo hri = createRegionInfo(tableName, 1);
460
461    // collect AM metrics before test
462    collectAssignmentManagerMetrics();
463
464    rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(10));
465    waitOnFuture(submitProcedure(am.createAssignProcedure(hri)));
466    rsDispatcher.setMockRsExecutor(new TimeoutThenCallQueueTooBigRsExecutor(15));
467    waitOnFuture(submitProcedure(am.createUnassignProcedure(hri)));
468
469    assertEquals(assignSubmittedCount + 1, assignProcMetrics.getSubmittedCounter().getCount());
470    assertEquals(assignFailedCount, assignProcMetrics.getFailedCounter().getCount());
471    assertEquals(unassignSubmittedCount + 1, unassignProcMetrics.getSubmittedCounter().getCount());
472    assertEquals(unassignFailedCount, unassignProcMetrics.getFailedCounter().getCount());
473  }
474
475
476  private Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
477    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
478  }
479
480  private byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
481    try {
482      return future.get(5, TimeUnit.SECONDS);
483    } catch (ExecutionException e) {
484      LOG.info("ExecutionException", e);
485      Exception ee = (Exception)e.getCause();
486      if (ee instanceof InterruptedIOException) {
487        for (Procedure<?> p: this.master.getMasterProcedureExecutor().getProcedures()) {
488          LOG.info(p.toStringDetails());
489        }
490      }
491      throw (Exception)e.getCause();
492    }
493  }
494
495  // ============================================================================================
496  //  Helpers
497  // ============================================================================================
498  private void bulkSubmit(final AssignProcedure[] procs) throws Exception {
499    final Thread[] threads = new Thread[PROC_NTHREADS];
500    for (int i = 0; i < threads.length; ++i) {
501      final int threadId = i;
502      threads[i] = new Thread() {
503        @Override
504        public void run() {
505          TableName tableName = TableName.valueOf("table-" + threadId);
506          int n = (procs.length / threads.length);
507          int start = threadId * n;
508          int stop = start + n;
509          for (int j = start; j < stop; ++j) {
510            procs[j] = createAndSubmitAssign(tableName, j);
511          }
512        }
513      };
514      threads[i].start();
515    }
516    for (int i = 0; i < threads.length; ++i) {
517      threads[i].join();
518    }
519    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
520      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
521    }
522  }
523
524  private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) {
525    RegionInfo hri = createRegionInfo(tableName, regionId);
526    AssignProcedure proc = am.createAssignProcedure(hri);
527    master.getMasterProcedureExecutor().submitProcedure(proc);
528    return proc;
529  }
530
531  private RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
532    return RegionInfoBuilder.newBuilder(tableName)
533        .setStartKey(Bytes.toBytes(regionId))
534        .setEndKey(Bytes.toBytes(regionId + 1))
535        .setSplit(false)
536        .setRegionId(0)
537        .build();
538  }
539
540  private void sendTransitionReport(final ServerName serverName,
541      final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
542      final TransitionCode state) throws IOException {
543    ReportRegionStateTransitionRequest.Builder req =
544      ReportRegionStateTransitionRequest.newBuilder();
545    req.setServer(ProtobufUtil.toServerName(serverName));
546    req.addTransition(RegionStateTransition.newBuilder()
547      .addRegionInfo(regionInfo)
548      .setTransitionCode(state)
549      .setOpenSeqNum(1)
550      .build());
551    am.reportRegionStateTransition(req.build());
552  }
553
554  private void doCrash(final ServerName serverName) {
555    this.am.submitServerCrash(serverName, false/*No WALs here*/);
556  }
557
558  private void doRestart(final ServerName serverName) {
559    try {
560      this.master.restartRegionServer(serverName);
561    } catch (IOException e) {
562      LOG.warn("Can not restart RS with new startcode");
563    }
564  }
565
566  private class NoopRsExecutor implements MockRSExecutor {
567    @Override
568    public ExecuteProceduresResponse sendRequest(ServerName server,
569        ExecuteProceduresRequest request) throws IOException {
570      if (request.getOpenRegionCount() > 0) {
571        for (OpenRegionRequest req : request.getOpenRegionList()) {
572          for (RegionOpenInfo openReq : req.getOpenInfoList()) {
573            execOpenRegion(server, openReq);
574          }
575        }
576      }
577      if (request.getCloseRegionCount() > 0) {
578        for (CloseRegionRequest req : request.getCloseRegionList()) {
579          execCloseRegion(server, req.getRegion().getValue().toByteArray());
580        }
581      }
582      return ExecuteProceduresResponse.newBuilder().build();
583    }
584
585    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo)
586        throws IOException {
587      return null;
588    }
589
590    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
591        throws IOException {
592      return null;
593    }
594  }
595
596  private class GoodRsExecutor extends NoopRsExecutor {
597    @Override
598    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
599        throws IOException {
600      sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
601      // Concurrency?
602      // Now update the state of our cluster in regionsToRegionServers.
603      SortedSet<byte []> regions = regionsToRegionServers.get(server);
604      if (regions == null) {
605        regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
606        regionsToRegionServers.put(server, regions);
607      }
608      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
609      if (regions.contains(hri.getRegionName())) {
610        throw new UnsupportedOperationException(hri.getRegionNameAsString());
611      }
612      regions.add(hri.getRegionName());
613      return RegionOpeningState.OPENED;
614    }
615
616    @Override
617    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
618        throws IOException {
619      RegionInfo hri = am.getRegionInfo(regionName);
620      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
621      return CloseRegionResponse.newBuilder().setClosed(true).build();
622    }
623  }
624
625  private static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
626    @Override
627    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
628        throws IOException {
629      throw new ServerNotRunningYetException("wait on server startup");
630    }
631  }
632
633  private static class FaultyRsExecutor implements MockRSExecutor {
634    private final IOException exception;
635
636    public FaultyRsExecutor(final IOException exception) {
637      this.exception = exception;
638    }
639
640    @Override
641    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
642        throws IOException {
643      throw exception;
644    }
645  }
646
647  private class SocketTimeoutRsExecutor extends GoodRsExecutor {
648    private final int maxSocketTimeoutRetries;
649    private final int maxServerRetries;
650
651    private ServerName lastServer;
652    private int sockTimeoutRetries;
653    private int serverRetries;
654
655    public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) {
656      this.maxServerRetries = maxServerRetries;
657      this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
658    }
659
660    @Override
661    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
662        throws IOException {
663      // SocketTimeoutException should be a temporary problem
664      // unless the server will be declared dead.
665      if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
666        if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server);
667        lastServer = server;
668        LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries);
669        throw new SocketTimeoutException("simulate socket timeout");
670      } else if (serverRetries++ < maxServerRetries) {
671        LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries);
672        master.getServerManager().moveFromOnlineToDeadServers(server);
673        sockTimeoutRetries = 0;
674        throw new SocketTimeoutException("simulate socket timeout");
675      } else {
676        return super.sendRequest(server, req);
677      }
678    }
679  }
680
681  /**
682   * Takes open request and then returns nothing so acts like a RS that went zombie.
683   * No response (so proc is stuck/suspended on the Master and won't wake up.). We
684   * then send in a crash for this server after a few seconds; crash is supposed to
685   * take care of the suspended procedures.
686   */
687  private class HangThenRSCrashExecutor extends GoodRsExecutor {
688    private int invocations;
689
690    @Override
691    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
692    throws IOException {
693      if (this.invocations++ > 0) {
694        // Return w/o problem the second time through here.
695        return super.execOpenRegion(server, openReq);
696      }
697      // The procedure on master will just hang forever because nothing comes back
698      // from the RS in this case.
699      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
700      executor.schedule(new Runnable() {
701        @Override
702        public void run() {
703          LOG.info("Sending in CRASH of " + server);
704          doCrash(server);
705        }
706      }, 1, TimeUnit.SECONDS);
707      return null;
708    }
709  }
710
711  /**
712   * Takes open request and then returns nothing so acts like a RS that went zombie.
713   * No response (so proc is stuck/suspended on the Master and won't wake up.).
714   * Different with HangThenRSCrashExecutor,  HangThenRSCrashExecutor will create
715   * ServerCrashProcedure to handle the server crash. However, this HangThenRSRestartExecutor
716   * will restart RS directly, situation for RS crashed when SCP is not enabled.
717   */
718  private class HangThenRSRestartExecutor extends GoodRsExecutor {
719    private int invocations;
720
721    @Override
722    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
723        throws IOException {
724      if (this.invocations++ > 0) {
725        // Return w/o problem the second time through here.
726        return super.execOpenRegion(server, openReq);
727      }
728      // The procedure on master will just hang forever because nothing comes back
729      // from the RS in this case.
730      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
731      executor.schedule(new Runnable() {
732        @Override
733        public void run() {
734          LOG.info("Restarting RS of " + server);
735          doRestart(server);
736        }
737      }, 1, TimeUnit.SECONDS);
738      return null;
739    }
740  }
741
742  private class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
743    public static final int TYPES_OF_FAILURE = 6;
744    private int invocations;
745
746    @Override
747    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
748        throws IOException {
749      switch (this.invocations++) {
750        case 0: throw new NotServingRegionException("Fake");
751        case 1:
752          executor.schedule(new Runnable() {
753            @Override
754            public void run() {
755              LOG.info("Sending in CRASH of " + server);
756              doCrash(server);
757            }
758          }, 1, TimeUnit.SECONDS);
759          throw new RegionServerAbortedException("Fake!");
760        case 2:
761          executor.schedule(new Runnable() {
762            @Override
763            public void run() {
764              LOG.info("Sending in CRASH of " + server);
765              doCrash(server);
766            }
767          }, 1, TimeUnit.SECONDS);
768          throw new RegionServerStoppedException("Fake!");
769        case 3: throw new ServerNotRunningYetException("Fake!");
770        case 4:
771          LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
772          executor.schedule(new Runnable() {
773            @Override
774            public void run() {
775              LOG.info("Sending in CRASH of " + server);
776              doCrash(server);
777            }
778          }, 1, TimeUnit.SECONDS);
779          return null;
780        default:
781          return super.execCloseRegion(server, regionName);
782      }
783    }
784  }
785
786  private class RandRsExecutor extends NoopRsExecutor {
787    private final Random rand = new Random();
788
789    @Override
790    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
791        throws IOException {
792      switch (rand.nextInt(5)) {
793        case 0: throw new ServerNotRunningYetException("wait on server startup");
794        case 1: throw new SocketTimeoutException("simulate socket timeout");
795        case 2: throw new RemoteException("java.io.IOException", "unexpected exception");
796        default:
797          // fall out
798      }
799      return super.sendRequest(server, req);
800    }
801
802    @Override
803    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
804        throws IOException {
805      switch (rand.nextInt(6)) {
806        case 0:
807          LOG.info("Return OPENED response");
808          sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
809          return OpenRegionResponse.RegionOpeningState.OPENED;
810        case 1:
811          LOG.info("Return transition report that OPENED/ALREADY_OPENED response");
812          sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
813          return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
814        case 2:
815          LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
816          sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN);
817          return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
818        default:
819          // fall out
820      }
821      // The procedure on master will just hang forever because nothing comes back
822      // from the RS in this case.
823      LOG.info("Return null as response; means proc stuck so we send in a crash report after a few seconds...");
824      executor.schedule(new Runnable() {
825        @Override
826        public void run() {
827          LOG.info("Delayed CRASHING of " + server);
828          doCrash(server);
829        }
830      }, 5, TimeUnit.SECONDS);
831      return null;
832    }
833
834    @Override
835    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
836        throws IOException {
837      CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder();
838      boolean closed = rand.nextBoolean();
839      if (closed) {
840        RegionInfo hri = am.getRegionInfo(regionName);
841        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
842      }
843      resp.setClosed(closed);
844      return resp.build();
845    }
846  }
847
848  protected class CallQueueTooBigOnceRsExecutor extends GoodRsExecutor {
849
850    private boolean invoked = false;
851
852    private ServerName lastServer;
853
854    @Override
855    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
856        throws IOException {
857      if (!invoked) {
858        lastServer = server;
859        invoked = true;
860        throw new CallQueueTooBigException("simulate queue full");
861      }
862      // better select another server since the server is over loaded, but anyway, it is fine to
863      // still select the same server since it is not dead yet...
864      if (lastServer.equals(server)) {
865        LOG.warn("We still select the same server, which is not good.");
866      }
867      return super.sendRequest(server, req);
868    }
869  }
870
871  protected class TimeoutThenCallQueueTooBigRsExecutor extends GoodRsExecutor {
872
873    private final int queueFullTimes;
874
875    private int retries;
876
877    private ServerName lastServer;
878
879    public TimeoutThenCallQueueTooBigRsExecutor(int queueFullTimes) {
880      this.queueFullTimes = queueFullTimes;
881    }
882
883    @Override
884    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
885        throws IOException {
886      retries++;
887      if (retries == 1) {
888        lastServer = server;
889        throw new CallTimeoutException("simulate call timeout");
890      }
891      // should always retry on the same server
892      assertEquals(lastServer, server);
893      if (retries < queueFullTimes) {
894        throw new CallQueueTooBigException("simulate queue full");
895      }
896      return super.sendRequest(server, req);
897    }
898  }
899
900  private interface MockRSExecutor {
901    ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
902        throws IOException;
903  }
904
905  private class MockRSProcedureDispatcher extends RSProcedureDispatcher {
906    private MockRSExecutor mockRsExec;
907
908    public MockRSProcedureDispatcher(final MasterServices master) {
909      super(master);
910    }
911
912    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
913      this.mockRsExec = mockRsExec;
914    }
915
916    @Override
917    protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
918      submitTask(new MockRemoteCall(serverName, remoteProcedures));
919    }
920
921    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
922      public MockRemoteCall(final ServerName serverName, final Set<RemoteProcedure> operations) {
923        super(serverName, operations);
924      }
925
926      @Override
927      public void dispatchOpenRequests(MasterProcedureEnv env,
928          List<RegionOpenOperation> operations) {
929        request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
930      }
931
932      @Override
933      public void dispatchCloseRequests(MasterProcedureEnv env,
934          List<RegionCloseOperation> operations) {
935        for (RegionCloseOperation op : operations) {
936          request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
937        }
938      }
939
940      @Override
941      protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
942          final ExecuteProceduresRequest request) throws IOException {
943        return mockRsExec.sendRequest(serverName, request);
944      }
945    }
946  }
947
948  private void collectAssignmentManagerMetrics() {
949    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
950    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
951    unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
952    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
953  }
954}