/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises concurrent suspend/wake of {@link ProcedureEvent}s against a
 * {@link SimpleProcedureScheduler}.
 *
 * <p>The test contains no explicit assertions: it succeeds when the waker thread and all
 * worker threads terminate, i.e. when every {@code join()} returns.
 */
@Category({MasterTests.class, MediumTests.class})
public class TestProcedureSchedulerConcurrency {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestProcedureSchedulerConcurrency.class);

  // Bound to this class so that log output is attributed to the test that produced it.
  private static final Logger LOG =
      LoggerFactory.getLogger(TestProcedureSchedulerConcurrency.class);

  /** Scheduler under test; created and started before each test, stopped afterwards. */
  private SimpleProcedureScheduler procSched;

  /** Creates and starts a fresh scheduler for the test about to run. */
  @Before
  public void setUp() throws IOException {
    procSched = new SimpleProcedureScheduler();
    procSched.start();
  }

  /** Stops the scheduler started in {@link #setUp()}. */
  @After
  public void tearDown() throws IOException {
    procSched.stop();
  }

  /** Runs the wait/wake scenario waking each event individually. */
  @Test
  public void testConcurrentWaitWake() throws Exception {
    testConcurrentWaitWake(false);
  }

  /** Runs the wait/wake scenario waking events through {@code ProcedureEvent.wakeEvents}. */
  @Test
  public void testConcurrentWaitWakeBatch() throws Exception {
    testConcurrentWaitWake(true);
  }

  /**
   * Drives one waker thread and three worker threads against the scheduler.
   *
   * <p>Workers poll a procedure from the scheduler, suspend its event, publish the procedure
   * on a shared queue and then call {@code suspendIfNotReady}. The waker repeatedly drains
   * that queue and wakes the corresponding events.
   *
   * @param useWakeBatch when {@code true} the waker collects the queued events into an array
   *     and wakes them with a single {@code ProcedureEvent.wakeEvents} call; otherwise each
   *     event is woken with its own {@code wake} call
   * @throws Exception if joining the threads is interrupted
   */
  private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
    // Milliseconds without any new wake after which the waker considers the run finished.
    final int WAIT_THRESHOLD = 2500;
    // Number of procedures initially queued on the scheduler.
    final int NPROCS = 20;
    // Minimum number of waits (workers) / wakes (waker) before a thread may exit.
    final int NRUNS = 500;

    final ProcedureScheduler sched = procSched;
    for (long i = 0; i < NPROCS; ++i) {
      sched.addBack(new TestProcedureWithEvent(i));
    }

    // threads[0] is the waker, threads[1..] are the workers.
    final Thread[] threads = new Thread[4];
    final AtomicInteger waitCount = new AtomicInteger(0);
    final AtomicInteger wakeCount = new AtomicInteger(0);

    // Procedures whose events are waiting to be woken; filled by workers, drained by the waker.
    final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = new ConcurrentSkipListSet<>();
    threads[0] = new Thread() {
      @Override
      public void run() {
        long lastUpdate = 0;
        while (true) {
          final int oldWakeCount = wakeCount.get();
          if (useWakeBatch) {
            // Only this thread removes from waitQueue, so at least size() elements can be
            // polled even while workers keep adding.
            ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
            for (int i = 0; i < ev.length; ++i) {
              ev[i] = waitQueue.pollFirst().getEvent();
              LOG.debug("WAKE BATCH {} total={}", ev[i], wakeCount.get());
            }
            ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev);
            wakeCount.addAndGet(ev.length);
          } else {
            int size = waitQueue.size();
            while (size-- > 0) {
              ProcedureEvent ev = waitQueue.pollFirst().getEvent();
              ev.wake(procSched);
              LOG.debug("WAKE {} total={}", ev, wakeCount.get());
              wakeCount.incrementAndGet();
            }
          }
          if (wakeCount.get() != oldWakeCount) {
            // Progress was made in this pass; restart the idle timer.
            lastUpdate = System.currentTimeMillis();
          } else if (wakeCount.get() >= NRUNS &&
              (System.currentTimeMillis() - lastUpdate) > WAIT_THRESHOLD) {
            // Enough wakes were issued and nothing new arrived for WAIT_THRESHOLD ms.
            break;
          }
          Threads.sleepWithoutInterrupt(25);
        }
      }
    };

    for (int i = 1; i < threads.length; ++i) {
      threads[i] = new Thread() {
        @Override
        public void run() {
          while (true) {
            TestProcedureWithEvent proc = (TestProcedureWithEvent) sched.poll();
            if (proc == null) {
              continue;
            }

            // NOTE(review): the suspend -> publish -> suspendIfNotReady order presumably lets
            // the waker race with the final suspension on purpose; keep the order as is.
            proc.getEvent().suspend();
            waitQueue.add(proc);
            proc.getEvent().suspendIfNotReady(proc);
            LOG.debug("WAIT {}", proc.getEvent());
            if (waitCount.incrementAndGet() >= NRUNS) {
              break;
            }
          }
        }
      };
    }

    for (int i = 0; i < threads.length; ++i) {
      threads[i].start();
    }
    for (int i = 0; i < threads.length; ++i) {
      threads[i].join();
    }

    sched.clear();
  }

  /** No-op procedure that carries its own {@link ProcedureEvent} for the test to suspend/wake. */
  public static class TestProcedureWithEvent extends NoopProcedure<Void> {
    private final ProcedureEvent event;

    /**
     * @param procId id assigned to this procedure and embedded in the event description
     */
    public TestProcedureWithEvent(long procId) {
      setProcId(procId);
      event = new ProcedureEvent("test-event procId=" + procId);
    }

    /** @return the event owned by this procedure */
    public ProcedureEvent getEvent() {
      return event;
    }
  }
}