001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.concurrent.ConcurrentSkipListSet; 022import java.util.concurrent.atomic.AtomicInteger; 023import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 024import org.apache.hadoop.hbase.testclassification.MasterTests; 025import org.apache.hadoop.hbase.testclassification.SmallTests; 026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 027import org.apache.hadoop.hbase.util.Threads; 028import org.junit.jupiter.api.AfterEach; 029import org.junit.jupiter.api.BeforeEach; 030import org.junit.jupiter.api.Tag; 031import org.junit.jupiter.api.Test; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035@Tag(MasterTests.TAG) 036@Tag(SmallTests.TAG) 037public class TestProcedureSchedulerConcurrency { 038 039 private static final Logger LOG = LoggerFactory.getLogger(TestProcedureEvents.class); 040 041 private SimpleProcedureScheduler procSched; 042 043 @BeforeEach 044 public void setUp() throws IOException { 045 procSched = new SimpleProcedureScheduler(); 046 procSched.start(); 047 } 048 049 @AfterEach 050 public void tearDown() throws IOException { 051 procSched.stop(); 052 } 053 054 @Test 055 public void testConcurrentWaitWake() throws Exception { 056 testConcurrentWaitWake(false); 057 } 058 059 @Test 060 public void testConcurrentWaitWakeBatch() throws Exception { 061 testConcurrentWaitWake(true); 062 } 063 064 private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception { 065 final int WAIT_THRESHOLD = 2500; 066 final int NPROCS = 20; 067 final int NRUNS = 500; 068 069 final ProcedureScheduler sched = procSched; 070 for (long i = 0; i < NPROCS; ++i) { 071 sched.addBack(new TestProcedureWithEvent(i)); 072 } 073 074 final Thread[] threads = new Thread[4]; 075 final AtomicInteger waitCount = new AtomicInteger(0); 076 final AtomicInteger wakeCount = new AtomicInteger(0); 077 078 final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = new ConcurrentSkipListSet<>(); 079 threads[0] = new Thread() { 080 @Override 081 public void run() { 082 long lastUpdate = 0; 083 while (true) { 084 final int oldWakeCount = wakeCount.get(); 085 if (useWakeBatch) { 086 ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()]; 087 for (int i = 0; i < ev.length; ++i) { 088 ev[i] = waitQueue.pollFirst().getEvent(); 089 LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get()); 090 } 091 ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev); 092 wakeCount.addAndGet(ev.length); 093 } else { 094 int size = waitQueue.size(); 095 while (size-- > 0) { 096 ProcedureEvent ev = waitQueue.pollFirst().getEvent(); 097 ev.wake(procSched); 098 LOG.debug("WAKE " + ev + " total=" + wakeCount.get()); 099 wakeCount.incrementAndGet(); 100 } 101 } 102 if (wakeCount.get() != oldWakeCount) { 103 lastUpdate = EnvironmentEdgeManager.currentTime(); 104 } else if ( 105 wakeCount.get() >= NRUNS 106 && (EnvironmentEdgeManager.currentTime() - lastUpdate) > WAIT_THRESHOLD 107 ) { 108 break; 109 } 110 Threads.sleepWithoutInterrupt(25); 111 } 112 } 113 }; 114 115 for (int i = 1; i < threads.length; ++i) { 116 threads[i] = new Thread() { 117 @Override 118 public void run() { 119 while (true) { 120 TestProcedureWithEvent proc = (TestProcedureWithEvent) sched.poll(); 121 if (proc == null) { 122 continue; 123 } 124 125 proc.getEvent().suspend(); 126 waitQueue.add(proc); 127 proc.getEvent().suspendIfNotReady(proc); 128 LOG.debug("WAIT " + proc.getEvent()); 129 if (waitCount.incrementAndGet() >= NRUNS) { 130 break; 131 } 132 } 133 } 134 }; 135 } 136 137 for (int i = 0; i < threads.length; ++i) { 138 threads[i].start(); 139 } 140 for (int i = 0; i < threads.length; ++i) { 141 threads[i].join(); 142 } 143 144 sched.clear(); 145 } 146 147 public static class TestProcedureWithEvent extends NoopProcedure<Void> { 148 private final ProcedureEvent event; 149 150 public TestProcedureWithEvent(long procId) { 151 setProcId(procId); 152 event = new ProcedureEvent("test-event procId=" + procId); 153 } 154 155 public ProcedureEvent getEvent() { 156 return event; 157 } 158 } 159}