001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.concurrent.ConcurrentSkipListSet;
022import java.util.concurrent.atomic.AtomicInteger;
023import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
024import org.apache.hadoop.hbase.testclassification.MasterTests;
025import org.apache.hadoop.hbase.testclassification.SmallTests;
026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
027import org.apache.hadoop.hbase.util.Threads;
028import org.junit.jupiter.api.AfterEach;
029import org.junit.jupiter.api.BeforeEach;
030import org.junit.jupiter.api.Tag;
031import org.junit.jupiter.api.Test;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035@Tag(MasterTests.TAG)
036@Tag(SmallTests.TAG)
037public class TestProcedureSchedulerConcurrency {
038
039  private static final Logger LOG =
040    LoggerFactory.getLogger(TestProcedureSchedulerConcurrency.class);
041
042  private SimpleProcedureScheduler procSched;
043
044  @BeforeEach
045  public void setUp() throws IOException {
046    procSched = new SimpleProcedureScheduler();
047    procSched.start();
048  }
049
050  @AfterEach
051  public void tearDown() throws IOException {
052    procSched.stop();
053  }
054
055  @Test
056  public void testConcurrentWaitWake() throws Exception {
057    testConcurrentWaitWake(false);
058  }
059
060  @Test
061  public void testConcurrentWaitWakeBatch() throws Exception {
062    testConcurrentWaitWake(true);
063  }
064
065  private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
066    final int WAIT_THRESHOLD = 2500;
067    final int NPROCS = 20;
068    final int NRUNS = 500;
069
070    final ProcedureScheduler sched = procSched;
071    for (long i = 0; i < NPROCS; ++i) {
072      sched.addBack(new TestProcedureWithEvent(i));
073    }
074
075    final Thread[] threads = new Thread[4];
076    final AtomicInteger waitCount = new AtomicInteger(0);
077    final AtomicInteger wakeCount = new AtomicInteger(0);
078
079    final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = new ConcurrentSkipListSet<>();
080    threads[0] = new Thread() {
081      @Override
082      public void run() {
083        long lastUpdate = 0;
084        while (true) {
085          final int oldWakeCount = wakeCount.get();
086          if (useWakeBatch) {
087            ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
088            for (int i = 0; i < ev.length; ++i) {
089              ev[i] = waitQueue.pollFirst().getEvent();
090              LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get());
091            }
092            ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev);
093            wakeCount.addAndGet(ev.length);
094          } else {
095            int size = waitQueue.size();
096            while (size-- > 0) {
097              ProcedureEvent ev = waitQueue.pollFirst().getEvent();
098              ev.wake(procSched);
099              LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
100              wakeCount.incrementAndGet();
101            }
102          }
103          if (wakeCount.get() != oldWakeCount) {
104            lastUpdate = EnvironmentEdgeManager.currentTime();
105          } else if (
106            wakeCount.get() >= NRUNS
107              && (EnvironmentEdgeManager.currentTime() - lastUpdate) > WAIT_THRESHOLD
108          ) {
109            break;
110          }
111          Threads.sleepWithoutInterrupt(25);
112        }
113      }
114    };
115
116    for (int i = 1; i < threads.length; ++i) {
117      threads[i] = new Thread() {
118        @Override
119        public void run() {
120          while (true) {
121            TestProcedureWithEvent proc = (TestProcedureWithEvent) sched.poll();
122            if (proc == null) {
123              continue;
124            }
125
126            proc.getEvent().suspend();
127            waitQueue.add(proc);
128            proc.getEvent().suspendIfNotReady(proc);
129            LOG.debug("WAIT " + proc.getEvent());
130            if (waitCount.incrementAndGet() >= NRUNS) {
131              break;
132            }
133          }
134        }
135      };
136    }
137
138    for (int i = 0; i < threads.length; ++i) {
139      threads[i].start();
140    }
141    for (int i = 0; i < threads.length; ++i) {
142      threads[i].join();
143    }
144
145    sched.clear();
146  }
147
148  public static class TestProcedureWithEvent extends NoopProcedure<Void> {
149    private final ProcedureEvent event;
150
151    public TestProcedureWithEvent(long procId) {
152      setProcId(procId);
153      event = new ProcedureEvent("test-event procId=" + procId);
154    }
155
156    public ProcedureEvent getEvent() {
157      return event;
158    }
159  }
160}