001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.concurrent.ConcurrentSkipListSet;
022import java.util.concurrent.atomic.AtomicInteger;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
025import org.apache.hadoop.hbase.testclassification.MasterTests;
026import org.apache.hadoop.hbase.testclassification.MediumTests;
027import org.apache.hadoop.hbase.util.Threads;
028import org.junit.After;
029import org.junit.Before;
030import org.junit.ClassRule;
031import org.junit.Test;
032import org.junit.experimental.categories.Category;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036@Category({MasterTests.class, MediumTests.class})
037public class TestProcedureSchedulerConcurrency {
038
039  @ClassRule
040  public static final HBaseClassTestRule CLASS_RULE =
041      HBaseClassTestRule.forClass(TestProcedureSchedulerConcurrency.class);
042
043  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class);
044
045  private SimpleProcedureScheduler procSched;
046
047  @Before
048  public void setUp() throws IOException {
049    procSched = new SimpleProcedureScheduler();
050    procSched.start();
051  }
052
053  @After
054  public void tearDown() throws IOException {
055    procSched.stop();
056  }
057
058  @Test
059  public void testConcurrentWaitWake() throws Exception {
060    testConcurrentWaitWake(false);
061  }
062
063  @Test
064  public void testConcurrentWaitWakeBatch() throws Exception {
065    testConcurrentWaitWake(true);
066  }
067
068  private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
069    final int WAIT_THRESHOLD = 2500;
070    final int NPROCS = 20;
071    final int NRUNS = 500;
072
073    final ProcedureScheduler sched = procSched;
074    for (long i = 0; i < NPROCS; ++i) {
075      sched.addBack(new TestProcedureWithEvent(i));
076    }
077
078    final Thread[] threads = new Thread[4];
079    final AtomicInteger waitCount = new AtomicInteger(0);
080    final AtomicInteger wakeCount = new AtomicInteger(0);
081
082    final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = new ConcurrentSkipListSet<>();
083    threads[0] = new Thread() {
084      @Override
085      public void run() {
086        long lastUpdate = 0;
087        while (true) {
088          final int oldWakeCount = wakeCount.get();
089          if (useWakeBatch) {
090            ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
091            for (int i = 0; i < ev.length; ++i) {
092              ev[i] = waitQueue.pollFirst().getEvent();
093              LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get());
094            }
095            ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev);
096            wakeCount.addAndGet(ev.length);
097          } else {
098            int size = waitQueue.size();
099            while (size-- > 0) {
100              ProcedureEvent ev = waitQueue.pollFirst().getEvent();
101              ev.wake(procSched);
102              LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
103              wakeCount.incrementAndGet();
104            }
105          }
106          if (wakeCount.get() != oldWakeCount) {
107            lastUpdate = System.currentTimeMillis();
108          } else if (wakeCount.get() >= NRUNS &&
109              (System.currentTimeMillis() - lastUpdate) > WAIT_THRESHOLD) {
110            break;
111          }
112          Threads.sleepWithoutInterrupt(25);
113        }
114      }
115    };
116
117    for (int i = 1; i < threads.length; ++i) {
118      threads[i] = new Thread() {
119        @Override
120        public void run() {
121          while (true) {
122            TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll();
123            if (proc == null) continue;
124
125            proc.getEvent().suspend();
126            waitQueue.add(proc);
127            proc.getEvent().suspendIfNotReady(proc);
128            LOG.debug("WAIT " + proc.getEvent());
129            if (waitCount.incrementAndGet() >= NRUNS) {
130              break;
131            }
132          }
133        }
134      };
135    }
136
137    for (int i = 0; i < threads.length; ++i) {
138      threads[i].start();
139    }
140    for (int i = 0; i < threads.length; ++i) {
141      threads[i].join();
142    }
143
144    sched.clear();
145  }
146
147  public static class TestProcedureWithEvent extends NoopProcedure<Void> {
148    private final ProcedureEvent event;
149
150    public TestProcedureWithEvent(long procId) {
151      setProcId(procId);
152      event = new ProcedureEvent("test-event procId=" + procId);
153    }
154
155    public ProcedureEvent getEvent() {
156      return event;
157    }
158  }
159}