001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021
022import java.util.concurrent.ThreadLocalRandom;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.testclassification.RegionServerTests;
025import org.apache.hadoop.hbase.testclassification.SmallTests;
026import org.junit.ClassRule;
027import org.junit.Test;
028import org.junit.experimental.categories.Category;
029
030@Category({RegionServerTests.class, SmallTests.class})
031public class TestSyncTimeRangeTracker extends TestSimpleTimeRangeTracker {
032
033  @ClassRule
034  public static final HBaseClassTestRule CLASS_RULE =
035      HBaseClassTestRule.forClass(TestSyncTimeRangeTracker.class);
036
037  private static final int NUM_KEYS = 10000000;
038  private static final int NUM_OF_THREADS = 20;
039
040  @Override
041  protected TimeRangeTracker getTimeRangeTracker() {
042    return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
043  }
044
045  @Override
046  protected TimeRangeTracker getTimeRangeTracker(long min, long max) {
047    return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, min, max);
048  }
049
050  /**
051   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive
052   * at right range.  Here we do ten threads each incrementing over 100k at an offset
053   * of the thread index; max is 10 * 10k and min is 0.
054   * @throws InterruptedException
055   */
056  @Test
057  public void testArriveAtRightAnswer() throws InterruptedException {
058    final TimeRangeTracker trr = getTimeRangeTracker();
059    final int threadCount = 10;
060    final int calls = 1000 * 1000;
061    Thread [] threads = new Thread[threadCount];
062    for (int i = 0; i < threads.length; i++) {
063      Thread t = new Thread("" + i) {
064        @Override
065        public void run() {
066          int offset = Integer.parseInt(getName());
067          boolean even = offset % 2 == 0;
068          if (even) {
069            for (int i = (offset * calls); i < calls; i++) trr.includeTimestamp(i);
070          } else {
071            int base = offset * calls;
072            for (int i = base + calls; i >= base; i--) trr.includeTimestamp(i);
073          }
074        }
075      };
076      t.start();
077      threads[i] = t;
078    }
079    for (int i = 0; i < threads.length; i++) {
080      threads[i].join();
081    }
082
083    assertTrue(trr.getMax() == calls * threadCount);
084    assertTrue(trr.getMin() == 0);
085  }
086
087  static class RandomTestData {
088    private long[] keys = new long[NUM_KEYS];
089    private long min = Long.MAX_VALUE;
090    private long max = 0;
091
092    public RandomTestData() {
093      if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) {
094        for (int i = 0; i < NUM_KEYS; i++) {
095          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
096          if (keys[i] < min) min = keys[i];
097          if (keys[i] > max) max = keys[i];
098        }
099      } else {
100        for (int i = NUM_KEYS - 1; i >= 0; i--) {
101          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
102          if (keys[i] < min) min = keys[i];
103          if (keys[i] > max) max = keys[i];
104        }
105      }
106    }
107
108    public long getMax() {
109      return this.max;
110    }
111
112    public long getMin() {
113      return this.min;
114    }
115  }
116
117  static class TrtUpdateRunnable implements Runnable {
118
119    private TimeRangeTracker trt;
120    private RandomTestData data;
121    public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
122      this.trt = trt;
123      this.data = data;
124    }
125
126    @Override
127    public void run() {
128      for (long key : data.keys) {
129        trt.includeTimestamp(key);
130      }
131    }
132  }
133
134  /**
135   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive
136   * at right range.  The data chosen is going to ensure that there are lots collisions, i.e,
137   * some other threads may already update the value while one tries to update min/max value.
138   */
139  @Test
140  public void testConcurrentIncludeTimestampCorrectness() {
141    RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
142    long min = Long.MAX_VALUE, max = 0;
143    for (int i = 0; i < NUM_OF_THREADS; i ++) {
144      testData[i] = new RandomTestData();
145      if (testData[i].getMin() < min) {
146        min = testData[i].getMin();
147      }
148      if (testData[i].getMax() > max) {
149        max = testData[i].getMax();
150      }
151    }
152
153    TimeRangeTracker trt = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
154
155    Thread[] t = new Thread[NUM_OF_THREADS];
156    for (int i = 0; i < NUM_OF_THREADS; i++) {
157      t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
158      t[i].start();
159    }
160
161    for (Thread thread : t) {
162      try {
163        thread.join();
164      } catch (InterruptedException e) {
165        e.printStackTrace();
166      }
167    }
168
169    assertTrue(min == trt.getMin());
170    assertTrue(max == trt.getMax());
171  }
172}