001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.concurrent.ConcurrentSkipListSet; 022import java.util.concurrent.atomic.AtomicInteger; 023import org.apache.hadoop.hbase.HBaseClassTestRule; 024import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 025import org.apache.hadoop.hbase.testclassification.MasterTests; 026import org.apache.hadoop.hbase.testclassification.SmallTests; 027import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 028import org.apache.hadoop.hbase.util.Threads; 029import org.junit.After; 030import org.junit.Before; 031import org.junit.ClassRule; 032import org.junit.Test; 033import org.junit.experimental.categories.Category; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037@Category({ MasterTests.class, SmallTests.class }) 038public class TestProcedureSchedulerConcurrency { 039 @ClassRule 040 public static final HBaseClassTestRule CLASS_RULE = 041 HBaseClassTestRule.forClass(TestProcedureSchedulerConcurrency.class); 042 043 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class); 044 045 private SimpleProcedureScheduler procSched; 046 047 @Before 048 public void setUp() throws IOException { 049 procSched = new SimpleProcedureScheduler(); 050 procSched.start(); 051 } 052 053 @After 054 public void tearDown() throws IOException { 055 procSched.stop(); 056 } 057 058 @Test 059 public void testConcurrentWaitWake() throws Exception { 060 testConcurrentWaitWake(false); 061 } 062 063 @Test 064 public void testConcurrentWaitWakeBatch() throws Exception { 065 testConcurrentWaitWake(true); 066 } 067 068 private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception { 069 final int WAIT_THRESHOLD = 2500; 070 final int NPROCS = 20; 071 final int NRUNS = 500; 072 073 final ProcedureScheduler sched = procSched; 074 for (long i = 0; i < NPROCS; ++i) { 075 sched.addBack(new TestProcedureWithEvent(i)); 076 } 077 078 final Thread[] threads = new Thread[4]; 079 final AtomicInteger waitCount = new AtomicInteger(0); 080 final AtomicInteger wakeCount = new AtomicInteger(0); 081 082 final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = new ConcurrentSkipListSet<>(); 083 threads[0] = new Thread() { 084 @Override 085 public void run() { 086 long lastUpdate = 0; 087 while (true) { 088 final int oldWakeCount = wakeCount.get(); 089 if (useWakeBatch) { 090 ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()]; 091 for (int i = 0; i < ev.length; ++i) { 092 ev[i] = waitQueue.pollFirst().getEvent(); 093 LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get()); 094 } 095 ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev); 096 wakeCount.addAndGet(ev.length); 097 } else { 098 int size = waitQueue.size(); 099 while (size-- > 0) { 100 ProcedureEvent ev = waitQueue.pollFirst().getEvent(); 101 ev.wake(procSched); 102 LOG.debug("WAKE " + ev + " total=" + wakeCount.get()); 103 wakeCount.incrementAndGet(); 104 } 105 } 106 if (wakeCount.get() != oldWakeCount) { 107 lastUpdate = EnvironmentEdgeManager.currentTime(); 108 } else if ( 109 wakeCount.get() >= NRUNS 110 && (EnvironmentEdgeManager.currentTime() - lastUpdate) > WAIT_THRESHOLD 111 ) { 112 break; 113 } 114 Threads.sleepWithoutInterrupt(25); 115 } 116 } 117 }; 118 119 for (int i = 1; i < threads.length; ++i) { 120 threads[i] = new Thread() { 121 @Override 122 public void run() { 123 while (true) { 124 TestProcedureWithEvent proc = (TestProcedureWithEvent) sched.poll(); 125 if (proc == null) { 126 continue; 127 } 128 129 proc.getEvent().suspend(); 130 waitQueue.add(proc); 131 proc.getEvent().suspendIfNotReady(proc); 132 LOG.debug("WAIT " + proc.getEvent()); 133 if (waitCount.incrementAndGet() >= NRUNS) { 134 break; 135 } 136 } 137 } 138 }; 139 } 140 141 for (int i = 0; i < threads.length; ++i) { 142 threads[i].start(); 143 } 144 for (int i = 0; i < threads.length; ++i) { 145 threads[i].join(); 146 } 147 148 sched.clear(); 149 } 150 151 public static class TestProcedureWithEvent extends NoopProcedure<Void> { 152 private final ProcedureEvent event; 153 154 public TestProcedureWithEvent(long procId) { 155 setProcId(procId); 156 event = new ProcedureEvent("test-event procId=" + procId); 157 } 158 159 public ProcedureEvent getEvent() { 160 return event; 161 } 162 } 163}