001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.concurrent.ConcurrentSkipListSet; 022import java.util.concurrent.atomic.AtomicInteger; 023import org.apache.hadoop.hbase.HBaseClassTestRule; 024import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 025import org.apache.hadoop.hbase.testclassification.MasterTests; 026import org.apache.hadoop.hbase.testclassification.SmallTests; 027import org.apache.hadoop.hbase.util.Threads; 028import org.junit.After; 029import org.junit.Before; 030import org.junit.ClassRule; 031import org.junit.Test; 032import org.junit.experimental.categories.Category; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036@Category({MasterTests.class, SmallTests.class}) 037public class TestProcedureSchedulerConcurrency { 038 @ClassRule 039 public static final HBaseClassTestRule CLASS_RULE = 040 HBaseClassTestRule.forClass(TestProcedureSchedulerConcurrency.class); 041 042 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class); 043 044 private SimpleProcedureScheduler procSched; 045 046 @Before 047 public void setUp() throws IOException { 048 procSched = new SimpleProcedureScheduler(); 049 procSched.start(); 050 } 051 052 @After 053 public void tearDown() throws IOException { 054 procSched.stop(); 055 } 056 057 @Test 058 public void testConcurrentWaitWake() throws Exception { 059 testConcurrentWaitWake(false); 060 } 061 062 @Test 063 public void testConcurrentWaitWakeBatch() throws Exception { 064 testConcurrentWaitWake(true); 065 } 066 067 private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception { 068 final int WAIT_THRESHOLD = 2500; 069 final int NPROCS = 20; 070 final int NRUNS = 500; 071 072 final ProcedureScheduler sched = procSched; 073 for (long i = 0; i < NPROCS; ++i) { 074 sched.addBack(new TestProcedureWithEvent(i)); 075 } 076 077 final Thread[] threads = new Thread[4]; 078 final AtomicInteger waitCount = new AtomicInteger(0); 079 final AtomicInteger wakeCount = new AtomicInteger(0); 080 081 final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = new ConcurrentSkipListSet<>(); 082 threads[0] = new Thread() { 083 @Override 084 public void run() { 085 long lastUpdate = 0; 086 while (true) { 087 final int oldWakeCount = wakeCount.get(); 088 if (useWakeBatch) { 089 ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()]; 090 for (int i = 0; i < ev.length; ++i) { 091 ev[i] = waitQueue.pollFirst().getEvent(); 092 LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get()); 093 } 094 ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev); 095 wakeCount.addAndGet(ev.length); 096 } else { 097 int size = waitQueue.size(); 098 while (size-- > 0) { 099 ProcedureEvent ev = waitQueue.pollFirst().getEvent(); 100 ev.wake(procSched); 101 LOG.debug("WAKE " + ev + " total=" + wakeCount.get()); 102 wakeCount.incrementAndGet(); 103 } 104 } 105 if (wakeCount.get() != oldWakeCount) { 106 lastUpdate = System.currentTimeMillis(); 107 } else if (wakeCount.get() >= NRUNS && 108 (System.currentTimeMillis() - lastUpdate) > WAIT_THRESHOLD) { 109 break; 110 } 111 Threads.sleepWithoutInterrupt(25); 112 } 113 } 114 }; 115 116 for (int i = 1; i < threads.length; ++i) { 117 threads[i] = new Thread() { 118 @Override 119 public void run() { 120 while (true) { 121 TestProcedureWithEvent proc = (TestProcedureWithEvent)sched.poll(); 122 if (proc == null) { 123 continue; 124 } 125 126 proc.getEvent().suspend(); 127 waitQueue.add(proc); 128 proc.getEvent().suspendIfNotReady(proc); 129 LOG.debug("WAIT " + proc.getEvent()); 130 if (waitCount.incrementAndGet() >= NRUNS) { 131 break; 132 } 133 } 134 } 135 }; 136 } 137 138 for (int i = 0; i < threads.length; ++i) { 139 threads[i].start(); 140 } 141 for (int i = 0; i < threads.length; ++i) { 142 threads[i].join(); 143 } 144 145 sched.clear(); 146 } 147 148 public static class TestProcedureWithEvent extends NoopProcedure<Void> { 149 private final ProcedureEvent event; 150 151 public TestProcedureWithEvent(long procId) { 152 setProcId(procId); 153 event = new ProcedureEvent("test-event procId=" + procId); 154 } 155 156 public ProcedureEvent getEvent() { 157 return event; 158 } 159 } 160}