001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNull; 022import static org.junit.Assert.assertTrue; 023 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.testclassification.MiscTests; 031import org.apache.hadoop.hbase.testclassification.SmallTests; 032import org.junit.Before; 033import org.junit.ClassRule; 034import org.junit.Test; 035import org.junit.experimental.categories.Category; 036 037@Category({MiscTests.class, SmallTests.class}) 038public class TestStealJobQueue { 039 040 @ClassRule 041 public static final HBaseClassTestRule CLASS_RULE = 042 HBaseClassTestRule.forClass(TestStealJobQueue.class); 043 044 StealJobQueue<Integer> stealJobQueue; 045 BlockingQueue<Integer> stealFromQueue; 046 047 @Before 048 public void setup() { 049 stealJobQueue = new StealJobQueue<>(Integer::compare); 050 stealFromQueue = stealJobQueue.getStealFromQueue(); 051 052 } 053 054 055 @Test 056 public void testTake() throws InterruptedException { 057 stealJobQueue.offer(3); 058 stealFromQueue.offer(10); 059 stealJobQueue.offer(15); 060 stealJobQueue.offer(4); 061 assertEquals(3, stealJobQueue.take().intValue()); 062 assertEquals(4, stealJobQueue.take().intValue()); 063 assertEquals("always take from the main queue before trying to steal", 15, 064 stealJobQueue.take().intValue()); 065 assertEquals(10, stealJobQueue.take().intValue()); 066 assertTrue(stealFromQueue.isEmpty()); 067 assertTrue(stealJobQueue.isEmpty()); 068 } 069 070 @Test 071 public void testOfferInStealQueueFromShouldUnblock() throws InterruptedException { 072 final AtomicInteger taken = new AtomicInteger(); 073 Thread consumer = new Thread() { 074 @Override 075 public void run() { 076 try { 077 Integer n = stealJobQueue.take(); 078 taken.set(n); 079 } catch (InterruptedException e) { 080 e.printStackTrace(); 081 } 082 } 083 }; 084 consumer.start(); 085 stealFromQueue.offer(3); 086 consumer.join(1000); 087 assertEquals(3, taken.get()); 088 consumer.interrupt(); //Ensure the consumer thread will stop. 089 } 090 091 092 @Test 093 public void testOfferInStealJobQueueShouldUnblock() throws InterruptedException { 094 final AtomicInteger taken = new AtomicInteger(); 095 Thread consumer = new Thread() { 096 @Override 097 public void run() { 098 try { 099 Integer n = stealJobQueue.take(); 100 taken.set(n); 101 } catch (InterruptedException e) { 102 e.printStackTrace(); 103 } 104 } 105 }; 106 consumer.start(); 107 stealJobQueue.offer(3); 108 consumer.join(1000); 109 assertEquals(3, taken.get()); 110 consumer.interrupt(); //Ensure the consumer thread will stop. 111 } 112 113 114 @Test 115 public void testPoll() throws InterruptedException { 116 stealJobQueue.offer(3); 117 stealFromQueue.offer(10); 118 stealJobQueue.offer(15); 119 stealJobQueue.offer(4); 120 assertEquals(3, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); 121 assertEquals(4, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); 122 assertEquals("always take from the main queue before trying to steal", 15, 123 stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); 124 assertEquals(10, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); 125 assertTrue(stealFromQueue.isEmpty()); 126 assertTrue(stealJobQueue.isEmpty()); 127 assertNull(stealJobQueue.poll(10, TimeUnit.MILLISECONDS)); 128 } 129 130 @Test 131 public void testPutInStealQueueFromShouldUnblockPoll() throws InterruptedException { 132 final AtomicInteger taken = new AtomicInteger(); 133 Thread consumer = new Thread() { 134 @Override 135 public void run() { 136 try { 137 Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS); 138 taken.set(n); 139 } catch (InterruptedException e) { 140 e.printStackTrace(); 141 } 142 } 143 }; 144 consumer.start(); 145 stealFromQueue.put(3); 146 consumer.join(1000); 147 assertEquals(3, taken.get()); 148 consumer.interrupt(); //Ensure the consumer thread will stop. 149 150 } 151 152 153 @Test 154 public void testAddInStealJobQueueShouldUnblockPoll() throws InterruptedException { 155 final AtomicInteger taken = new AtomicInteger(); 156 Thread consumer = new Thread() { 157 @Override 158 public void run() { 159 try { 160 Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS); 161 taken.set(n); 162 } catch (InterruptedException e) { 163 e.printStackTrace(); 164 } 165 } 166 }; 167 consumer.start(); 168 stealJobQueue.add(3); 169 consumer.join(1000); 170 assertEquals(3, taken.get()); 171 consumer.interrupt(); //Ensure the consumer thread will stop. 172 } 173 174 175 @Test 176 public void testInteractWithThreadPool() throws InterruptedException { 177 StealJobQueue<Runnable> stealTasksQueue = 178 new StealJobQueue<>((r1, r2) -> ((TestTask) r1).compareTo((TestTask) r2)); 179 final CountDownLatch stealJobCountDown = new CountDownLatch(3); 180 final CountDownLatch stealFromCountDown = new CountDownLatch(3); 181 ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) { 182 @Override 183 protected void afterExecute(Runnable r, Throwable t) { 184 super.afterExecute(r, t); 185 stealJobCountDown.countDown(); 186 } 187 188 }; 189 190 //This is necessary otherwise no worker will be running and stealing job 191 stealPool.prestartAllCoreThreads(); 192 193 ThreadPoolExecutor stealFromPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, 194 stealTasksQueue.getStealFromQueue()) { 195 @Override 196 protected void afterExecute(Runnable r, Throwable t) { 197 super.afterExecute(r, t); 198 stealFromCountDown.countDown(); 199 } 200 }; 201 202 for (int i = 0; i < 4; i++) { 203 TestTask task = new TestTask(); 204 stealFromPool.execute(task); 205 } 206 207 for (int i = 0; i < 2; i++) { 208 TestTask task = new TestTask(); 209 stealPool.execute(task); 210 } 211 212 stealJobCountDown.await(1, TimeUnit.SECONDS); 213 stealFromCountDown.await(1, TimeUnit.SECONDS); 214 assertEquals(0, stealFromCountDown.getCount()); 215 assertEquals(0, stealJobCountDown.getCount()); 216 } 217 218 class TestTask extends Thread implements Comparable<TestTask> { 219 @Override 220 public int compareTo(TestTask o) { 221 return 0; 222 } 223 224 @Override 225 public void run() { 226 try { 227 Thread.sleep(200); 228 } catch (InterruptedException e) { 229 e.printStackTrace(); 230 } 231 } 232 } 233 234}