001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertFalse;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.Iterator;
027import java.util.List;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.CompletionException;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.atomic.AtomicInteger;
032import org.apache.hadoop.hbase.testclassification.MiscTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.apache.hadoop.hbase.util.PoolMap.PoolType;
035import org.junit.jupiter.api.Tag;
036import org.junit.jupiter.api.Test;
037
038@Tag(MiscTests.TAG)
039@Tag(SmallTests.TAG)
040public class TestRoundRobinPoolMap extends PoolMapTestBase {
041
042  @Override
043  protected PoolType getPoolType() {
044    return PoolType.RoundRobin;
045  }
046
047  @Test
048  public void testGetOrCreate() throws IOException {
049    String key = "key";
050    String value = "value";
051    String result = poolMap.getOrCreate(key, () -> value);
052
053    assertEquals(value, result);
054    assertEquals(1, poolMap.values().size());
055  }
056
057  @Test
058  public void testMultipleKeys() throws IOException {
059    for (int i = 0; i < KEY_COUNT; i++) {
060      String key = Integer.toString(i);
061      String value = Integer.toString(2 * i);
062      String result = poolMap.getOrCreate(key, () -> value);
063
064      assertEquals(value, result);
065    }
066
067    assertEquals(KEY_COUNT, poolMap.values().size());
068  }
069
070  @Test
071  public void testMultipleValues() throws IOException {
072    String key = "key";
073
074    for (int i = 0; i < POOL_SIZE; i++) {
075      String value = Integer.toString(i);
076      String result = poolMap.getOrCreate(key, () -> value);
077
078      assertEquals(value, result);
079    }
080
081    assertEquals(POOL_SIZE, poolMap.values().size());
082  }
083
084  @Test
085  public void testRoundRobin() throws IOException {
086    String key = "key";
087
088    for (int i = 0; i < POOL_SIZE; i++) {
089      String value = Integer.toString(i);
090      poolMap.getOrCreate(key, () -> value);
091    }
092
093    assertEquals(POOL_SIZE, poolMap.values().size());
094
095    /* pool is filled, get() should return elements round robin order */
096    for (int i = 0; i < 2 * POOL_SIZE; i++) {
097      String expected = Integer.toString(i % POOL_SIZE);
098      assertEquals(expected, poolMap.getOrCreate(key, () -> {
099        throw new IOException("must not call me");
100      }));
101    }
102
103    assertEquals(POOL_SIZE, poolMap.values().size());
104  }
105
106  @Test
107  public void testMultiThreadedRoundRobin() throws ExecutionException, InterruptedException {
108    String key = "key";
109    AtomicInteger id = new AtomicInteger();
110    List<String> results = Collections.synchronizedList(new ArrayList<>());
111
112    Runnable runnable = () -> {
113      try {
114        for (int i = 0; i < POOL_SIZE; i++) {
115          String value = Integer.toString(id.getAndIncrement());
116          String result = poolMap.getOrCreate(key, () -> value);
117          results.add(result);
118          // Sleep for a short time to ensure a yield. Thread#yield has platform dependent behavior.
119          Thread.sleep(10);
120        }
121      } catch (Exception e) {
122        throw new CompletionException(e);
123      }
124    };
125
126    CompletableFuture<Void> future1 = CompletableFuture.runAsync(runnable);
127    CompletableFuture<Void> future2 = CompletableFuture.runAsync(runnable);
128
129    /* test for successful completion */
130    future1.get();
131    future2.get();
132
133    assertEquals(POOL_SIZE, poolMap.values().size());
134
135    /* check every elements occur twice */
136    Collections.sort(results);
137    Iterator<String> iterator = results.iterator();
138
139    for (int i = 0; i < POOL_SIZE; i++) {
140      String next1 = iterator.next();
141      String next2 = iterator.next();
142      assertEquals(next1, next2);
143    }
144
145    assertFalse(iterator.hasNext());
146  }
147}