001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.util.concurrent.Semaphore; 025import java.util.concurrent.TimeUnit; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseCommonTestingUtil; 028import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 029import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; 030import org.apache.hadoop.hbase.testclassification.MasterTests; 031import org.apache.hadoop.hbase.testclassification.SmallTests; 032import org.apache.hadoop.hbase.util.Threads; 033import org.junit.jupiter.api.AfterEach; 034import org.junit.jupiter.api.BeforeEach; 035import org.junit.jupiter.api.Tag; 036import org.junit.jupiter.api.Test; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040@Tag(MasterTests.TAG) 041@Tag(SmallTests.TAG) 042public class TestProcedureExecutor { 043 044 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureExecutor.class); 045 046 private TestProcEnv procEnv; 047 private NoopProcedureStore procStore; 048 private ProcedureExecutor<TestProcEnv> procExecutor; 049 050 private HBaseCommonTestingUtil htu; 051 052 @BeforeEach 053 public void setUp() throws Exception { 054 htu = new HBaseCommonTestingUtil(); 055 056 // NOTE: The executor will be created by each test 057 procEnv = new TestProcEnv(); 058 procStore = new NoopProcedureStore(); 059 procStore.start(1); 060 } 061 062 @AfterEach 063 public void tearDown() throws Exception { 064 procExecutor.stop(); 065 procStore.stop(false); 066 procExecutor.join(); 067 } 068 069 private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception { 070 procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore); 071 ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true); 072 } 073 074 @Test 075 public void testWorkerStuck() throws Exception { 076 // replace the executor 077 final Configuration conf = new Configuration(htu.getConfiguration()); 078 conf.setFloat("hbase.procedure.worker.add.stuck.percentage", 0.5f); 079 conf.setInt("hbase.procedure.worker.monitor.interval.msec", 500); 080 conf.setInt("hbase.procedure.worker.stuck.threshold.msec", 750); 081 082 final int NUM_THREADS = 2; 083 createNewExecutor(conf, NUM_THREADS); 084 085 Semaphore latch1 = new Semaphore(2); 086 latch1.acquire(2); 087 BusyWaitProcedure busyProc1 = new BusyWaitProcedure(latch1); 088 089 Semaphore latch2 = new Semaphore(2); 090 latch2.acquire(2); 091 BusyWaitProcedure busyProc2 = new BusyWaitProcedure(latch2); 092 093 long busyProcId1 = procExecutor.submitProcedure(busyProc1); 094 long busyProcId2 = procExecutor.submitProcedure(busyProc2); 095 long otherProcId = procExecutor.submitProcedure(new NoopProcedure()); 096 097 // wait until a new worker is being created 098 int threads1 = waitThreadCount(NUM_THREADS + 1); 099 LOG.info("new threads got created: " + (threads1 - NUM_THREADS)); 100 assertEquals(NUM_THREADS + 1, threads1); 101 102 ProcedureTestingUtility.waitProcedure(procExecutor, otherProcId); 103 assertEquals(true, procExecutor.isFinished(otherProcId)); 104 ProcedureTestingUtility.assertProcNotFailed(procExecutor, otherProcId); 105 106 assertTrue(procExecutor.isRunning()); 107 assertFalse(procExecutor.isFinished(busyProcId1)); 108 assertFalse(procExecutor.isFinished(busyProcId2)); 109 110 // terminate the busy procedures 111 latch1.release(); 112 latch2.release(); 113 114 LOG.info("set keep alive and wait threads being removed"); 115 procExecutor.setKeepAliveTime(500L, TimeUnit.MILLISECONDS); 116 int threads2 = waitThreadCount(NUM_THREADS); 117 LOG.info("threads got removed: {}", threads1 - threads2); 118 assertEquals(NUM_THREADS, threads2); 119 120 // terminate the busy procedures 121 latch1.release(); 122 latch2.release(); 123 124 // wait for all procs to complete 125 ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId1); 126 ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId2); 127 ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId1); 128 ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId2); 129 } 130 131 @Test 132 public void testSubmitBatch() throws Exception { 133 Procedure[] procs = new Procedure[5]; 134 for (int i = 0; i < procs.length; ++i) { 135 procs[i] = new NoopProcedure<TestProcEnv>(); 136 } 137 138 // submit procedures 139 createNewExecutor(htu.getConfiguration(), 3); 140 procExecutor.submitProcedures(procs); 141 142 // wait for procs to be completed 143 for (int i = 0; i < procs.length; ++i) { 144 final long procId = procs[i].getProcId(); 145 ProcedureTestingUtility.waitProcedure(procExecutor, procId); 146 ProcedureTestingUtility.assertProcNotFailed(procExecutor, procId); 147 } 148 } 149 150 private int waitThreadCount(final int expectedThreads) { 151 while (procExecutor.isRunning()) { 152 if (procExecutor.getWorkerThreadCount() == expectedThreads) { 153 break; 154 } 155 LOG.debug("waiting for thread count=" + expectedThreads + " current=" 156 + procExecutor.getWorkerThreadCount()); 157 Threads.sleepWithoutInterrupt(250); 158 } 159 return procExecutor.getWorkerThreadCount(); 160 } 161 162 public static class BusyWaitProcedure extends NoopProcedure<TestProcEnv> { 163 private final Semaphore latch; 164 165 public BusyWaitProcedure(final Semaphore latch) { 166 this.latch = latch; 167 } 168 169 @Override 170 protected Procedure[] execute(final TestProcEnv env) { 171 try { 172 LOG.info("worker started " + this); 173 if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) { 174 throw new Exception("waited too long"); 175 } 176 177 LOG.info("worker step 2 " + this); 178 if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) { 179 throw new Exception("waited too long"); 180 } 181 } catch (Exception e) { 182 LOG.error("got unexpected exception", e); 183 setFailure("BusyWaitProcedure", e); 184 } 185 return null; 186 } 187 } 188 189 private static class TestProcEnv { 190 } 191}