001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.util.concurrent.ThreadLocalRandom;
023import org.apache.hadoop.hbase.testclassification.MediumTests;
024import org.apache.hadoop.hbase.testclassification.RegionServerTests;
025import org.junit.jupiter.api.Tag;
026import org.junit.jupiter.api.Test;
027
028@Tag(RegionServerTests.TAG)
029@Tag(MediumTests.TAG)
030public class TestSyncTimeRangeTracker extends TestSimpleTimeRangeTracker {
031
  // Length of the key array held by each RandomTestData instance.
  private static final int NUM_KEYS = 8000000;
  // Number of data sets and updater threads in testConcurrentIncludeTimestampCorrectness; also
  // reused as the exclusive bound for the random draws made in RandomTestData.
  private static final int NUM_OF_THREADS = 20;
034
035  @Override
036  protected TimeRangeTracker getTimeRangeTracker() {
037    return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
038  }
039
040  @Override
041  protected TimeRangeTracker getTimeRangeTracker(long min, long max) {
042    return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, min, max);
043  }
044
045  /**
046   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive at right range.
047   * Here we do ten threads each incrementing over 100k at an offset of the thread index; max is 10
048   * * 10k and min is 0.
049   */
050  @Test
051  public void testArriveAtRightAnswer() throws InterruptedException {
052    final TimeRangeTracker trr = getTimeRangeTracker();
053    final int threadCount = 10;
054    final int calls = 1000 * 1000;
055    Thread[] threads = new Thread[threadCount];
056    for (int i = 0; i < threads.length; i++) {
057      Thread t = new Thread("" + i) {
058        @Override
059        public void run() {
060          int offset = Integer.parseInt(getName());
061          boolean even = offset % 2 == 0;
062          if (even) {
063            for (int i = (offset * calls); i < calls; i++) {
064              trr.includeTimestamp(i);
065            }
066          } else {
067            int base = offset * calls;
068            for (int i = base + calls; i >= base; i--) {
069              trr.includeTimestamp(i);
070            }
071          }
072        }
073      };
074      t.start();
075      threads[i] = t;
076    }
077    for (int i = 0; i < threads.length; i++) {
078      threads[i].join();
079    }
080
081    assertEquals(calls * threadCount, trr.getMax());
082    assertEquals(0, trr.getMin());
083  }
084
085  static class RandomTestData {
086    private long[] keys = new long[NUM_KEYS];
087    private long min = Long.MAX_VALUE;
088    private long max = 0;
089
090    public RandomTestData() {
091      if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) {
092        for (int i = 0; i < NUM_KEYS; i++) {
093          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
094          if (keys[i] < min) {
095            min = keys[i];
096          }
097          if (keys[i] > max) {
098            max = keys[i];
099          }
100        }
101      } else {
102        for (int i = NUM_KEYS - 1; i >= 0; i--) {
103          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
104          if (keys[i] < min) {
105            min = keys[i];
106          }
107          if (keys[i] > max) {
108            max = keys[i];
109          }
110        }
111      }
112    }
113
114    public long getMax() {
115      return this.max;
116    }
117
118    public long getMin() {
119      return this.min;
120    }
121  }
122
123  static class TrtUpdateRunnable implements Runnable {
124
125    private TimeRangeTracker trt;
126    private RandomTestData data;
127
128    public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
129      this.trt = trt;
130      this.data = data;
131    }
132
133    @Override
134    public void run() {
135      for (long key : data.keys) {
136        trt.includeTimestamp(key);
137      }
138    }
139  }
140
141  /**
142   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive at right range.
143   * The data chosen is going to ensure that there are lots collisions, i.e, some other threads may
144   * already update the value while one tries to update min/max value.
145   */
146  @Test
147  public void testConcurrentIncludeTimestampCorrectness() {
148    RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
149    long min = Long.MAX_VALUE, max = 0;
150    for (int i = 0; i < NUM_OF_THREADS; i++) {
151      testData[i] = new RandomTestData();
152      if (testData[i].getMin() < min) {
153        min = testData[i].getMin();
154      }
155      if (testData[i].getMax() > max) {
156        max = testData[i].getMax();
157      }
158    }
159
160    TimeRangeTracker trt = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
161
162    Thread[] t = new Thread[NUM_OF_THREADS];
163    for (int i = 0; i < NUM_OF_THREADS; i++) {
164      t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
165      t[i].start();
166    }
167
168    for (Thread thread : t) {
169      try {
170        thread.join();
171      } catch (InterruptedException e) {
172        e.printStackTrace();
173      }
174    }
175
176    assertEquals(min, trt.getMin());
177    assertEquals(max, trt.getMax());
178  }
179}