001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertTrue;
021import java.util.concurrent.ThreadLocalRandom;
022import org.apache.hadoop.hbase.HBaseClassTestRule;
023import org.apache.hadoop.hbase.testclassification.MediumTests;
024import org.apache.hadoop.hbase.testclassification.RegionServerTests;
025import org.junit.ClassRule;
026import org.junit.Test;
027import org.junit.experimental.categories.Category;
028
029@Category({RegionServerTests.class, MediumTests.class})
030public class TestSyncTimeRangeTracker extends TestSimpleTimeRangeTracker {
031
032  @ClassRule
033  public static final HBaseClassTestRule CLASS_RULE =
034      HBaseClassTestRule.forClass(TestSyncTimeRangeTracker.class);
035
036  private static final int NUM_KEYS = 10000000;
037  private static final int NUM_OF_THREADS = 20;
038
039  @Override
040  protected TimeRangeTracker getTimeRangeTracker() {
041    return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
042  }
043
044  @Override
045  protected TimeRangeTracker getTimeRangeTracker(long min, long max) {
046    return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, min, max);
047  }
048
049  /**
050   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive
051   * at right range.  Here we do ten threads each incrementing over 100k at an offset
052   * of the thread index; max is 10 * 10k and min is 0.
053   */
054  @Test
055  public void testArriveAtRightAnswer() throws InterruptedException {
056    final TimeRangeTracker trr = getTimeRangeTracker();
057    final int threadCount = 10;
058    final int calls = 1000 * 1000;
059    Thread [] threads = new Thread[threadCount];
060    for (int i = 0; i < threads.length; i++) {
061      Thread t = new Thread("" + i) {
062        @Override
063        public void run() {
064          int offset = Integer.parseInt(getName());
065          boolean even = offset % 2 == 0;
066          if (even) {
067            for (int i = (offset * calls); i < calls; i++) {
068              trr.includeTimestamp(i);
069            }
070          } else {
071            int base = offset * calls;
072            for (int i = base + calls; i >= base; i--) {
073              trr.includeTimestamp(i);
074            }
075          }
076        }
077      };
078      t.start();
079      threads[i] = t;
080    }
081    for (int i = 0; i < threads.length; i++) {
082      threads[i].join();
083    }
084
085    assertTrue(trr.getMax() == calls * threadCount);
086    assertTrue(trr.getMin() == 0);
087  }
088
089  static class RandomTestData {
090    private long[] keys = new long[NUM_KEYS];
091    private long min = Long.MAX_VALUE;
092    private long max = 0;
093
094    public RandomTestData() {
095      if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) {
096        for (int i = 0; i < NUM_KEYS; i++) {
097          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
098          if (keys[i] < min) {
099            min = keys[i];
100          }
101          if (keys[i] > max) {
102            max = keys[i];
103          }
104        }
105      } else {
106        for (int i = NUM_KEYS - 1; i >= 0; i--) {
107          keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS);
108          if (keys[i] < min) {
109            min = keys[i];
110          }
111          if (keys[i] > max) {
112            max = keys[i];
113          }
114        }
115      }
116    }
117
118    public long getMax() {
119      return this.max;
120    }
121
122    public long getMin() {
123      return this.min;
124    }
125  }
126
127  static class TrtUpdateRunnable implements Runnable {
128
129    private TimeRangeTracker trt;
130    private RandomTestData data;
131    public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) {
132      this.trt = trt;
133      this.data = data;
134    }
135
136    @Override
137    public void run() {
138      for (long key : data.keys) {
139        trt.includeTimestamp(key);
140      }
141    }
142  }
143
144  /**
145   * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive
146   * at right range.  The data chosen is going to ensure that there are lots collisions, i.e,
147   * some other threads may already update the value while one tries to update min/max value.
148   */
149  @Test
150  public void testConcurrentIncludeTimestampCorrectness() {
151    RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS];
152    long min = Long.MAX_VALUE, max = 0;
153    for (int i = 0; i < NUM_OF_THREADS; i ++) {
154      testData[i] = new RandomTestData();
155      if (testData[i].getMin() < min) {
156        min = testData[i].getMin();
157      }
158      if (testData[i].getMax() > max) {
159        max = testData[i].getMax();
160      }
161    }
162
163    TimeRangeTracker trt = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC);
164
165    Thread[] t = new Thread[NUM_OF_THREADS];
166    for (int i = 0; i < NUM_OF_THREADS; i++) {
167      t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i]));
168      t[i].start();
169    }
170
171    for (Thread thread : t) {
172      try {
173        thread.join();
174      } catch (InterruptedException e) {
175        e.printStackTrace();
176      }
177    }
178
179    assertTrue(min == trt.getMin());
180    assertTrue(max == trt.getMax());
181  }
182}