001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.contains;
022import static org.hamcrest.Matchers.greaterThan;
023import static org.hamcrest.Matchers.lessThanOrEqualTo;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collection;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Random;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.ConcurrentLinkedQueue;
037import java.util.concurrent.ThreadLocalRandom;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.stream.Collectors;
041import java.util.stream.IntStream;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.testclassification.MasterTests;
044import org.apache.hadoop.hbase.testclassification.SmallTests;
045import org.junit.ClassRule;
046import org.junit.Rule;
047import org.junit.Test;
048import org.junit.experimental.categories.Category;
049import org.junit.rules.TestName;
050
051/**
052 * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring.
053 */
054@Category({ MasterTests.class, SmallTests.class})
055public class TestRegionNormalizerWorkQueue {
056
057  @ClassRule
058  public static final HBaseClassTestRule CLASS_RULE =
059    HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class);
060
061  @Rule
062  public TestName testName = new TestName();
063
064  @Test
065  public void testElementUniquenessAndFIFO() throws Exception {
066    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
067    final List<Integer> content = new LinkedList<>();
068    IntStream.of(4, 3, 2, 1, 4, 3, 2, 1)
069      .boxed()
070      .forEach(queue::put);
071    assertEquals(4, queue.size());
072    while (queue.size() > 0) {
073      content.add(queue.take());
074    }
075    assertThat(content, contains(4, 3, 2, 1));
076
077    queue.clear();
078    queue.putAll(Arrays.asList(4, 3, 2, 1));
079    queue.putAll(Arrays.asList(4, 5));
080    assertEquals(5, queue.size());
081    content.clear();
082    while (queue.size() > 0) {
083      content.add(queue.take());
084    }
085    assertThat(content, contains(4, 3, 2, 1, 5));
086  }
087
088  @Test
089  public void testPriorityAndFIFO() throws Exception {
090    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
091    final List<Integer> content = new LinkedList<>();
092    queue.putAll(Arrays.asList(4, 3, 2, 1));
093    assertEquals(4, queue.size());
094    queue.putFirst(0);
095    assertEquals(5, queue.size());
096    drainTo(queue, content);
097    assertThat("putFirst items should jump the queue, preserving existing order",
098      content, contains(0, 4, 3, 2, 1));
099
100    queue.clear();
101    content.clear();
102    queue.putAll(Arrays.asList(4, 3, 2, 1));
103    queue.putFirst(1);
104    assertEquals(4, queue.size());
105    drainTo(queue, content);
106    assertThat("existing items re-added with putFirst should jump the queue",
107      content, contains(1, 4, 3, 2));
108
109    queue.clear();
110    content.clear();
111    queue.putAll(Arrays.asList(4, 3, 2, 1));
112    queue.putAllFirst(Arrays.asList(2, 3));
113    assertEquals(4, queue.size());
114    drainTo(queue, content);
115    assertThat(
116      "existing items re-added with putAllFirst jump the queue AND honor changes in priority",
117      content, contains(2, 3, 4, 1));
118  }
119
120  private enum Action {
121    PUT,
122    PUT_FIRST,
123    PUT_ALL,
124    PUT_ALL_FIRST,
125  }
126
127  /**
128   * Test that the uniqueness constraint is honored in the face of concurrent modification.
129   */
130  @Test
131  public void testConcurrentPut() throws Exception {
132    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
133    final int maxValue = 100;
134    final Runnable producer = () -> {
135      final Random rand = ThreadLocalRandom.current();
136      for (int i = 0; i < 1_000; i++) {
137        final Action action = Action.values()[rand.nextInt(Action.values().length)];
138        switch (action) {
139          case PUT: {
140            final int val = rand.nextInt(maxValue);
141            queue.put(val);
142            break;
143          }
144          case PUT_FIRST: {
145            final int val = rand.nextInt(maxValue);
146            queue.putFirst(val);
147            break;
148          }
149          case PUT_ALL: {
150            final List<Integer> vals = rand.ints(5, 0, maxValue)
151              .boxed()
152              .collect(Collectors.toList());
153            queue.putAll(vals);
154            break;
155          }
156          case PUT_ALL_FIRST: {
157            final List<Integer> vals = rand.ints(5, 0, maxValue)
158              .boxed()
159              .collect(Collectors.toList());
160            queue.putAllFirst(vals);
161            break;
162          }
163          default:
164            fail("Unrecognized action " + action);
165        }
166      }
167    };
168
169    final int numThreads = 5;
170    final CompletableFuture<?>[] futures = IntStream.range(0, numThreads)
171      .mapToObj(val -> CompletableFuture.runAsync(producer))
172      .toArray(CompletableFuture<?>[]::new);
173    CompletableFuture.allOf(futures).join();
174
175    final List<Integer> content = new ArrayList<>(queue.size());
176    drainTo(queue, content);
177    assertThat("at most `maxValue` items should be present.",
178      content.size(), lessThanOrEqualTo(maxValue));
179    assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size());
180  }
181
182  /**
183   * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The
184   * producing thread places new entries onto the queue following a known schedule. The consuming
185   * thread collects a time measurement between calls to {@code take}. Finally, the test makes
186   * coarse-grained assertions of the consumer's observations based on the producer's schedule.
187   */
188  @Test
189  public void testTake() throws Exception {
190    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
191    final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>();
192    final AtomicBoolean finished = new AtomicBoolean(false);
193    final Runnable consumer = () -> {
194      try {
195        while (!finished.get()) {
196          queue.take();
197          takeTimes.add(System.nanoTime());
198        }
199      } catch (InterruptedException e) {
200        fail("interrupted.");
201      }
202    };
203
204    CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer);
205    final long testStart = System.nanoTime();
206    for (int i = 0; i < 5; i++) {
207      Thread.sleep(10);
208      queue.put(i);
209    }
210
211    // set finished = true and pipe one more value in case the thread needs an extra pass through
212    // the loop.
213    finished.set(true);
214    queue.put(1);
215    worker.get(1, TimeUnit.SECONDS);
216
217    final Iterator<Long> times = takeTimes.iterator();
218    assertTrue("should have timing information for at least 2 calls to take.",
219      takeTimes.size() >= 5);
220    for (int i = 0; i < 5; i++) {
221      assertThat(
222        "Observations collected in takeTimes should increase by roughly 10ms every interval",
223        times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10)));
224    }
225  }
226
227  private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest)
228    throws InterruptedException {
229    assertThat(queue.size(), greaterThan(0));
230    while (queue.size() > 0) {
231      dest.add(queue.take());
232    }
233  }
234}