001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.util.concurrent.ConcurrentSkipListSet;
022import java.util.concurrent.atomic.AtomicInteger;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
025import org.apache.hadoop.hbase.testclassification.MasterTests;
026import org.apache.hadoop.hbase.testclassification.SmallTests;
027import org.apache.hadoop.hbase.util.Threads;
028import org.junit.After;
029import org.junit.Before;
030import org.junit.ClassRule;
031import org.junit.Test;
032import org.junit.experimental.categories.Category;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036@Category({MasterTests.class, SmallTests.class})
037public class TestProcedureSchedulerConcurrency {
038  @ClassRule
039  public static final HBaseClassTestRule CLASS_RULE =
040      HBaseClassTestRule.forClass(TestProcedureSchedulerConcurrency.class);
041
042  private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class);
043
044  private SimpleProcedureScheduler procSched;
045
046  @Before
047  public void setUp() throws IOException {
048    procSched = new SimpleProcedureScheduler();
049    procSched.start();
050  }
051
052  @After
053  public void tearDown() throws IOException {
054    procSched.stop();
055  }
056
057  @Test
058  public void testConcurrentWaitWake() throws Exception {
059    testConcurrentWaitWake(false);
060  }
061
062  @Test
063  public void testConcurrentWaitWakeBatch() throws Exception {
064    testConcurrentWaitWake(true);
065  }
066
067  private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
068    final int WAIT_THRESHOLD = 2500;
069    final int NPROCS = 20;
070    final int NRUNS = 500;
071
072    final ProcedureScheduler sched = procSched;
073    for (long i = 0; i < NPROCS; ++i) {
074      sched.addBack(new TestProcedureWithEvent(i));
075    }
076
077    final Thread[] threads = new Thread[4];
078    final AtomicInteger waitCount = new AtomicInteger(0);
079    final AtomicInteger wakeCount = new AtomicInteger(0);
080
081    final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = new ConcurrentSkipListSet<>();
082    threads[0] = new Thread() {
083      @Override
084      public void run() {
085        long lastUpdate = 0;
086        while (true) {
087          final int oldWakeCount = wakeCount.get();
088          if (useWakeBatch) {
089            ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
090            for (int i = 0; i < ev.length; ++i) {
091              ev[i] = waitQueue.pollFirst().getEvent();
092              LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get());
093            }
094            ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev);
095            wakeCount.addAndGet(ev.length);
096          } else {
097            int size = waitQueue.size();
098            while (size-- > 0) {
099              ProcedureEvent ev = waitQueue.pollFirst().getEvent();
100              ev.wake(procSched);
101              LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
102              wakeCount.incrementAndGet();
103            }
104          }
105          if (wakeCount.get() != oldWakeCount) {
106            lastUpdate = System.currentTimeMillis();
107          } else if (wakeCount.get() >= NRUNS &&
108              (System.currentTimeMillis() - lastUpdate) > WAIT_THRESHOLD) {
109            break;
110          }
111          Threads.sleepWithoutInterrupt(25);
112        }
113      }
114    };
115
116    for (int i = 1; i < threads.length; ++i) {
117      threads[i] = new Thread() {
118        @Override
119        public void run() {
120          while (true) {
121            TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll();
122            if (proc == null) {
123              continue;
124            }
125
126            proc.getEvent().suspend();
127            waitQueue.add(proc);
128            proc.getEvent().suspendIfNotReady(proc);
129            LOG.debug("WAIT " + proc.getEvent());
130            if (waitCount.incrementAndGet() >= NRUNS) {
131              break;
132            }
133          }
134        }
135      };
136    }
137
138    for (int i = 0; i < threads.length; ++i) {
139      threads[i].start();
140    }
141    for (int i = 0; i < threads.length; ++i) {
142      threads[i].join();
143    }
144
145    sched.clear();
146  }
147
148  public static class TestProcedureWithEvent extends NoopProcedure<Void> {
149    private final ProcedureEvent event;
150
151    public TestProcedureWithEvent(long procId) {
152      setProcId(procId);
153      event = new ProcedureEvent("test-event procId=" + procId);
154    }
155
156    public ProcedureEvent getEvent() {
157      return event;
158    }
159  }
160}