001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.util.concurrent.ConcurrentSkipListSet; 022import java.util.concurrent.atomic.AtomicInteger; 023import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; 024import org.apache.hadoop.hbase.testclassification.MasterTests; 025import org.apache.hadoop.hbase.testclassification.SmallTests; 026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 027import org.apache.hadoop.hbase.util.Threads; 028import org.junit.jupiter.api.AfterEach; 029import org.junit.jupiter.api.BeforeEach; 030import org.junit.jupiter.api.Tag; 031import org.junit.jupiter.api.Test; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035@Tag(MasterTests.TAG) 036@Tag(SmallTests.TAG) 037public class TestProcedureSchedulerConcurrency { 038 039 private static final Logger LOG = 040 LoggerFactory.getLogger(TestProcedureSchedulerConcurrency.class); 041 042 private SimpleProcedureScheduler procSched; 043 044 @BeforeEach 045 public void setUp() throws IOException { 046 procSched = new SimpleProcedureScheduler(); 047 procSched.start(); 048 } 049 050 @AfterEach 051 public void tearDown() throws IOException { 052 procSched.stop(); 053 } 054 055 @Test 056 public void testConcurrentWaitWake() throws Exception { 057 testConcurrentWaitWake(false); 058 } 059 060 @Test 061 public void testConcurrentWaitWakeBatch() throws Exception { 062 testConcurrentWaitWake(true); 063 } 064 065 private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception { 066 final int WAIT_THRESHOLD = 2500; 067 final int NPROCS = 20; 068 final int NRUNS = 500; 069 070 final ProcedureScheduler sched = procSched; 071 for (long i = 0; i < NPROCS; ++i) { 072 sched.addBack(new TestProcedureWithEvent(i)); 073 } 074 075 final Thread[] threads = new Thread[4]; 076 final AtomicInteger waitCount = new AtomicInteger(0); 077 final AtomicInteger wakeCount = new AtomicInteger(0); 078 079 final ConcurrentSkipListSet<TestProcedureWithEvent> waitQueue = new ConcurrentSkipListSet<>(); 080 threads[0] = new Thread() { 081 @Override 082 public void run() { 083 long lastUpdate = 0; 084 while (true) { 085 final int oldWakeCount = wakeCount.get(); 086 if (useWakeBatch) { 087 ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()]; 088 for (int i = 0; i < ev.length; ++i) { 089 ev[i] = waitQueue.pollFirst().getEvent(); 090 LOG.debug("WAKE BATCH " + ev[i] + " total=" + wakeCount.get()); 091 } 092 ProcedureEvent.wakeEvents((AbstractProcedureScheduler) sched, ev); 093 wakeCount.addAndGet(ev.length); 094 } else { 095 int size = waitQueue.size(); 096 while (size-- > 0) { 097 ProcedureEvent ev = waitQueue.pollFirst().getEvent(); 098 ev.wake(procSched); 099 LOG.debug("WAKE " + ev + " total=" + wakeCount.get()); 100 wakeCount.incrementAndGet(); 101 } 102 } 103 if (wakeCount.get() != oldWakeCount) { 104 lastUpdate = EnvironmentEdgeManager.currentTime(); 105 } else if ( 106 wakeCount.get() >= NRUNS 107 && (EnvironmentEdgeManager.currentTime() - lastUpdate) > WAIT_THRESHOLD 108 ) { 109 break; 110 } 111 Threads.sleepWithoutInterrupt(25); 112 } 113 } 114 }; 115 116 for (int i = 1; i < threads.length; ++i) { 117 threads[i] = new Thread() { 118 @Override 119 public void run() { 120 while (true) { 121 TestProcedureWithEvent proc = (TestProcedureWithEvent) sched.poll(); 122 if (proc == null) { 123 continue; 124 } 125 126 proc.getEvent().suspend(); 127 waitQueue.add(proc); 128 proc.getEvent().suspendIfNotReady(proc); 129 LOG.debug("WAIT " + proc.getEvent()); 130 if (waitCount.incrementAndGet() >= NRUNS) { 131 break; 132 } 133 } 134 } 135 }; 136 } 137 138 for (int i = 0; i < threads.length; ++i) { 139 threads[i].start(); 140 } 141 for (int i = 0; i < threads.length; ++i) { 142 threads[i].join(); 143 } 144 145 sched.clear(); 146 } 147 148 public static class TestProcedureWithEvent extends NoopProcedure<Void> { 149 private final ProcedureEvent event; 150 151 public TestProcedureWithEvent(long procId) { 152 setProcId(procId); 153 event = new ProcedureEvent("test-event procId=" + procId); 154 } 155 156 public ProcedureEvent getEvent() { 157 return event; 158 } 159 } 160}