/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.util.concurrent.Semaphore;
025import java.util.concurrent.TimeUnit;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
028import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
029import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
030import org.apache.hadoop.hbase.testclassification.MasterTests;
031import org.apache.hadoop.hbase.testclassification.SmallTests;
032import org.apache.hadoop.hbase.util.Threads;
033import org.junit.jupiter.api.AfterEach;
034import org.junit.jupiter.api.BeforeEach;
035import org.junit.jupiter.api.Tag;
036import org.junit.jupiter.api.Test;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040@Tag(MasterTests.TAG)
041@Tag(SmallTests.TAG)
042public class TestProcedureExecutor {
043
044  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecutor.class);
045
046  private TestProcEnv procEnv;
047  private NoopProcedureStore procStore;
048  private ProcedureExecutor<TestProcEnv> procExecutor;
049
050  private HBaseCommonTestingUtil htu;
051
052  @BeforeEach
053  public void setUp() throws Exception {
054    htu = new HBaseCommonTestingUtil();
055
056    // NOTE: The executor will be created by each test
057    procEnv = new TestProcEnv();
058    procStore = new NoopProcedureStore();
059    procStore.start(1);
060  }
061
062  @AfterEach
063  public void tearDown() throws Exception {
064    procExecutor.stop();
065    procStore.stop(false);
066    procExecutor.join();
067  }
068
069  private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception {
070    procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore);
071    ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true);
072  }
073
074  @Test
075  public void testWorkerStuck() throws Exception {
076    // replace the executor
077    final Configuration conf = new Configuration(htu.getConfiguration());
078    conf.setFloat("hbase.procedure.worker.add.stuck.percentage", 0.5f);
079    conf.setInt("hbase.procedure.worker.monitor.interval.msec", 500);
080    conf.setInt("hbase.procedure.worker.stuck.threshold.msec", 750);
081
082    final int NUM_THREADS = 2;
083    createNewExecutor(conf, NUM_THREADS);
084
085    Semaphore latch1 = new Semaphore(2);
086    latch1.acquire(2);
087    BusyWaitProcedure busyProc1 = new BusyWaitProcedure(latch1);
088
089    Semaphore latch2 = new Semaphore(2);
090    latch2.acquire(2);
091    BusyWaitProcedure busyProc2 = new BusyWaitProcedure(latch2);
092
093    long busyProcId1 = procExecutor.submitProcedure(busyProc1);
094    long busyProcId2 = procExecutor.submitProcedure(busyProc2);
095    long otherProcId = procExecutor.submitProcedure(new NoopProcedure());
096
097    // wait until a new worker is being created
098    int threads1 = waitThreadCount(NUM_THREADS + 1);
099    LOG.info("new threads got created: " + (threads1 - NUM_THREADS));
100    assertEquals(NUM_THREADS + 1, threads1);
101
102    ProcedureTestingUtility.waitProcedure(procExecutor, otherProcId);
103    assertEquals(true, procExecutor.isFinished(otherProcId));
104    ProcedureTestingUtility.assertProcNotFailed(procExecutor, otherProcId);
105
106    assertTrue(procExecutor.isRunning());
107    assertFalse(procExecutor.isFinished(busyProcId1));
108    assertFalse(procExecutor.isFinished(busyProcId2));
109
110    // terminate the busy procedures
111    latch1.release();
112    latch2.release();
113
114    LOG.info("set keep alive and wait threads being removed");
115    procExecutor.setKeepAliveTime(500L, TimeUnit.MILLISECONDS);
116    int threads2 = waitThreadCount(NUM_THREADS);
117    LOG.info("threads got removed: {}", threads1 - threads2);
118    assertEquals(NUM_THREADS, threads2);
119
120    // terminate the busy procedures
121    latch1.release();
122    latch2.release();
123
124    // wait for all procs to complete
125    ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId1);
126    ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId2);
127    ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId1);
128    ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId2);
129  }
130
131  @Test
132  public void testSubmitBatch() throws Exception {
133    Procedure[] procs = new Procedure[5];
134    for (int i = 0; i < procs.length; ++i) {
135      procs[i] = new NoopProcedure<TestProcEnv>();
136    }
137
138    // submit procedures
139    createNewExecutor(htu.getConfiguration(), 3);
140    procExecutor.submitProcedures(procs);
141
142    // wait for procs to be completed
143    for (int i = 0; i < procs.length; ++i) {
144      final long procId = procs[i].getProcId();
145      ProcedureTestingUtility.waitProcedure(procExecutor, procId);
146      ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId);
147    }
148  }
149
150  private int waitThreadCount(final int expectedThreads) {
151    while (procExecutor.isRunning()) {
152      if (procExecutor.getWorkerThreadCount() == expectedThreads) {
153        break;
154      }
155      LOG.debug("waiting for thread count=" + expectedThreads + " current="
156        + procExecutor.getWorkerThreadCount());
157      Threads.sleepWithoutInterrupt(250);
158    }
159    return procExecutor.getWorkerThreadCount();
160  }
161
162  public static class BusyWaitProcedure extends NoopProcedure<TestProcEnv> {
163    private final Semaphore latch;
164
165    public BusyWaitProcedure(final Semaphore latch) {
166      this.latch = latch;
167    }
168
169    @Override
170    protected Procedure[] execute(final TestProcEnv env) {
171      try {
172        LOG.info("worker started " + this);
173        if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) {
174          throw new Exception("waited too long");
175        }
176
177        LOG.info("worker step 2 " + this);
178        if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) {
179          throw new Exception("waited too long");
180        }
181      } catch (Exception e) {
182        LOG.error("got unexpected exception", e);
183        setFailure("BusyWaitProcedure", e);
184      }
185      return null;
186    }
187  }
188
189  private static class TestProcEnv {
190  }
191}