001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import static org.junit.Assert.assertEquals;
021
022import java.util.concurrent.Semaphore;
023import java.util.concurrent.TimeUnit;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
027import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
028import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
029import org.apache.hadoop.hbase.testclassification.MasterTests;
030import org.apache.hadoop.hbase.testclassification.SmallTests;
031import org.apache.hadoop.hbase.util.Threads;
032import org.junit.After;
033import org.junit.Before;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040@Category({ MasterTests.class, SmallTests.class })
041public class TestProcedureExecutor {
042
043  @ClassRule
044  public static final HBaseClassTestRule CLASS_RULE =
045    HBaseClassTestRule.forClass(TestProcedureExecutor.class);
046
047  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecutor.class);
048
049  private TestProcEnv procEnv;
050  private NoopProcedureStore procStore;
051  private ProcedureExecutor<TestProcEnv> procExecutor;
052
053  private HBaseCommonTestingUtility htu;
054
055  @Before
056  public void setUp() throws Exception {
057    htu = new HBaseCommonTestingUtility();
058
059    // NOTE: The executor will be created by each test
060    procEnv = new TestProcEnv();
061    procStore = new NoopProcedureStore();
062    procStore.start(1);
063  }
064
065  @After
066  public void tearDown() throws Exception {
067    procExecutor.stop();
068    procStore.stop(false);
069    procExecutor.join();
070  }
071
072  private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception {
073    procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore);
074    ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true);
075  }
076
077  @Test
078  public void testWorkerStuck() throws Exception {
079    // replace the executor
080    final Configuration conf = new Configuration(htu.getConfiguration());
081    conf.setFloat("hbase.procedure.worker.add.stuck.percentage", 0.5f);
082    conf.setInt("hbase.procedure.worker.monitor.interval.msec", 500);
083    conf.setInt("hbase.procedure.worker.stuck.threshold.msec", 750);
084
085    final int NUM_THREADS = 2;
086    createNewExecutor(conf, NUM_THREADS);
087
088    Semaphore latch1 = new Semaphore(2);
089    latch1.acquire(2);
090    BusyWaitProcedure busyProc1 = new BusyWaitProcedure(latch1);
091
092    Semaphore latch2 = new Semaphore(2);
093    latch2.acquire(2);
094    BusyWaitProcedure busyProc2 = new BusyWaitProcedure(latch2);
095
096    long busyProcId1 = procExecutor.submitProcedure(busyProc1);
097    long busyProcId2 = procExecutor.submitProcedure(busyProc2);
098    long otherProcId = procExecutor.submitProcedure(new NoopProcedure());
099
100    // wait until a new worker is being created
101    int threads1 = waitThreadCount(NUM_THREADS + 1);
102    LOG.info("new threads got created: " + (threads1 - NUM_THREADS));
103    assertEquals(NUM_THREADS + 1, threads1);
104
105    ProcedureTestingUtility.waitProcedure(procExecutor, otherProcId);
106    assertEquals(true, procExecutor.isFinished(otherProcId));
107    ProcedureTestingUtility.assertProcNotFailed(procExecutor, otherProcId);
108
109    assertEquals(true, procExecutor.isRunning());
110    assertEquals(false, procExecutor.isFinished(busyProcId1));
111    assertEquals(false, procExecutor.isFinished(busyProcId2));
112
113    // terminate the busy procedures
114    latch1.release();
115    latch2.release();
116
117    LOG.info("set keep alive and wait threads being removed");
118    procExecutor.setKeepAliveTime(500L, TimeUnit.MILLISECONDS);
119    int threads2 = waitThreadCount(NUM_THREADS);
120    LOG.info("threads got removed: " + (threads1 - threads2));
121    assertEquals(NUM_THREADS, threads2);
122
123    // terminate the busy procedures
124    latch1.release();
125    latch2.release();
126
127    // wait for all procs to complete
128    ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId1);
129    ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId2);
130    ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId1);
131    ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId2);
132  }
133
134  @Test
135  public void testSubmitBatch() throws Exception {
136    Procedure[] procs = new Procedure[5];
137    for (int i = 0; i < procs.length; ++i) {
138      procs[i] = new NoopProcedure<TestProcEnv>();
139    }
140
141    // submit procedures
142    createNewExecutor(htu.getConfiguration(), 3);
143    procExecutor.submitProcedures(procs);
144
145    // wait for procs to be completed
146    for (int i = 0; i < procs.length; ++i) {
147      final long procId = procs[i].getProcId();
148      ProcedureTestingUtility.waitProcedure(procExecutor, procId);
149      ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId);
150    }
151  }
152
153  private int waitThreadCount(final int expectedThreads) {
154    while (procExecutor.isRunning()) {
155      if (procExecutor.getWorkerThreadCount() == expectedThreads) {
156        break;
157      }
158      LOG.debug("waiting for thread count=" + expectedThreads + " current="
159        + procExecutor.getWorkerThreadCount());
160      Threads.sleepWithoutInterrupt(250);
161    }
162    return procExecutor.getWorkerThreadCount();
163  }
164
165  public static class BusyWaitProcedure extends NoopProcedure<TestProcEnv> {
166    private final Semaphore latch;
167
168    public BusyWaitProcedure(final Semaphore latch) {
169      this.latch = latch;
170    }
171
172    @Override
173    protected Procedure[] execute(final TestProcEnv env) {
174      try {
175        LOG.info("worker started " + this);
176        if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) {
177          throw new Exception("waited too long");
178        }
179
180        LOG.info("worker step 2 " + this);
181        if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) {
182          throw new Exception("waited too long");
183        }
184      } catch (Exception e) {
185        LOG.error("got unexpected exception", e);
186        setFailure("BusyWaitProcedure", e);
187      }
188      return null;
189    }
190  }
191
192  private static class TestProcEnv {
193  }
194}