001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.assignment;
019
020import static org.junit.Assert.assertNotEquals;
021
022import java.io.IOException;
023import java.io.InterruptedIOException;
024import java.net.SocketTimeoutException;
025import java.util.List;
026import java.util.NavigableMap;
027import java.util.Random;
028import java.util.Set;
029import java.util.SortedSet;
030import java.util.concurrent.ConcurrentSkipListMap;
031import java.util.concurrent.ConcurrentSkipListSet;
032import java.util.concurrent.ExecutionException;
033import java.util.concurrent.Executors;
034import java.util.concurrent.Future;
035import java.util.concurrent.ScheduledExecutorService;
036import java.util.concurrent.TimeUnit;
037import org.apache.hadoop.conf.Configuration;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.NotServingRegionException;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.RegionInfo;
043import org.apache.hadoop.hbase.client.RegionInfoBuilder;
044import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
045import org.apache.hadoop.hbase.master.MasterServices;
046import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
047import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
048import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
049import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher;
050import org.apache.hadoop.hbase.procedure2.Procedure;
051import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
052import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
053import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
054import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.util.FSUtils;
057import org.apache.hadoop.ipc.RemoteException;
058import org.junit.After;
059import org.junit.Before;
060import org.junit.Rule;
061import org.junit.rules.ExpectedException;
062import org.junit.rules.TestName;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
069import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
070import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
074import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
075import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
076import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
077import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
078
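/**
 * Base class for AssignmentManager (AM) tests. Starts a mock master with a mock
 * region-server procedure dispatcher and provides a set of mock region-server
 * executors that simulate successful, failing, hanging and random responses.
 */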
082public class TestAssignmentManagerBase {
083  private static final Logger LOG = LoggerFactory.getLogger(TestAssignmentManager.class);
084
085  @Rule public TestName name = new TestName();
086  @Rule public final ExpectedException exception = ExpectedException.none();
087
088  private static final int PROC_NTHREADS = 64;
089  protected static final int NREGIONS = 1 * 1000;
090  protected static final int NSERVERS = Math.max(1, NREGIONS / 100);
091
092  protected HBaseTestingUtility UTIL;
093  protected MockRSProcedureDispatcher rsDispatcher;
094  protected MockMasterServices master;
095  protected AssignmentManager am;
096  protected NavigableMap<ServerName, SortedSet<byte []>> regionsToRegionServers =
097      new ConcurrentSkipListMap<>();
098  // Simple executor to run some simple tasks.
099  protected ScheduledExecutorService executor;
100
101  protected ProcedureMetrics assignProcMetrics;
102  protected ProcedureMetrics unassignProcMetrics;
103
104  protected long assignSubmittedCount = 0;
105  protected long assignFailedCount = 0;
106  protected long unassignSubmittedCount = 0;
107  protected long unassignFailedCount = 0;
108
109  protected void setupConfiguration(Configuration conf) throws Exception {
110    FSUtils.setRootDir(conf, UTIL.getDataTestDir());
111    conf.setBoolean(WALProcedureStore.USE_HSYNC_CONF_KEY, false);
112    conf.setInt(WALProcedureStore.SYNC_WAIT_MSEC_CONF_KEY, 10);
113    conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, PROC_NTHREADS);
114    conf.setInt(RSProcedureDispatcher.RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, 1000);
115    conf.setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 100); // Have many so we succeed eventually.
116  }
117
118  @Before
119  public void setUp() throws Exception {
120    UTIL = new HBaseTestingUtility();
121    this.executor = Executors.newSingleThreadScheduledExecutor();
122    setupConfiguration(UTIL.getConfiguration());
123    master = new MockMasterServices(UTIL.getConfiguration(), this.regionsToRegionServers);
124    rsDispatcher = new MockRSProcedureDispatcher(master);
125    master.start(NSERVERS, rsDispatcher);
126    am = master.getAssignmentManager();
127    assignProcMetrics = am.getAssignmentManagerMetrics().getAssignProcMetrics();
128    unassignProcMetrics = am.getAssignmentManagerMetrics().getUnassignProcMetrics();
129    setUpMeta();
130  }
131
132  private void setUpMeta() throws Exception {
133    rsDispatcher.setMockRsExecutor(new GoodRsExecutor());
134    am.assign(RegionInfoBuilder.FIRST_META_REGIONINFO);
135    am.wakeMetaLoadedEvent();
136  }
137
138  @After
139  public void tearDown() throws Exception {
140    master.stop("tearDown");
141    this.executor.shutdownNow();
142  }
143
144  protected Future<byte[]> submitProcedure(final Procedure<MasterProcedureEnv> proc) {
145    return ProcedureSyncWait.submitProcedure(master.getMasterProcedureExecutor(), proc);
146  }
147
148  protected byte[] waitOnFuture(final Future<byte[]> future) throws Exception {
149    try {
150      return future.get(5, TimeUnit.SECONDS);
151    } catch (ExecutionException e) {
152      LOG.info("ExecutionException", e);
153      Exception ee = (Exception)e.getCause();
154      if (ee instanceof InterruptedIOException) {
155        for (Procedure<?> p: this.master.getMasterProcedureExecutor().getProcedures()) {
156          LOG.info(p.toStringDetails());
157        }
158      }
159      throw (Exception)e.getCause();
160    }
161  }
162
163  // ============================================================================================
164  //  Helpers
165  // ============================================================================================
166  protected void bulkSubmit(final AssignProcedure[] procs) throws Exception {
167    final Thread[] threads = new Thread[PROC_NTHREADS];
168    for (int i = 0; i < threads.length; ++i) {
169      final int threadId = i;
170      threads[i] = new Thread() {
171        @Override
172        public void run() {
173          TableName tableName = TableName.valueOf("table-" + threadId);
174          int n = (procs.length / threads.length);
175          int start = threadId * n;
176          int stop = start + n;
177          for (int j = start; j < stop; ++j) {
178            procs[j] = createAndSubmitAssign(tableName, j);
179          }
180        }
181      };
182      threads[i].start();
183    }
184    for (int i = 0; i < threads.length; ++i) {
185      threads[i].join();
186    }
187    for (int i = procs.length - 1; i >= 0 && procs[i] == null; --i) {
188      procs[i] = createAndSubmitAssign(TableName.valueOf("table-sync"), i);
189    }
190  }
191
192  private AssignProcedure createAndSubmitAssign(TableName tableName, int regionId) {
193    RegionInfo hri = createRegionInfo(tableName, regionId);
194    AssignProcedure proc = am.createAssignProcedure(hri);
195    master.getMasterProcedureExecutor().submitProcedure(proc);
196    return proc;
197  }
198
199  protected RegionInfo createRegionInfo(final TableName tableName, final long regionId) {
200    return RegionInfoBuilder.newBuilder(tableName)
201        .setStartKey(Bytes.toBytes(regionId))
202        .setEndKey(Bytes.toBytes(regionId + 1))
203        .setSplit(false)
204        .setRegionId(0)
205        .build();
206  }
207
208  private void sendTransitionReport(final ServerName serverName,
209      final org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo regionInfo,
210      final TransitionCode state) throws IOException {
211    ReportRegionStateTransitionRequest.Builder req =
212        ReportRegionStateTransitionRequest.newBuilder();
213    req.setServer(ProtobufUtil.toServerName(serverName));
214    req.addTransition(RegionStateTransition.newBuilder()
215        .addRegionInfo(regionInfo)
216        .setTransitionCode(state)
217        .setOpenSeqNum(1)
218        .build());
219    am.reportRegionStateTransition(req.build());
220  }
221
222  private void doCrash(final ServerName serverName) {
223    this.am.submitServerCrash(serverName, false/*No WALs here*/);
224  }
225
226  private void doRestart(final ServerName serverName) {
227    try {
228      this.master.restartRegionServer(serverName);
229    } catch (IOException e) {
230      LOG.warn("Can not restart RS with new startcode");
231    }
232  }
233
234  protected class NoopRsExecutor implements MockRSExecutor {
235    @Override
236    public ExecuteProceduresResponse sendRequest(ServerName server,
237        ExecuteProceduresRequest request) throws IOException {
238      ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder();
239      if (request.getOpenRegionCount() > 0) {
240        for (OpenRegionRequest req: request.getOpenRegionList()) {
241          OpenRegionResponse.Builder resp = OpenRegionResponse.newBuilder();
242          for (RegionOpenInfo openReq: req.getOpenInfoList()) {
243            RegionOpeningState state = execOpenRegion(server, openReq);
244            if (state != null) {
245              resp.addOpeningState(state);
246            }
247          }
248          builder.addOpenRegion(resp.build());
249        }
250      }
251      if (request.getCloseRegionCount() > 0) {
252        for (CloseRegionRequest req: request.getCloseRegionList()) {
253          CloseRegionResponse resp = execCloseRegion(server,
254              req.getRegion().getValue().toByteArray());
255          if (resp != null) {
256            builder.addCloseRegion(resp);
257          }
258        }
259      }
260      return ExecuteProceduresResponse.newBuilder().build();
261    }
262
263    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo regionInfo)
264        throws IOException {
265      return null;
266    }
267
268    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
269        throws IOException {
270      return null;
271    }
272  }
273
274  protected class GoodRsExecutor extends NoopRsExecutor {
275    @Override
276    protected RegionOpeningState execOpenRegion(ServerName server, RegionOpenInfo openReq)
277        throws IOException {
278      sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
279      // Concurrency?
280      // Now update the state of our cluster in regionsToRegionServers.
281      SortedSet<byte []> regions = regionsToRegionServers.get(server);
282      if (regions == null) {
283        regions = new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
284        regionsToRegionServers.put(server, regions);
285      }
286      RegionInfo hri = ProtobufUtil.toRegionInfo(openReq.getRegion());
287      if (regions.contains(hri.getRegionName())) {
288        throw new UnsupportedOperationException(hri.getRegionNameAsString());
289      }
290      regions.add(hri.getRegionName());
291      return RegionOpeningState.OPENED;
292    }
293
294    @Override
295    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
296        throws IOException {
297      RegionInfo hri = am.getRegionInfo(regionName);
298      sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
299      return CloseRegionResponse.newBuilder().setClosed(true).build();
300    }
301  }
302
303  protected static class ServerNotYetRunningRsExecutor implements MockRSExecutor {
304    @Override
305    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
306        throws IOException {
307      throw new ServerNotRunningYetException("wait on server startup");
308    }
309  }
310
311  protected static class FaultyRsExecutor implements MockRSExecutor {
312    private final IOException exception;
313
314    public FaultyRsExecutor(final IOException exception) {
315      this.exception = exception;
316    }
317
318    @Override
319    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
320        throws IOException {
321      throw exception;
322    }
323  }
324
325  protected class SocketTimeoutRsExecutor extends GoodRsExecutor {
326    private final int maxSocketTimeoutRetries;
327    private final int maxServerRetries;
328
329    private ServerName lastServer;
330    private int sockTimeoutRetries;
331    private int serverRetries;
332
333    public SocketTimeoutRsExecutor(int maxSocketTimeoutRetries, int maxServerRetries) {
334      this.maxServerRetries = maxServerRetries;
335      this.maxSocketTimeoutRetries = maxSocketTimeoutRetries;
336    }
337
338    @Override
339    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
340        throws IOException {
341      // SocketTimeoutException should be a temporary problem
342      // unless the server will be declared dead.
343      if (sockTimeoutRetries++ < maxSocketTimeoutRetries) {
344        if (sockTimeoutRetries == 1) assertNotEquals(lastServer, server);
345        lastServer = server;
346        LOG.debug("Socket timeout for server=" + server + " retries=" + sockTimeoutRetries);
347        throw new SocketTimeoutException("simulate socket timeout");
348      } else if (serverRetries++ < maxServerRetries) {
349        LOG.info("Mark server=" + server + " as dead. serverRetries=" + serverRetries);
350        master.getServerManager().moveFromOnlineToDeadServers(server);
351        sockTimeoutRetries = 0;
352        throw new SocketTimeoutException("simulate socket timeout");
353      } else {
354        return super.sendRequest(server, req);
355      }
356    }
357  }
358
359  /**
360   * Takes open request and then returns nothing so acts like a RS that went zombie.
361   * No response (so proc is stuck/suspended on the Master and won't wake up.). We
362   * then send in a crash for this server after a few seconds; crash is supposed to
363   * take care of the suspended procedures.
364   */
365  protected class HangThenRSCrashExecutor extends GoodRsExecutor {
366    private int invocations;
367
368    @Override
369    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
370        throws IOException {
371      if (this.invocations++ > 0) {
372        // Return w/o problem the second time through here.
373        return super.execOpenRegion(server, openReq);
374      }
375      // The procedure on master will just hang forever because nothing comes back
376      // from the RS in this case.
377      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
378      executor.schedule(new Runnable() {
379        @Override
380        public void run() {
381          LOG.info("Sending in CRASH of " + server);
382          doCrash(server);
383        }
384      }, 1, TimeUnit.SECONDS);
385      return null;
386    }
387  }
388
389  /**
390   * Takes open request and then returns nothing so acts like a RS that went zombie.
391   * No response (so proc is stuck/suspended on the Master and won't wake up.).
392   * Different with HangThenRSCrashExecutor,  HangThenRSCrashExecutor will create
393   * ServerCrashProcedure to handle the server crash. However, this HangThenRSRestartExecutor
394   * will restart RS directly, situation for RS crashed when SCP is not enabled.
395   */
396  protected class HangThenRSRestartExecutor extends GoodRsExecutor {
397    private int invocations;
398
399    @Override
400    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
401        throws IOException {
402      if (this.invocations++ > 0) {
403        // Return w/o problem the second time through here.
404        return super.execOpenRegion(server, openReq);
405      }
406      // The procedure on master will just hang forever because nothing comes back
407      // from the RS in this case.
408      LOG.info("Return null response from serverName=" + server + "; means STUCK...TODO timeout");
409      executor.schedule(new Runnable() {
410        @Override
411        public void run() {
412          LOG.info("Restarting RS of " + server);
413          doRestart(server);
414        }
415      }, 1, TimeUnit.SECONDS);
416      return null;
417    }
418  }
419
420  protected class HangOnCloseThenRSCrashExecutor extends GoodRsExecutor {
421    public static final int TYPES_OF_FAILURE = 6;
422    private int invocations;
423
424    @Override
425    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
426        throws IOException {
427      switch (this.invocations++) {
428        case 0: throw new NotServingRegionException("Fake");
429        case 1:
430          executor.schedule(new Runnable() {
431            @Override
432            public void run() {
433              LOG.info("Sending in CRASH of " + server);
434              doCrash(server);
435            }
436          }, 1, TimeUnit.SECONDS);
437          throw new RegionServerAbortedException("Fake!");
438        case 2:
439          executor.schedule(new Runnable() {
440            @Override
441            public void run() {
442              LOG.info("Sending in CRASH of " + server);
443              doCrash(server);
444            }
445          }, 1, TimeUnit.SECONDS);
446          throw new RegionServerStoppedException("Fake!");
447        case 3: throw new ServerNotRunningYetException("Fake!");
448        case 4:
449          LOG.info("Returned null from serverName={}; means STUCK...TODO timeout", server);
450          executor.schedule(new Runnable() {
451            @Override
452            public void run() {
453              LOG.info("Sending in CRASH of " + server);
454              doCrash(server);
455            }
456          }, 1, TimeUnit.SECONDS);
457          return null;
458        default:
459          return super.execCloseRegion(server, regionName);
460      }
461    }
462  }
463
464  protected class RandRsExecutor extends NoopRsExecutor {
465    private final Random rand = new Random();
466
467    @Override
468    public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
469        throws IOException {
470      switch (rand.nextInt(5)) {
471        case 0: throw new ServerNotRunningYetException("wait on server startup");
472        case 1: throw new SocketTimeoutException("simulate socket timeout");
473        case 2: throw new RemoteException("java.io.IOException", "unexpected exception");
474      }
475      return super.sendRequest(server, req);
476    }
477
478    @Override
479    protected RegionOpeningState execOpenRegion(final ServerName server, RegionOpenInfo openReq)
480        throws IOException {
481      switch (rand.nextInt(6)) {
482        case 0:
483          LOG.info("Return OPENED response");
484          sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
485          return OpenRegionResponse.RegionOpeningState.OPENED;
486        case 1:
487          LOG.info("Return transition report that OPENED/ALREADY_OPENED response");
488          sendTransitionReport(server, openReq.getRegion(), TransitionCode.OPENED);
489          return OpenRegionResponse.RegionOpeningState.ALREADY_OPENED;
490        case 2:
491          LOG.info("Return transition report that FAILED_OPEN/FAILED_OPENING response");
492          sendTransitionReport(server, openReq.getRegion(), TransitionCode.FAILED_OPEN);
493          return OpenRegionResponse.RegionOpeningState.FAILED_OPENING;
494      }
495      // The procedure on master will just hang forever because nothing comes back
496      // from the RS in this case.
497      LOG.info("Return null as response; means proc stuck so we send in a crash report after a few seconds...");
498      executor.schedule(new Runnable() {
499        @Override
500        public void run() {
501          LOG.info("Delayed CRASHING of " + server);
502          doCrash(server);
503        }
504      }, 5, TimeUnit.SECONDS);
505      return null;
506    }
507
508    @Override
509    protected CloseRegionResponse execCloseRegion(ServerName server, byte[] regionName)
510        throws IOException {
511      CloseRegionResponse.Builder resp = CloseRegionResponse.newBuilder();
512      boolean closed = rand.nextBoolean();
513      if (closed) {
514        RegionInfo hri = am.getRegionInfo(regionName);
515        sendTransitionReport(server, ProtobufUtil.toRegionInfo(hri), TransitionCode.CLOSED);
516      }
517      resp.setClosed(closed);
518      return resp.build();
519    }
520  }
521
522  protected interface MockRSExecutor {
523    ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest req)
524        throws IOException;
525  }
526
527  protected class MockRSProcedureDispatcher extends RSProcedureDispatcher {
528    private MockRSExecutor mockRsExec;
529
530    public MockRSProcedureDispatcher(final MasterServices master) {
531      super(master);
532    }
533
534    public void setMockRsExecutor(final MockRSExecutor mockRsExec) {
535      this.mockRsExec = mockRsExec;
536    }
537
538    @Override
539    protected void remoteDispatch(ServerName serverName, Set<RemoteProcedure> remoteProcedures) {
540      submitTask(new MockRemoteCall(serverName, remoteProcedures));
541    }
542
543    private class MockRemoteCall extends ExecuteProceduresRemoteCall {
544      public MockRemoteCall(final ServerName serverName, final Set<RemoteProcedure> operations) {
545        super(serverName, operations);
546      }
547
548      @Override
549      public void dispatchOpenRequests(MasterProcedureEnv env,
550          List<RegionOpenOperation> operations) {
551        request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations));
552      }
553
554      @Override
555      public void dispatchCloseRequests(MasterProcedureEnv env,
556          List<RegionCloseOperation> operations) {
557        for (RegionCloseOperation op : operations) {
558          request.addCloseRegion(op.buildCloseRegionRequest(getServerName()));
559        }
560      }
561
562      @Override
563      protected ExecuteProceduresResponse sendRequest(final ServerName serverName,
564          final ExecuteProceduresRequest request) throws IOException {
565        return mockRsExec.sendRequest(serverName, request);
566      }
567    }
568  }
569
570  protected void collectAssignmentManagerMetrics() {
571    assignSubmittedCount = assignProcMetrics.getSubmittedCounter().getCount();
572    assignFailedCount = assignProcMetrics.getFailedCounter().getCount();
573    unassignSubmittedCount = unassignProcMetrics.getSubmittedCounter().getCount();
574    unassignFailedCount = unassignProcMetrics.getFailedCounter().getCount();
575  }
576}