001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.contains;
022import static org.hamcrest.Matchers.greaterThan;
023import static org.hamcrest.Matchers.lessThanOrEqualTo;
024import static org.junit.Assert.assertEquals;
025import static org.junit.Assert.fail;
026
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collection;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Random;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.ConcurrentLinkedQueue;
037import java.util.concurrent.ThreadLocalRandom;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.stream.Collectors;
041import java.util.stream.IntStream;
042import org.apache.hadoop.hbase.HBaseClassTestRule;
043import org.apache.hadoop.hbase.HBaseConfiguration;
044import org.apache.hadoop.hbase.Waiter;
045import org.apache.hadoop.hbase.testclassification.MasterTests;
046import org.apache.hadoop.hbase.testclassification.SmallTests;
047import org.junit.ClassRule;
048import org.junit.Rule;
049import org.junit.Test;
050import org.junit.experimental.categories.Category;
051import org.junit.rules.TestName;
052
053/**
054 * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring.
055 */
056@Category({ MasterTests.class, SmallTests.class })
057public class TestRegionNormalizerWorkQueue {
058
059  @ClassRule
060  public static final HBaseClassTestRule CLASS_RULE =
061    HBaseClassTestRule.forClass(TestRegionNormalizerWorkQueue.class);
062
063  @Rule
064  public TestName testName = new TestName();
065
066  @Test
067  public void testElementUniquenessAndFIFO() throws Exception {
068    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
069    final List<Integer> content = new LinkedList<>();
070    IntStream.of(4, 3, 2, 1, 4, 3, 2, 1).boxed().forEach(queue::put);
071    assertEquals(4, queue.size());
072    while (queue.size() > 0) {
073      content.add(queue.take());
074    }
075    assertThat(content, contains(4, 3, 2, 1));
076
077    queue.clear();
078    queue.putAll(Arrays.asList(4, 3, 2, 1));
079    queue.putAll(Arrays.asList(4, 5));
080    assertEquals(5, queue.size());
081    content.clear();
082    while (queue.size() > 0) {
083      content.add(queue.take());
084    }
085    assertThat(content, contains(4, 3, 2, 1, 5));
086  }
087
088  @Test
089  public void testPriorityAndFIFO() throws Exception {
090    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
091    final List<Integer> content = new LinkedList<>();
092    queue.putAll(Arrays.asList(4, 3, 2, 1));
093    assertEquals(4, queue.size());
094    queue.putFirst(0);
095    assertEquals(5, queue.size());
096    drainTo(queue, content);
097    assertThat("putFirst items should jump the queue, preserving existing order", content,
098      contains(0, 4, 3, 2, 1));
099
100    queue.clear();
101    content.clear();
102    queue.putAll(Arrays.asList(4, 3, 2, 1));
103    queue.putFirst(1);
104    assertEquals(4, queue.size());
105    drainTo(queue, content);
106    assertThat("existing items re-added with putFirst should jump the queue", content,
107      contains(1, 4, 3, 2));
108
109    queue.clear();
110    content.clear();
111    queue.putAll(Arrays.asList(4, 3, 2, 1));
112    queue.putAllFirst(Arrays.asList(2, 3));
113    assertEquals(4, queue.size());
114    drainTo(queue, content);
115    assertThat(
116      "existing items re-added with putAllFirst jump the queue AND honor changes in priority",
117      content, contains(2, 3, 4, 1));
118  }
119
120  private enum Action {
121    PUT,
122    PUT_FIRST,
123    PUT_ALL,
124    PUT_ALL_FIRST,
125  }
126
127  /**
128   * Test that the uniqueness constraint is honored in the face of concurrent modification.
129   */
130  @Test
131  public void testConcurrentPut() throws Exception {
132    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
133    final int maxValue = 100;
134    final Runnable producer = () -> {
135      final Random rand = ThreadLocalRandom.current();
136      for (int i = 0; i < 1_000; i++) {
137        final Action action = Action.values()[rand.nextInt(Action.values().length)];
138        switch (action) {
139          case PUT: {
140            final int val = rand.nextInt(maxValue);
141            queue.put(val);
142            break;
143          }
144          case PUT_FIRST: {
145            final int val = rand.nextInt(maxValue);
146            queue.putFirst(val);
147            break;
148          }
149          case PUT_ALL: {
150            final List<Integer> vals =
151              rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList());
152            queue.putAll(vals);
153            break;
154          }
155          case PUT_ALL_FIRST: {
156            final List<Integer> vals =
157              rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList());
158            queue.putAllFirst(vals);
159            break;
160          }
161          default:
162            fail("Unrecognized action " + action);
163        }
164      }
165    };
166
167    final int numThreads = 5;
168    final CompletableFuture<?>[] futures = IntStream.range(0, numThreads)
169      .mapToObj(val -> CompletableFuture.runAsync(producer)).toArray(CompletableFuture<?>[]::new);
170    CompletableFuture.allOf(futures).join();
171
172    final List<Integer> content = new ArrayList<>(queue.size());
173    drainTo(queue, content);
174    assertThat("at most `maxValue` items should be present.", content.size(),
175      lessThanOrEqualTo(maxValue));
176    assertEquals("all items should be unique.", content.size(), new HashSet<>(content).size());
177  }
178
179  /**
180   * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The
181   * producing thread places new entries onto the queue following a known schedule. The consuming
182   * thread collects a time measurement between calls to {@code take}. Finally, the test makes
183   * coarse-grained assertions of the consumer's observations based on the producer's schedule.
184   */
185  @Test
186  public void testTake() throws Exception {
187    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
188    final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>();
189    final AtomicBoolean finished = new AtomicBoolean(false);
190    final int count = 5;
191    final Runnable consumer = () -> {
192      try {
193        while (!finished.get()) {
194          queue.take();
195          takeTimes.add(System.nanoTime());
196        }
197      } catch (InterruptedException e) {
198        fail("interrupted.");
199      }
200    };
201
202    CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer);
203    final long testStart = System.nanoTime();
204    for (int i = 0; i < count; i++) {
205      Thread.sleep(10);
206      queue.put(i);
207    }
208    // should have timing information for 5 calls to take.
209    Waiter.waitFor(HBaseConfiguration.create(), 1000, () -> count == takeTimes.size());
210    // set finished = true and pipe one more value in case the thread needs an extra pass through
211    // the loop.
212    finished.set(true);
213    queue.put(1);
214    worker.get(1, TimeUnit.SECONDS);
215
216    final Iterator<Long> times = takeTimes.iterator();
217    for (int i = 0; i < count; i++) {
218      assertThat(
219        "Observations collected in takeTimes should increase by roughly 10ms every interval",
220        times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10)));
221    }
222  }
223
224  private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest)
225    throws InterruptedException {
226    assertThat(queue.size(), greaterThan(0));
227    while (queue.size() > 0) {
228      dest.add(queue.take());
229    }
230  }
231}