001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.concurrent.ConcurrentSkipListSet;
022import java.util.concurrent.atomic.AtomicInteger;
023import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
024import org.apache.hadoop.hbase.testclassification.MasterTests;
025import org.apache.hadoop.hbase.testclassification.SmallTests;
026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
027import org.apache.hadoop.hbase.util.Threads;
028import org.junit.jupiter.api.AfterEach;
029import org.junit.jupiter.api.BeforeEach;
030import org.junit.jupiter.api.Tag;
031import org.junit.jupiter.api.Test;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035@Tag(MasterTests.TAG)
036@Tag(SmallTests.TAG)
037public class TestProcedureSchedulerConcurrency {
038
039  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class);
040
041  private SimpleProcedureScheduler procSched;
042
043  @BeforeEach
044  public void setUp() throws IOException {
045    procSched = new SimpleProcedureScheduler();
046    procSched.start();
047  }
048
049  @AfterEach
050  public void tearDown() throws IOException {
051    procSched.stop();
052  }
053
054  @Test
055  public void testConcurrentWaitWake() throws Exception {
056    testConcurrentWaitWake(false);
057  }
058
059  @Test
060  public void testConcurrentWaitWakeBatch() throws Exception {
061    testConcurrentWaitWake(true);
062  }
063
064  private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
065    final int WAIT_THRESHOLD = 2500;
066    final int NPROCS = 20;
067    final int NRUNS = 500;
068
069    final ProcedureScheduler sched = procSched;
070    for (long i = 0; i < NPROCS; ++i) {
071      sched.addBack(new TestProcedureWithEvent(i));
072    }
073
074    final Thread[] threads = new Thread[4];
075    final AtomicInteger waitCount = new AtomicInteger(0);
076    final AtomicInteger wakeCount = new AtomicInteger(0);
077
078    final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = new ConcurrentSkipListSet<>();
079    threads[0] = new Thread() {
080      @Override
081      public void run() {
082        long lastUpdate = 0;
083        while (true) {
084          final int oldWakeCount = wakeCount.get();
085          if (useWakeBatch) {
086            ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
087            for (int i = 0; i < ev.length; ++i) {
088              ev[i] = waitQueue.pollFirst().getEvent();
089              LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get());
090            }
091            ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev);
092            wakeCount.addAndGet(ev.length);
093          } else {
094            int size = waitQueue.size();
095            while (size-- > 0) {
096              ProcedureEvent ev = waitQueue.pollFirst().getEvent();
097              ev.wake(procSched);
098              LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
099              wakeCount.incrementAndGet();
100            }
101          }
102          if (wakeCount.get() != oldWakeCount) {
103            lastUpdate = EnvironmentEdgeManager.currentTime();
104          } else if (
105            wakeCount.get() >= NRUNS
106              && (EnvironmentEdgeManager.currentTime() - lastUpdate) > WAIT_THRESHOLD
107          ) {
108            break;
109          }
110          Threads.sleepWithoutInterrupt(25);
111        }
112      }
113    };
114
115    for (int i = 1; i < threads.length; ++i) {
116      threads[i] = new Thread() {
117        @Override
118        public void run() {
119          while (true) {
120            TestProcedureWithEvent proc = (TestProcedureWithEvent) sched.poll();
121            if (proc == null) {
122              continue;
123            }
124
125            proc.getEvent().suspend();
126            waitQueue.add(proc);
127            proc.getEvent().suspendIfNotReady(proc);
128            LOG.debug("WAIT " + proc.getEvent());
129            if (waitCount.incrementAndGet() >= NRUNS) {
130              break;
131            }
132          }
133        }
134      };
135    }
136
137    for (int i = 0; i < threads.length; ++i) {
138      threads[i].start();
139    }
140    for (int i = 0; i < threads.length; ++i) {
141      threads[i].join();
142    }
143
144    sched.clear();
145  }
146
147  public static class TestProcedureWithEvent extends NoopProcedure<Void> {
148    private final ProcedureEvent event;
149
150    public TestProcedureWithEvent(long procId) {
151      setProcId(procId);
152      event = new ProcedureEvent("test-event procId=" + procId);
153    }
154
155    public ProcedureEvent getEvent() {
156      return event;
157    }
158  }
159}