001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertTrue; 021 022import java.util.concurrent.ThreadLocalRandom; 023import org.apache.hadoop.hbase.HBaseClassTestRule; 024import org.apache.hadoop.hbase.testclassification.RegionServerTests; 025import org.apache.hadoop.hbase.testclassification.SmallTests; 026import org.junit.ClassRule; 027import org.junit.Test; 028import org.junit.experimental.categories.Category; 029 030@Category({RegionServerTests.class, SmallTests.class}) 031public class TestSyncTimeRangeTracker extends TestSimpleTimeRangeTracker { 032 033 @ClassRule 034 public static final HBaseClassTestRule CLASS_RULE = 035 HBaseClassTestRule.forClass(TestSyncTimeRangeTracker.class); 036 037 private static final int NUM_KEYS = 10000000; 038 private static final int NUM_OF_THREADS = 20; 039 040 @Override 041 protected TimeRangeTracker getTimeRangeTracker() { 042 return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC); 043 } 044 045 @Override 046 protected TimeRangeTracker getTimeRangeTracker(long min, long max) { 047 return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, min, max); 048 } 049 050 /** 051 * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive 052 * at right range. Here we do ten threads each incrementing over 100k at an offset 053 * of the thread index; max is 10 * 10k and min is 0. 054 * @throws InterruptedException 055 */ 056 @Test 057 public void testArriveAtRightAnswer() throws InterruptedException { 058 final TimeRangeTracker trr = getTimeRangeTracker(); 059 final int threadCount = 10; 060 final int calls = 1000 * 1000; 061 Thread [] threads = new Thread[threadCount]; 062 for (int i = 0; i < threads.length; i++) { 063 Thread t = new Thread("" + i) { 064 @Override 065 public void run() { 066 int offset = Integer.parseInt(getName()); 067 boolean even = offset % 2 == 0; 068 if (even) { 069 for (int i = (offset * calls); i < calls; i++) trr.includeTimestamp(i); 070 } else { 071 int base = offset * calls; 072 for (int i = base + calls; i >= base; i--) trr.includeTimestamp(i); 073 } 074 } 075 }; 076 t.start(); 077 threads[i] = t; 078 } 079 for (int i = 0; i < threads.length; i++) { 080 threads[i].join(); 081 } 082 083 assertTrue(trr.getMax() == calls * threadCount); 084 assertTrue(trr.getMin() == 0); 085 } 086 087 static class RandomTestData { 088 private long[] keys = new long[NUM_KEYS]; 089 private long min = Long.MAX_VALUE; 090 private long max = 0; 091 092 public RandomTestData() { 093 if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) { 094 for (int i = 0; i < NUM_KEYS; i++) { 095 keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS); 096 if (keys[i] < min) min = keys[i]; 097 if (keys[i] > max) max = keys[i]; 098 } 099 } else { 100 for (int i = NUM_KEYS - 1; i >= 0; i--) { 101 keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS); 102 if (keys[i] < min) min = keys[i]; 103 if (keys[i] > max) max = keys[i]; 104 } 105 } 106 } 107 108 public long getMax() { 109 return this.max; 110 } 111 112 public long getMin() { 113 return this.min; 114 } 115 } 116 117 static class TrtUpdateRunnable implements Runnable { 118 119 private TimeRangeTracker trt; 120 private RandomTestData data; 121 public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) { 122 this.trt = trt; 123 this.data = data; 124 } 125 126 @Override 127 public void run() { 128 for (long key : data.keys) { 129 trt.includeTimestamp(key); 130 } 131 } 132 } 133 134 /** 135 * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive 136 * at right range. The data chosen is going to ensure that there are lots collisions, i.e, 137 * some other threads may already update the value while one tries to update min/max value. 138 */ 139 @Test 140 public void testConcurrentIncludeTimestampCorrectness() { 141 RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS]; 142 long min = Long.MAX_VALUE, max = 0; 143 for (int i = 0; i < NUM_OF_THREADS; i ++) { 144 testData[i] = new RandomTestData(); 145 if (testData[i].getMin() < min) { 146 min = testData[i].getMin(); 147 } 148 if (testData[i].getMax() > max) { 149 max = testData[i].getMax(); 150 } 151 } 152 153 TimeRangeTracker trt = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC); 154 155 Thread[] t = new Thread[NUM_OF_THREADS]; 156 for (int i = 0; i < NUM_OF_THREADS; i++) { 157 t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i])); 158 t[i].start(); 159 } 160 161 for (Thread thread : t) { 162 try { 163 thread.join(); 164 } catch (InterruptedException e) { 165 e.printStackTrace(); 166 } 167 } 168 169 assertTrue(min == trt.getMin()); 170 assertTrue(max == trt.getMax()); 171 } 172}