001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNull;
022import static org.junit.Assert.assertTrue;
023
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicInteger;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.testclassification.MiscTests;
031import org.apache.hadoop.hbase.testclassification.SmallTests;
032import org.junit.Before;
033import org.junit.ClassRule;
034import org.junit.Test;
035import org.junit.experimental.categories.Category;
036
037@Category({MiscTests.class, SmallTests.class})
038public class TestStealJobQueue {
039
040  @ClassRule
041  public static final HBaseClassTestRule CLASS_RULE =
042      HBaseClassTestRule.forClass(TestStealJobQueue.class);
043
044  StealJobQueue<Integer> stealJobQueue;
045  BlockingQueue<Integer> stealFromQueue;
046
047  @Before
048  public void setup() {
049    stealJobQueue = new StealJobQueue<>(Integer::compare);
050    stealFromQueue = stealJobQueue.getStealFromQueue();
051
052  }
053
054
055  @Test
056  public void testTake() throws InterruptedException {
057    stealJobQueue.offer(3);
058    stealFromQueue.offer(10);
059    stealJobQueue.offer(15);
060    stealJobQueue.offer(4);
061    assertEquals(3, stealJobQueue.take().intValue());
062    assertEquals(4, stealJobQueue.take().intValue());
063    assertEquals("always take from the main queue before trying to steal", 15,
064            stealJobQueue.take().intValue());
065    assertEquals(10, stealJobQueue.take().intValue());
066    assertTrue(stealFromQueue.isEmpty());
067    assertTrue(stealJobQueue.isEmpty());
068  }
069
070  @Test
071  public void testOfferInStealQueueFromShouldUnblock() throws InterruptedException {
072    final AtomicInteger taken = new AtomicInteger();
073    Thread consumer = new Thread() {
074      @Override
075      public void run() {
076        try {
077          Integer n = stealJobQueue.take();
078          taken.set(n);
079        } catch (InterruptedException e) {
080          e.printStackTrace();
081        }
082      }
083    };
084    consumer.start();
085    stealFromQueue.offer(3);
086    consumer.join(1000);
087    assertEquals(3, taken.get());
088    consumer.interrupt(); //Ensure the consumer thread will stop.
089  }
090
091
092  @Test
093  public void testOfferInStealJobQueueShouldUnblock() throws InterruptedException {
094    final AtomicInteger taken = new AtomicInteger();
095    Thread consumer = new Thread() {
096      @Override
097      public void run() {
098        try {
099          Integer n = stealJobQueue.take();
100          taken.set(n);
101        } catch (InterruptedException e) {
102          e.printStackTrace();
103        }
104      }
105    };
106    consumer.start();
107    stealJobQueue.offer(3);
108    consumer.join(1000);
109    assertEquals(3, taken.get());
110    consumer.interrupt(); //Ensure the consumer thread will stop.
111  }
112
113
114  @Test
115  public void testPoll() throws InterruptedException {
116    stealJobQueue.offer(3);
117    stealFromQueue.offer(10);
118    stealJobQueue.offer(15);
119    stealJobQueue.offer(4);
120    assertEquals(3, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
121    assertEquals(4, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
122    assertEquals("always take from the main queue before trying to steal", 15,
123            stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
124    assertEquals(10, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
125    assertTrue(stealFromQueue.isEmpty());
126    assertTrue(stealJobQueue.isEmpty());
127    assertNull(stealJobQueue.poll(10, TimeUnit.MILLISECONDS));
128  }
129
130  @Test
131  public void testPutInStealQueueFromShouldUnblockPoll() throws InterruptedException {
132    final AtomicInteger taken = new AtomicInteger();
133    Thread consumer = new Thread() {
134      @Override
135      public void run() {
136        try {
137          Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS);
138          taken.set(n);
139        } catch (InterruptedException e) {
140          e.printStackTrace();
141        }
142      }
143    };
144    consumer.start();
145    stealFromQueue.put(3);
146    consumer.join(1000);
147    assertEquals(3, taken.get());
148    consumer.interrupt(); //Ensure the consumer thread will stop.
149
150  }
151
152
153  @Test
154  public void testAddInStealJobQueueShouldUnblockPoll() throws InterruptedException {
155    final AtomicInteger taken = new AtomicInteger();
156    Thread consumer = new Thread() {
157      @Override
158      public void run() {
159        try {
160          Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS);
161          taken.set(n);
162        } catch (InterruptedException e) {
163          e.printStackTrace();
164        }
165      }
166    };
167    consumer.start();
168    stealJobQueue.add(3);
169    consumer.join(1000);
170    assertEquals(3, taken.get());
171    consumer.interrupt(); //Ensure the consumer thread will stop.
172  }
173
174
175  @Test
176  public void testInteractWithThreadPool() throws InterruptedException {
177    StealJobQueue<Runnable> stealTasksQueue =
178        new StealJobQueue<>((r1, r2) -> ((TestTask) r1).compareTo((TestTask) r2));
179    final CountDownLatch stealJobCountDown = new CountDownLatch(3);
180    final CountDownLatch stealFromCountDown = new CountDownLatch(3);
181    ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) {
182      @Override
183      protected void afterExecute(Runnable r, Throwable t) {
184        super.afterExecute(r, t);
185        stealJobCountDown.countDown();
186      }
187
188    };
189
190    //This is necessary otherwise no worker will be running and stealing job
191    stealPool.prestartAllCoreThreads();
192
193    ThreadPoolExecutor stealFromPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS,
194            stealTasksQueue.getStealFromQueue()) {
195      @Override
196      protected void afterExecute(Runnable r, Throwable t) {
197        super.afterExecute(r, t);
198        stealFromCountDown.countDown();
199      }
200    };
201
202    for (int i = 0; i < 4; i++) {
203      TestTask task = new TestTask();
204      stealFromPool.execute(task);
205    }
206
207    for (int i = 0; i < 2; i++) {
208      TestTask task = new TestTask();
209      stealPool.execute(task);
210    }
211
212    stealJobCountDown.await(1, TimeUnit.SECONDS);
213    stealFromCountDown.await(1, TimeUnit.SECONDS);
214    assertEquals(0, stealFromCountDown.getCount());
215    assertEquals(0, stealJobCountDown.getCount());
216  }
217
218  class TestTask extends Thread implements Comparable<TestTask> {
219    @Override
220    public int compareTo(TestTask o) {
221      return 0;
222    }
223
224    @Override
225    public void run() {
226      try {
227        Thread.sleep(200);
228      } catch (InterruptedException e) {
229        e.printStackTrace();
230      }
231    }
232  }
233
234}