001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.contains;
022import static org.hamcrest.Matchers.greaterThan;
023import static org.hamcrest.Matchers.lessThanOrEqualTo;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collection;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.LinkedList;
034import java.util.List;
035import java.util.Random;
036import java.util.concurrent.CompletableFuture;
037import java.util.concurrent.ConcurrentLinkedQueue;
038import java.util.concurrent.ThreadLocalRandom;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.atomic.AtomicBoolean;
041import java.util.stream.Collectors;
042import java.util.stream.IntStream;
043import org.apache.hadoop.hbase.HBaseClassTestRule;
044import org.apache.hadoop.hbase.testclassification.MasterTests;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.junit.ClassRule;
047import org.junit.Rule;
048import org.junit.Test;
049import org.junit.experimental.categories.Category;
050import org.junit.rules.TestName;
051
052/**
053 * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring.
054 */
055@Category({ MasterTests.class, SmallTests.class })
056public class TestRegionNormalizerWorkQueue {
057
058  @ClassRule
059  public static final HBaseClassTestRule CLASS_RULE =
060    HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class);
061
062  @Rule
063  public TestName testName = new TestName();
064
065  @Test
066  public void testElementUniquenessAndFIFO() throws Exception {
067    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
068    final List<Integer> content = new LinkedList<>();
069    IntStream.of(4, 3, 2, 1, 4, 3, 2, 1).boxed().forEach(queue::put);
070    assertEquals(4, queue.size());
071    while (queue.size() > 0) {
072      content.add(queue.take());
073    }
074    assertThat(content, contains(4, 3, 2, 1));
075
076    queue.clear();
077    queue.putAll(Arrays.asList(4, 3, 2, 1));
078    queue.putAll(Arrays.asList(4, 5));
079    assertEquals(5, queue.size());
080    content.clear();
081    while (queue.size() > 0) {
082      content.add(queue.take());
083    }
084    assertThat(content, contains(4, 3, 2, 1, 5));
085  }
086
087  @Test
088  public void testPriorityAndFIFO() throws Exception {
089    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
090    final List<Integer> content = new LinkedList<>();
091    queue.putAll(Arrays.asList(4, 3, 2, 1));
092    assertEquals(4, queue.size());
093    queue.putFirst(0);
094    assertEquals(5, queue.size());
095    drainTo(queue, content);
096    assertThat("putFirst items should jump the queue, preserving existing order", content,
097      contains(0, 4, 3, 2, 1));
098
099    queue.clear();
100    content.clear();
101    queue.putAll(Arrays.asList(4, 3, 2, 1));
102    queue.putFirst(1);
103    assertEquals(4, queue.size());
104    drainTo(queue, content);
105    assertThat("existing items re-added with putFirst should jump the queue", content,
106      contains(1, 4, 3, 2));
107
108    queue.clear();
109    content.clear();
110    queue.putAll(Arrays.asList(4, 3, 2, 1));
111    queue.putAllFirst(Arrays.asList(2, 3));
112    assertEquals(4, queue.size());
113    drainTo(queue, content);
114    assertThat(
115      "existing items re-added with putAllFirst jump the queue AND honor changes in priority",
116      content, contains(2, 3, 4, 1));
117  }
118
119  private enum Action {
120    PUT,
121    PUT_FIRST,
122    PUT_ALL,
123    PUT_ALL_FIRST,
124  }
125
126  /**
127   * Test that the uniqueness constraint is honored in the face of concurrent modification.
128   */
129  @Test
130  public void testConcurrentPut() throws Exception {
131    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
132    final int maxValue = 100;
133    final Runnable producer = () -> {
134      final Random rand = ThreadLocalRandom.current();
135      for (int i = 0; i < 1_000; i++) {
136        final Action action = Action.values()[rand.nextInt(Action.values().length)];
137        switch (action) {
138          case PUT: {
139            final int val = rand.nextInt(maxValue);
140            queue.put(val);
141            break;
142          }
143          case PUT_FIRST: {
144            final int val = rand.nextInt(maxValue);
145            queue.putFirst(val);
146            break;
147          }
148          case PUT_ALL: {
149            final List<Integer> vals =
150              rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList());
151            queue.putAll(vals);
152            break;
153          }
154          case PUT_ALL_FIRST: {
155            final List<Integer> vals =
156              rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList());
157            queue.putAllFirst(vals);
158            break;
159          }
160          default:
161            fail("Unrecognized action " + action);
162        }
163      }
164    };
165
166    final int numThreads = 5;
167    final CompletableFuture<?>[] futures = IntStream.range(0, numThreads)
168      .mapToObj(val -> CompletableFuture.runAsync(producer)).toArray(CompletableFuture<?>[]::new);
169    CompletableFuture.allOf(futures).join();
170
171    final List<Integer> content = new ArrayList<>(queue.size());
172    drainTo(queue, content);
173    assertThat("at most `maxValue` items should be present.", content.size(),
174      lessThanOrEqualTo(maxValue));
175    assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size());
176  }
177
178  /**
179   * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The
180   * producing thread places new entries onto the queue following a known schedule. The consuming
181   * thread collects a time measurement between calls to {@code take}. Finally, the test makes
182   * coarse-grained assertions of the consumer's observations based on the producer's schedule.
183   */
184  @Test
185  public void testTake() throws Exception {
186    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
187    final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>();
188    final AtomicBoolean finished = new AtomicBoolean(false);
189    final Runnable consumer = () -> {
190      try {
191        while (!finished.get()) {
192          queue.take();
193          takeTimes.add(System.nanoTime());
194        }
195      } catch (InterruptedException e) {
196        fail("interrupted.");
197      }
198    };
199
200    CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer);
201    final long testStart = System.nanoTime();
202    for (int i = 0; i < 5; i++) {
203      Thread.sleep(10);
204      queue.put(i);
205    }
206
207    // set finished = true and pipe one more value in case the thread needs an extra pass through
208    // the loop.
209    finished.set(true);
210    queue.put(1);
211    worker.get(1, TimeUnit.SECONDS);
212
213    final Iterator<Long> times = takeTimes.iterator();
214    assertTrue("should have timing information for at least 2 calls to take.",
215      takeTimes.size() >= 5);
216    for (int i = 0; i < 5; i++) {
217      assertThat(
218        "Observations collected in takeTimes should increase by roughly 10ms every interval",
219        times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10)));
220    }
221  }
222
223  private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest)
224    throws InterruptedException {
225    assertThat(queue.size(), greaterThan(0));
226    while (queue.size() > 0) {
227      dest.add(queue.take());
228    }
229  }
230}