001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.util.concurrent.ThreadLocalRandom;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.testclassification.MediumTests;
025import org.apache.hadoop.hbase.testclassification.RegionServerTests;
026import org.junit.ClassRule;
027import org.junit.Test;
028import org.junit.experimental.categories.Category;
029
030@Category({ RegionServerTests.class, MediumTests.class })
031public class TestSyncTimeRangeTracker extends TestSimpleTimeRangeTracker {
032
033  @ClassRule
034  public static final HBaseClassTestRule CLASS_RULE =
035    HBaseClassTestRule.forClass(TestSyncTimeRangeTracker.class);
036
037  private static final int NUM_KEYS = 8000000;
038  private static final int NUM_OF_THREADS = 20;
039
040  @Override
041  protected TimeRangeTracker getTimeRangeTracker() {
042    return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
043  }
044
045  @Override
046  protected TimeRangeTracker getTimeRangeTracker(long min, long max) {
047    return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, min, max);
048  }
049
050  /**
051   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive at right range.
052   * Here we do ten threads each incrementing over 100k at an offset of the thread index; max is 10
053   * * 10k and min is 0.
054   */
055  @Test
056  public void testArriveAtRightAnswer() throws InterruptedException {
057    final TimeRangeTracker trr = getTimeRangeTracker();
058    final int threadCount = 10;
059    final int calls = 1000 * 1000;
060    Thread[] threads = new Thread[threadCount];
061    for (int i = 0; i < threads.length; i++) {
062      Thread t = new Thread("" + i) {
063        @Override
064        public void run() {
065          int offset = Integer.parseInt(getName());
066          boolean even = offset % 2 == 0;
067          if (even) {
068            for (int i = (offset * calls); i < calls; i++) {
069              trr.includeTimestamp(i);
070            }
071          } else {
072            int base = offset * calls;
073            for (int i = base + calls; i >= base; i--) {
074              trr.includeTimestamp(i);
075            }
076          }
077        }
078      };
079      t.start();
080      threads[i] = t;
081    }
082    for (int i = 0; i < threads.length; i++) {
083      threads[i].join();
084    }
085
086    assertTrue(trr.getMax() == calls * threadCount);
087    assertTrue(trr.getMin() == 0);
088  }
089
090  static class RandomTestData {
091    private long[] keys = new long[NUM_KEYS];
092    private long min = Long.MAX_VALUE;
093    private long max = 0;
094
095    public RandomTestData() {
096      if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) {
097        for (int i = 0; i < NUM_KEYS; i++) {
098          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
099          if (keys[i] < min) {
100            min = keys[i];
101          }
102          if (keys[i] > max) {
103            max = keys[i];
104          }
105        }
106      } else {
107        for (int i = NUM_KEYS - 1; i >= 0; i--) {
108          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
109          if (keys[i] < min) {
110            min = keys[i];
111          }
112          if (keys[i] > max) {
113            max = keys[i];
114          }
115        }
116      }
117    }
118
119    public long getMax() {
120      return this.max;
121    }
122
123    public long getMin() {
124      return this.min;
125    }
126  }
127
128  static class TrtUpdateRunnable implements Runnable {
129
130    private TimeRangeTracker trt;
131    private RandomTestData data;
132
133    public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
134      this.trt = trt;
135      this.data = data;
136    }
137
138    @Override
139    public void run() {
140      for (long key : data.keys) {
141        trt.includeTimestamp(key);
142      }
143    }
144  }
145
146  /**
147   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive at right range.
148   * The data chosen is going to ensure that there are lots collisions, i.e, some other threads may
149   * already update the value while one tries to update min/max value.
150   */
151  @Test
152  public void testConcurrentIncludeTimestampCorrectness() {
153    RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
154    long min = Long.MAX_VALUE, max = 0;
155    for (int i = 0; i < NUM_OF_THREADS; i++) {
156      testData[i] = new RandomTestData();
157      if (testData[i].getMin() < min) {
158        min = testData[i].getMin();
159      }
160      if (testData[i].getMax() > max) {
161        max = testData[i].getMax();
162      }
163    }
164
165    TimeRangeTracker trt = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
166
167    Thread[] t = new Thread[NUM_OF_THREADS];
168    for (int i = 0; i < NUM_OF_THREADS; i++) {
169      t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
170      t[i].start();
171    }
172
173    for (Thread thread : t) {
174      try {
175        thread.join();
176      } catch (InterruptedException e) {
177        e.printStackTrace();
178      }
179    }
180
181    assertTrue(min == trt.getMin());
182    assertTrue(max == trt.getMax());
183  }
184}