001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
020import static org.hamcrest.MatcherAssert.assertThat;
021import static org.hamcrest.Matchers.contains;
022import static org.hamcrest.Matchers.greaterThan;
023import static org.hamcrest.Matchers.lessThanOrEqualTo;
024import static org.junit.jupiter.api.Assertions.assertEquals;
025import static org.junit.jupiter.api.Assertions.fail;
026
027import java.util.ArrayList;
028import java.util.Arrays;
029import java.util.Collection;
030import java.util.HashSet;
031import java.util.Iterator;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Random;
035import java.util.concurrent.CompletableFuture;
036import java.util.concurrent.ConcurrentLinkedQueue;
037import java.util.concurrent.ThreadLocalRandom;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicBoolean;
040import java.util.stream.Collectors;
041import java.util.stream.IntStream;
042import org.apache.hadoop.hbase.HBaseConfiguration;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.testclassification.MasterTests;
045import org.apache.hadoop.hbase.testclassification.SmallTests;
046import org.junit.jupiter.api.Tag;
047import org.junit.jupiter.api.Test;
048
049/**
050 * Tests that {@link RegionNormalizerWorkQueue} implements the contract described in its docstring.
051 */
052@Tag(MasterTests.TAG)
053@Tag(SmallTests.TAG)
054public class TestRegionNormalizerWorkQueue {
055
056  @Test
057  public void testElementUniquenessAndFIFO() throws Exception {
058    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
059    final List<Integer> content = new LinkedList<>();
060    IntStream.of(4, 3, 2, 1, 4, 3, 2, 1).boxed().forEach(queue::put);
061    assertEquals(4, queue.size());
062    while (queue.size() > 0) {
063      content.add(queue.take());
064    }
065    assertThat(content, contains(4, 3, 2, 1));
066
067    queue.clear();
068    queue.putAll(Arrays.asList(4, 3, 2, 1));
069    queue.putAll(Arrays.asList(4, 5));
070    assertEquals(5, queue.size());
071    content.clear();
072    while (queue.size() > 0) {
073      content.add(queue.take());
074    }
075    assertThat(content, contains(4, 3, 2, 1, 5));
076  }
077
078  @Test
079  public void testPriorityAndFIFO() throws Exception {
080    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
081    final List<Integer> content = new LinkedList<>();
082    queue.putAll(Arrays.asList(4, 3, 2, 1));
083    assertEquals(4, queue.size());
084    queue.putFirst(0);
085    assertEquals(5, queue.size());
086    drainTo(queue, content);
087    assertThat("putFirst items should jump the queue, preserving existing order", content,
088      contains(0, 4, 3, 2, 1));
089
090    queue.clear();
091    content.clear();
092    queue.putAll(Arrays.asList(4, 3, 2, 1));
093    queue.putFirst(1);
094    assertEquals(4, queue.size());
095    drainTo(queue, content);
096    assertThat("existing items re-added with putFirst should jump the queue", content,
097      contains(1, 4, 3, 2));
098
099    queue.clear();
100    content.clear();
101    queue.putAll(Arrays.asList(4, 3, 2, 1));
102    queue.putAllFirst(Arrays.asList(2, 3));
103    assertEquals(4, queue.size());
104    drainTo(queue, content);
105    assertThat(
106      "existing items re-added with putAllFirst jump the queue AND honor changes in priority",
107      content, contains(2, 3, 4, 1));
108  }
109
110  private enum Action {
111    PUT,
112    PUT_FIRST,
113    PUT_ALL,
114    PUT_ALL_FIRST,
115  }
116
117  /**
118   * Test that the uniqueness constraint is honored in the face of concurrent modification.
119   */
120  @Test
121  public void testConcurrentPut() throws Exception {
122    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
123    final int maxValue = 100;
124    final Runnable producer = () -> {
125      final Random rand = ThreadLocalRandom.current();
126      for (int i = 0; i < 1_000; i++) {
127        final Action action = Action.values()[rand.nextInt(Action.values().length)];
128        switch (action) {
129          case PUT: {
130            final int val = rand.nextInt(maxValue);
131            queue.put(val);
132            break;
133          }
134          case PUT_FIRST: {
135            final int val = rand.nextInt(maxValue);
136            queue.putFirst(val);
137            break;
138          }
139          case PUT_ALL: {
140            final List<Integer> vals =
141              rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList());
142            queue.putAll(vals);
143            break;
144          }
145          case PUT_ALL_FIRST: {
146            final List<Integer> vals =
147              rand.ints(5, 0, maxValue).boxed().collect(Collectors.toList());
148            queue.putAllFirst(vals);
149            break;
150          }
151          default:
152            fail("Unrecognized action " + action);
153        }
154      }
155    };
156
157    final int numThreads = 5;
158    final CompletableFuture<?>[] futures = IntStream.range(0, numThreads)
159      .mapToObj(val -> CompletableFuture.runAsync(producer)).toArray(CompletableFuture<?>[]::new);
160    CompletableFuture.allOf(futures).join();
161
162    final List<Integer> content = new ArrayList<>(queue.size());
163    drainTo(queue, content);
164    assertThat("at most `maxValue` items should be present.", content.size(),
165      lessThanOrEqualTo(maxValue));
166    assertEquals(content.size(), new HashSet<>(content).size(), "all items should be unique.");
167  }
168
169  /**
170   * Test that calls to {@link RegionNormalizerWorkQueue#take()} block the requesting thread. The
171   * producing thread places new entries onto the queue following a known schedule. The consuming
172   * thread collects a time measurement between calls to {@code take}. Finally, the test makes
173   * coarse-grained assertions of the consumer's observations based on the producer's schedule.
174   */
175  @Test
176  public void testTake() throws Exception {
177    final RegionNormalizerWorkQueue<Integer> queue = new RegionNormalizerWorkQueue<>();
178    final ConcurrentLinkedQueue<Long> takeTimes = new ConcurrentLinkedQueue<>();
179    final AtomicBoolean finished = new AtomicBoolean(false);
180    final int count = 5;
181    final Runnable consumer = () -> {
182      try {
183        while (!finished.get()) {
184          queue.take();
185          takeTimes.add(System.nanoTime());
186        }
187      } catch (InterruptedException e) {
188        fail("interrupted.");
189      }
190    };
191
192    CompletableFuture<Void> worker = CompletableFuture.runAsync(consumer);
193    final long testStart = System.nanoTime();
194    for (int i = 0; i < count; i++) {
195      Thread.sleep(10);
196      queue.put(i);
197    }
198    // should have timing information for 5 calls to take.
199    Waiter.waitFor(HBaseConfiguration.create(), 1000, () -> count == takeTimes.size());
200    // set finished = true and pipe one more value in case the thread needs an extra pass through
201    // the loop.
202    finished.set(true);
203    queue.put(1);
204    worker.get(1, TimeUnit.SECONDS);
205
206    final Iterator<Long> times = takeTimes.iterator();
207    for (int i = 0; i < count; i++) {
208      assertThat(
209        "Observations collected in takeTimes should increase by roughly 10ms every interval",
210        times.next(), greaterThan(testStart + TimeUnit.MILLISECONDS.toNanos(i * 10)));
211    }
212  }
213
214  private static <E> void drainTo(final RegionNormalizerWorkQueue<E> queue, Collection<E> dest)
215    throws InterruptedException {
216    assertThat(queue.size(), greaterThan(0));
217    while (queue.size() > 0) {
218      dest.add(queue.take());
219    }
220  }
221}