001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.concurrent.ConcurrentSkipListSet;
022import java.util.concurrent.atomic.AtomicInteger;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
025import org.apache.hadoop.hbase.testclassification.MasterTests;
026import org.apache.hadoop.hbase.testclassification.SmallTests;
027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
028import org.apache.hadoop.hbase.util.Threads;
029import org.junit.After;
030import org.junit.Before;
031import org.junit.ClassRule;
032import org.junit.Test;
033import org.junit.experimental.categories.Category;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037@Category({ MasterTests.class, SmallTests.class })
038public class TestProcedureSchedulerConcurrency {
039  @ClassRule
040  public static final HBaseClassTestRule CLASS_RULE =
041    HBaseClassTestRule.forClass(TestProcedureSchedulerConcurrency.class);
042
043  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class);
044
045  private SimpleProcedureScheduler procSched;
046
047  @Before
048  public void setUp() throws IOException {
049    procSched = new SimpleProcedureScheduler();
050    procSched.start();
051  }
052
053  @After
054  public void tearDown() throws IOException {
055    procSched.stop();
056  }
057
058  @Test
059  public void testConcurrentWaitWake() throws Exception {
060    testConcurrentWaitWake(false);
061  }
062
063  @Test
064  public void testConcurrentWaitWakeBatch() throws Exception {
065    testConcurrentWaitWake(true);
066  }
067
068  private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
069    final int WAIT_THRESHOLD = 2500;
070    final int NPROCS = 20;
071    final int NRUNS = 500;
072
073    final ProcedureScheduler sched = procSched;
074    for (long i = 0; i < NPROCS; ++i) {
075      sched.addBack(new TestProcedureWithEvent(i));
076    }
077
078    final Thread[] threads = new Thread[4];
079    final AtomicInteger waitCount = new AtomicInteger(0);
080    final AtomicInteger wakeCount = new AtomicInteger(0);
081
082    final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = new ConcurrentSkipListSet<>();
083    threads[0] = new Thread() {
084      @Override
085      public void run() {
086        long lastUpdate = 0;
087        while (true) {
088          final int oldWakeCount = wakeCount.get();
089          if (useWakeBatch) {
090            ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
091            for (int i = 0; i < ev.length; ++i) {
092              ev[i] = waitQueue.pollFirst().getEvent();
093              LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get());
094            }
095            ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev);
096            wakeCount.addAndGet(ev.length);
097          } else {
098            int size = waitQueue.size();
099            while (size-- > 0) {
100              ProcedureEvent ev = waitQueue.pollFirst().getEvent();
101              ev.wake(procSched);
102              LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
103              wakeCount.incrementAndGet();
104            }
105          }
106          if (wakeCount.get() != oldWakeCount) {
107            lastUpdate = EnvironmentEdgeManager.currentTime();
108          } else if (
109            wakeCount.get() >= NRUNS
110              && (EnvironmentEdgeManager.currentTime() - lastUpdate) > WAIT_THRESHOLD
111          ) {
112            break;
113          }
114          Threads.sleepWithoutInterrupt(25);
115        }
116      }
117    };
118
119    for (int i = 1; i < threads.length; ++i) {
120      threads[i] = new Thread() {
121        @Override
122        public void run() {
123          while (true) {
124            TestProcedureWithEvent proc = (TestProcedureWithEvent) sched.poll();
125            if (proc == null) {
126              continue;
127            }
128
129            proc.getEvent().suspend();
130            waitQueue.add(proc);
131            proc.getEvent().suspendIfNotReady(proc);
132            LOG.debug("WAIT " + proc.getEvent());
133            if (waitCount.incrementAndGet() >= NRUNS) {
134              break;
135            }
136          }
137        }
138      };
139    }
140
141    for (int i = 0; i < threads.length; ++i) {
142      threads[i].start();
143    }
144    for (int i = 0; i < threads.length; ++i) {
145      threads[i].join();
146    }
147
148    sched.clear();
149  }
150
151  public static class TestProcedureWithEvent extends NoopProcedure<Void> {
152    private final ProcedureEvent event;
153
154    public TestProcedureWithEvent(long procId) {
155      setProcId(procId);
156      event = new ProcedureEvent("test-event procId=" + procId);
157    }
158
159    public ProcedureEvent getEvent() {
160      return event;
161    }
162  }
163}