001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.CountDownLatch;
026import java.util.concurrent.ThreadPoolExecutor;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.atomic.AtomicInteger;
029import org.apache.hadoop.hbase.testclassification.MiscTests;
030import org.apache.hadoop.hbase.testclassification.SmallTests;
031import org.junit.jupiter.api.BeforeEach;
032import org.junit.jupiter.api.Tag;
033import org.junit.jupiter.api.Test;
034
035@Tag(MiscTests.TAG)
036@Tag(SmallTests.TAG)
037public class TestStealJobQueue {
038
039  StealJobQueue<Integer> stealJobQueue;
040  BlockingQueue<Integer> stealFromQueue;
041
042  @BeforeEach
043  public void setup() {
044    stealJobQueue = new StealJobQueue<>(Integer::compare);
045    stealFromQueue = stealJobQueue.getStealFromQueue();
046
047  }
048
049  @Test
050  public void testTake() throws InterruptedException {
051    stealJobQueue.offer(3);
052    stealFromQueue.offer(10);
053    stealJobQueue.offer(15);
054    stealJobQueue.offer(4);
055    assertEquals(3, stealJobQueue.take().intValue());
056    assertEquals(4, stealJobQueue.take().intValue());
057    assertEquals(15, stealJobQueue.take().intValue(),
058      "always take from the main queue before trying to steal");
059    assertEquals(10, stealJobQueue.take().intValue());
060    assertTrue(stealFromQueue.isEmpty());
061    assertTrue(stealJobQueue.isEmpty());
062  }
063
064  @Test
065  public void testOfferInStealQueueFromShouldUnblock() throws InterruptedException {
066    final AtomicInteger taken = new AtomicInteger();
067    Thread consumer = new Thread() {
068      @Override
069      public void run() {
070        try {
071          Integer n = stealJobQueue.take();
072          taken.set(n);
073        } catch (InterruptedException e) {
074          e.printStackTrace();
075        }
076      }
077    };
078    consumer.start();
079    stealFromQueue.offer(3);
080    consumer.join(1000);
081    assertEquals(3, taken.get());
082    consumer.interrupt(); // Ensure the consumer thread will stop.
083  }
084
085  @Test
086  public void testOfferInStealJobQueueShouldUnblock() throws InterruptedException {
087    final AtomicInteger taken = new AtomicInteger();
088    Thread consumer = new Thread() {
089      @Override
090      public void run() {
091        try {
092          Integer n = stealJobQueue.take();
093          taken.set(n);
094        } catch (InterruptedException e) {
095          e.printStackTrace();
096        }
097      }
098    };
099    consumer.start();
100    stealJobQueue.offer(3);
101    consumer.join(1000);
102    assertEquals(3, taken.get());
103    consumer.interrupt(); // Ensure the consumer thread will stop.
104  }
105
106  @Test
107  public void testPoll() throws InterruptedException {
108    stealJobQueue.offer(3);
109    stealFromQueue.offer(10);
110    stealJobQueue.offer(15);
111    stealJobQueue.offer(4);
112    assertEquals(3, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
113    assertEquals(4, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
114    assertEquals(15, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue(),
115      "always take from the main queue before trying to steal");
116    assertEquals(10, stealJobQueue.poll(1, TimeUnit.SECONDS).intValue());
117    assertTrue(stealFromQueue.isEmpty());
118    assertTrue(stealJobQueue.isEmpty());
119    assertNull(stealJobQueue.poll(10, TimeUnit.MILLISECONDS));
120  }
121
122  @Test
123  public void testPutInStealQueueFromShouldUnblockPoll() throws InterruptedException {
124    final AtomicInteger taken = new AtomicInteger();
125    Thread consumer = new Thread() {
126      @Override
127      public void run() {
128        try {
129          Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS);
130          taken.set(n);
131        } catch (InterruptedException e) {
132          e.printStackTrace();
133        }
134      }
135    };
136    consumer.start();
137    stealFromQueue.put(3);
138    consumer.join(1000);
139    assertEquals(3, taken.get());
140    consumer.interrupt(); // Ensure the consumer thread will stop.
141
142  }
143
144  @Test
145  public void testAddInStealJobQueueShouldUnblockPoll() throws InterruptedException {
146    final AtomicInteger taken = new AtomicInteger();
147    Thread consumer = new Thread() {
148      @Override
149      public void run() {
150        try {
151          Integer n = stealJobQueue.poll(3, TimeUnit.SECONDS);
152          taken.set(n);
153        } catch (InterruptedException e) {
154          e.printStackTrace();
155        }
156      }
157    };
158    consumer.start();
159    stealJobQueue.add(3);
160    consumer.join(1000);
161    assertEquals(3, taken.get());
162    consumer.interrupt(); // Ensure the consumer thread will stop.
163  }
164
165  @Test
166  public void testInteractWithThreadPool() throws InterruptedException {
167    StealJobQueue<Runnable> stealTasksQueue =
168      new StealJobQueue<>((r1, r2) -> ((TestTask) r1).compareTo((TestTask) r2));
169    final CountDownLatch stealJobCountDown = new CountDownLatch(3);
170    final CountDownLatch stealFromCountDown = new CountDownLatch(3);
171    ThreadPoolExecutor stealPool = new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue) {
172      @Override
173      protected void afterExecute(Runnable r, Throwable t) {
174        super.afterExecute(r, t);
175        stealJobCountDown.countDown();
176      }
177
178    };
179
180    // This is necessary otherwise no worker will be running and stealing job
181    stealPool.prestartAllCoreThreads();
182
183    ThreadPoolExecutor stealFromPool =
184      new ThreadPoolExecutor(3, 3, 1, TimeUnit.DAYS, stealTasksQueue.getStealFromQueue()) {
185        @Override
186        protected void afterExecute(Runnable r, Throwable t) {
187          super.afterExecute(r, t);
188          stealFromCountDown.countDown();
189        }
190      };
191
192    for (int i = 0; i < 4; i++) {
193      TestTask task = new TestTask();
194      stealFromPool.execute(task);
195    }
196
197    for (int i = 0; i < 2; i++) {
198      TestTask task = new TestTask();
199      stealPool.execute(task);
200    }
201
202    stealJobCountDown.await(1, TimeUnit.SECONDS);
203    stealFromCountDown.await(1, TimeUnit.SECONDS);
204    assertEquals(0, stealFromCountDown.getCount());
205    assertEquals(0, stealJobCountDown.getCount());
206  }
207
208  class TestTask extends Thread implements Comparable<TestTask> {
209    @Override
210    public int compareTo(TestTask o) {
211      return 0;
212    }
213
214    @Override
215    public void run() {
216      try {
217        Thread.sleep(200);
218      } catch (InterruptedException e) {
219        e.printStackTrace();
220      }
221    }
222  }
223
224}