/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link StealJobQueue}: elements offered to the main queue are handed out in comparator
 * order before anything is taken from the associated "steal from" queue, and a consumer blocked on
 * the main queue is woken up by an element arriving in either queue.
 */
@Tag(MiscTests.TAG)
@Tag(SmallTests.TAG)
public class TestStealJobQueue {

  StealJobQueue<Integer> stealJobQueue;
  BlockingQueue<Integer> stealFromQueue;

  /** A blocking read from a queue, used to parameterize the consumer thread. */
  @FunctionalInterface
  private interface BlockingRead {
    Integer read() throws InterruptedException;
  }

  /** Creates a fresh queue pair before each test. */
  @BeforeEach
  public void setup() {
    stealJobQueue = new StealJobQueue<>(Integer::compare);
    stealFromQueue = stealJobQueue.getStealFromQueue();
  }

  /**
   * Starts a thread that performs a single blocking read and publishes the result.
   * @param read  the blocking read to perform on the consumer thread
   * @param taken receives the value that was read; left untouched if the read returns
   *              {@code null} (timed-out poll) or is interrupted
   * @return the already started consumer thread
   */
  private static Thread startConsumer(BlockingRead read, AtomicInteger taken) {
    Thread consumer = new Thread(() -> {
      try {
        Integer n = read.read();
        // A timed-out poll returns null; skip the set so the test fails on its assertion
        // rather than with a NullPointerException thrown on this thread.
        if (n != null) {
          taken.set(n);
        }
      } catch (InterruptedException e) {
        // Interrupted by the test's cleanup; restore the flag and let the thread exit.
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();
    return consumer;
  }

  /** {@code take()} drains the main queue in order before stealing from the other queue. */
  @Test
  public void testTake() throws InterruptedException {
    stealJobQueue.offer(3);
    stealFromQueue.offer(10);
    stealJobQueue.offer(15);
    stealJobQueue.offer(4);
    assertEquals(3, stealJobQueue.take().intValue());
    assertEquals(4, stealJobQueue.take().intValue());
    assertEquals(15, stealJobQueue.take().intValue(),
      "always take from the main queue before trying to steal");
    assertEquals(10, stealJobQueue.take().intValue());
    assertTrue(stealFromQueue.isEmpty());
    assertTrue(stealJobQueue.isEmpty());
  }

  /** An {@code offer} on the steal-from queue wakes a consumer blocked in {@code take()}. */
  @Test
  public void testOfferInStealQueueFromShouldUnblock() throws InterruptedException {
    final AtomicInteger taken = new AtomicInteger();
    Thread consumer = startConsumer(() -> stealJobQueue.take(), taken);
    try {
      stealFromQueue.offer(3);
      consumer.join(1000);
      assertEquals(3, taken.get());
    } finally {
      consumer.interrupt(); // Ensure the consumer thread will stop.
    }
  }

  /** An {@code offer} on the main queue wakes a consumer blocked in {@code take()}. */
  @Test
  public void testOfferInStealJobQueueShouldUnblock() throws InterruptedException {
    final AtomicInteger taken = new AtomicInteger();
    Thread consumer = startConsumer(() -> stealJobQueue.take(), taken);
    try {
      stealJobQueue.offer(3);
      consumer.join(1000);
      assertEquals(3, taken.get());
    } finally {
      consumer.interrupt(); // Ensure the consumer thread will stop.
    }
  }

  /**
   * Timed {@code poll} drains the main queue in order before stealing, and returns {@code null}
   * once both queues are empty.
   */
  @Test
  public void testPoll() throws InterruptedException {
    stealJobQueue.offer(3);
    stealFromQueue.offer(10);
    stealJobQueue.offer(15);
    stealJobQueue.offer(4);
    assertEquals(3, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
    assertEquals(4, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
    assertEquals(15, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue(),
      "always take from the main queue before trying to steal");
    assertEquals(10, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
    assertTrue(stealFromQueue.isEmpty());
    assertTrue(stealJobQueue.isEmpty());
    assertNull(stealJobQueue.poll(10, TimeUnit.MILLISECONDS));
  }

  /** A {@code put} on the steal-from queue wakes a consumer blocked in a timed {@code poll}. */
  @Test
  public void testPutInStealQueueFromShouldUnblockPoll() throws InterruptedException {
    final AtomicInteger taken = new AtomicInteger();
    Thread consumer = startConsumer(() -> stealJobQueue.poll(3, TimeUnit.SECONDS), taken);
    try {
      stealFromQueue.put(3);
      consumer.join(1000);
      assertEquals(3, taken.get());
    } finally {
      consumer.interrupt(); // Ensure the consumer thread will stop.
    }
  }

  /** An {@code add} on the main queue wakes a consumer blocked in a timed {@code poll}. */
  @Test
  public void testAddInStealJobQueueShouldUnblockPoll() throws InterruptedException {
    final AtomicInteger taken = new AtomicInteger();
    Thread consumer = startConsumer(() -> stealJobQueue.poll(3, TimeUnit.SECONDS), taken);
    try {
      stealJobQueue.add(3);
      consumer.join(1000);
      assertEquals(3, taken.get());
    } finally {
      consumer.interrupt(); // Ensure the consumer thread will stop.
    }
  }

  /**
   * Uses the queue pair as the work queues of two thread pools. Four tasks are submitted to the
   * steal-from pool (3 threads) and two to the stealing pool (3 threads); the test passes only if
   * each pool completes at least three tasks within the timeouts.
   */
  @Test
  public void testInteractWithThreadPool() throws InterruptedException {
    StealJobQueue<Runnable> stealTasksQueue =
      new StealJobQueue<>((r1, r2) -> ((TestTask) r1).compareTo((TestTask) r2));
    final CountDownLatch stealJobCountDown = new CountDownLatch(3);
    final CountDownLatch stealFromCountDown = new CountDownLatch(3);
    ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) {
      @Override
      protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        stealJobCountDown.countDown();
      }
    };
    ThreadPoolExecutor stealFromPool =
      new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue.getStealFromQueue()) {
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
          super.afterExecute(r, t);
          stealFromCountDown.countDown();
        }
      };

    try {
      // This is necessary otherwise no worker will be running and stealing job
      stealPool.prestartAllCoreThreads();

      for (int i = 0; i < 4; i++) {
        TestTask task = new TestTask();
        stealFromPool.execute(task);
      }

      for (int i = 0; i < 2; i++) {
        TestTask task = new TestTask();
        stealPool.execute(task);
      }

      stealJobCountDown.await(1, TimeUnit.SECONDS);
      stealFromCountDown.await(1, TimeUnit.SECONDS);
      assertEquals(0, stealFromCountDown.getCount());
      assertEquals(0, stealJobCountDown.getCount());
    } finally {
      // Core threads never time out on their own, so without an explicit shutdown the six
      // worker threads would outlive the test.
      stealFromPool.shutdownNow();
      stealPool.shutdownNow();
    }
  }

  /** A task that sleeps for 200 ms; all instances compare as equal. */
  class TestTask extends Thread implements Comparable<TestTask> {
    @Override
    public int compareTo(TestTask o) {
      return 0;
    }

    @Override
    public void run() {
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        // Restore the flag so the executing thread can observe the interruption.
        Thread.currentThread().interrupt();
      }
    }
  }

}