001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNull; 022import static org.junit.Assert.assertTrue; 023 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.CountDownLatch; 026import java.util.concurrent.ThreadPoolExecutor; 027import java.util.concurrent.TimeUnit; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.hbase.HBaseClassTestRule; 030import org.apache.hadoop.hbase.testclassification.MiscTests; 031import org.apache.hadoop.hbase.testclassification.SmallTests; 032import org.junit.Before; 033import org.junit.ClassRule; 034import org.junit.Test; 035import org.junit.experimental.categories.Category; 036 037@Category({ MiscTests.class, SmallTests.class }) 038public class TestStealJobQueue { 039 040 @ClassRule 041 public static final HBaseClassTestRule CLASS_RULE = 042 HBaseClassTestRule.forClass(TestStealJobQueue.class); 043 044 StealJobQueue<Integer> stealJobQueue; 045 BlockingQueue<Integer> stealFromQueue; 046 047 @Before 048 public void setup() { 049 stealJobQueue = new StealJobQueue<>(Integer::compare); 050 stealFromQueue = stealJobQueue.getStealFromQueue(); 051 052 } 053 054 @Test 055 public void testTake() throws InterruptedException { 056 stealJobQueue.offer(3); 057 stealFromQueue.offer(10); 058 stealJobQueue.offer(15); 059 stealJobQueue.offer(4); 060 assertEquals(3, stealJobQueue.take().intValue()); 061 assertEquals(4, stealJobQueue.take().intValue()); 062 assertEquals("always take from the main queue before trying to steal", 15, 063 stealJobQueue.take().intValue()); 064 assertEquals(10, stealJobQueue.take().intValue()); 065 assertTrue(stealFromQueue.isEmpty()); 066 assertTrue(stealJobQueue.isEmpty()); 067 } 068 069 @Test 070 public void testOfferInStealQueueFromShouldUnblock() throws InterruptedException { 071 final AtomicInteger taken = new AtomicInteger(); 072 Thread consumer = new Thread() { 073 @Override 074 public void run() { 075 try { 076 Integer n = stealJobQueue.take(); 077 taken.set(n); 078 } catch (InterruptedException e) { 079 e.printStackTrace(); 080 } 081 } 082 }; 083 consumer.start(); 084 stealFromQueue.offer(3); 085 consumer.join(1000); 086 assertEquals(3, taken.get()); 087 consumer.interrupt(); // Ensure the consumer thread will stop. 088 } 089 090 @Test 091 public void testOfferInStealJobQueueShouldUnblock() throws InterruptedException { 092 final AtomicInteger taken = new AtomicInteger(); 093 Thread consumer = new Thread() { 094 @Override 095 public void run() { 096 try { 097 Integer n = stealJobQueue.take(); 098 taken.set(n); 099 } catch (InterruptedException e) { 100 e.printStackTrace(); 101 } 102 } 103 }; 104 consumer.start(); 105 stealJobQueue.offer(3); 106 consumer.join(1000); 107 assertEquals(3, taken.get()); 108 consumer.interrupt(); // Ensure the consumer thread will stop. 109 } 110 111 @Test 112 public void testPoll() throws InterruptedException { 113 stealJobQueue.offer(3); 114 stealFromQueue.offer(10); 115 stealJobQueue.offer(15); 116 stealJobQueue.offer(4); 117 assertEquals(3, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); 118 assertEquals(4, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); 119 assertEquals("always take from the main queue before trying to steal", 15, 120 stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); 121 assertEquals(10, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue()); 122 assertTrue(stealFromQueue.isEmpty()); 123 assertTrue(stealJobQueue.isEmpty()); 124 assertNull(stealJobQueue.poll(10, TimeUnit.MILLISECONDS)); 125 } 126 127 @Test 128 public void testPutInStealQueueFromShouldUnblockPoll() throws InterruptedException { 129 final AtomicInteger taken = new AtomicInteger(); 130 Thread consumer = new Thread() { 131 @Override 132 public void run() { 133 try { 134 Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS); 135 taken.set(n); 136 } catch (InterruptedException e) { 137 e.printStackTrace(); 138 } 139 } 140 }; 141 consumer.start(); 142 stealFromQueue.put(3); 143 consumer.join(1000); 144 assertEquals(3, taken.get()); 145 consumer.interrupt(); // Ensure the consumer thread will stop. 146 147 } 148 149 @Test 150 public void testAddInStealJobQueueShouldUnblockPoll() throws InterruptedException { 151 final AtomicInteger taken = new AtomicInteger(); 152 Thread consumer = new Thread() { 153 @Override 154 public void run() { 155 try { 156 Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS); 157 taken.set(n); 158 } catch (InterruptedException e) { 159 e.printStackTrace(); 160 } 161 } 162 }; 163 consumer.start(); 164 stealJobQueue.add(3); 165 consumer.join(1000); 166 assertEquals(3, taken.get()); 167 consumer.interrupt(); // Ensure the consumer thread will stop. 168 } 169 170 @Test 171 public void testInteractWithThreadPool() throws InterruptedException { 172 StealJobQueue<Runnable> stealTasksQueue = 173 new StealJobQueue<>((r1, r2) -> ((TestTask) r1).compareTo((TestTask) r2)); 174 final CountDownLatch stealJobCountDown = new CountDownLatch(3); 175 final CountDownLatch stealFromCountDown = new CountDownLatch(3); 176 ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) { 177 @Override 178 protected void afterExecute(Runnable r, Throwable t) { 179 super.afterExecute(r, t); 180 stealJobCountDown.countDown(); 181 } 182 183 }; 184 185 // This is necessary otherwise no worker will be running and stealing job 186 stealPool.prestartAllCoreThreads(); 187 188 ThreadPoolExecutor stealFromPool = 189 new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue.getStealFromQueue()) { 190 @Override 191 protected void afterExecute(Runnable r, Throwable t) { 192 super.afterExecute(r, t); 193 stealFromCountDown.countDown(); 194 } 195 }; 196 197 for (int i = 0; i < 4; i++) { 198 TestTask task = new TestTask(); 199 stealFromPool.execute(task); 200 } 201 202 for (int i = 0; i < 2; i++) { 203 TestTask task = new TestTask(); 204 stealPool.execute(task); 205 } 206 207 stealJobCountDown.await(1, TimeUnit.SECONDS); 208 stealFromCountDown.await(1, TimeUnit.SECONDS); 209 assertEquals(0, stealFromCountDown.getCount()); 210 assertEquals(0, stealJobCountDown.getCount()); 211 } 212 213 class TestTask extends Thread implements Comparable<TestTask> { 214 @Override 215 public int compareTo(TestTask o) { 216 return 0; 217 } 218 219 @Override 220 public void run() { 221 try { 222 Thread.sleep(200); 223 } catch (InterruptedException e) { 224 e.printStackTrace(); 225 } 226 } 227 } 228 229}