001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertTrue; 021import java.util.concurrent.ThreadLocalRandom; 022import org.apache.hadoop.hbase.HBaseClassTestRule; 023import org.apache.hadoop.hbase.testclassification.MediumTests; 024import org.apache.hadoop.hbase.testclassification.RegionServerTests; 025import org.junit.ClassRule; 026import org.junit.Test; 027import org.junit.experimental.categories.Category; 028 029@Category({RegionServerTests.class, MediumTests.class}) 030public class TestSyncTimeRangeTracker extends TestSimpleTimeRangeTracker { 031 032 @ClassRule 033 public static final HBaseClassTestRule CLASS_RULE = 034 HBaseClassTestRule.forClass(TestSyncTimeRangeTracker.class); 035 036 private static final int NUM_KEYS = 10000000; 037 private static final int NUM_OF_THREADS = 20; 038 039 @Override 040 protected TimeRangeTracker getTimeRangeTracker() { 041 return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC); 042 } 043 044 @Override 045 protected TimeRangeTracker getTimeRangeTracker(long min, long max) { 046 return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, min, max); 047 } 048 049 /** 050 * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive 051 * at right range. Here we do ten threads each incrementing over 100k at an offset 052 * of the thread index; max is 10 * 10k and min is 0. 053 */ 054 @Test 055 public void testArriveAtRightAnswer() throws InterruptedException { 056 final TimeRangeTracker trr = getTimeRangeTracker(); 057 final int threadCount = 10; 058 final int calls = 1000 * 1000; 059 Thread [] threads = new Thread[threadCount]; 060 for (int i = 0; i < threads.length; i++) { 061 Thread t = new Thread("" + i) { 062 @Override 063 public void run() { 064 int offset = Integer.parseInt(getName()); 065 boolean even = offset % 2 == 0; 066 if (even) { 067 for (int i = (offset * calls); i < calls; i++) { 068 trr.includeTimestamp(i); 069 } 070 } else { 071 int base = offset * calls; 072 for (int i = base + calls; i >= base; i--) { 073 trr.includeTimestamp(i); 074 } 075 } 076 } 077 }; 078 t.start(); 079 threads[i] = t; 080 } 081 for (int i = 0; i < threads.length; i++) { 082 threads[i].join(); 083 } 084 085 assertTrue(trr.getMax() == calls * threadCount); 086 assertTrue(trr.getMin() == 0); 087 } 088 089 static class RandomTestData { 090 private long[] keys = new long[NUM_KEYS]; 091 private long min = Long.MAX_VALUE; 092 private long max = 0; 093 094 public RandomTestData() { 095 if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) { 096 for (int i = 0; i < NUM_KEYS; i++) { 097 keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS); 098 if (keys[i] < min) { 099 min = keys[i]; 100 } 101 if (keys[i] > max) { 102 max = keys[i]; 103 } 104 } 105 } else { 106 for (int i = NUM_KEYS - 1; i >= 0; i--) { 107 keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS); 108 if (keys[i] < min) { 109 min = keys[i]; 110 } 111 if (keys[i] > max) { 112 max = keys[i]; 113 } 114 } 115 } 116 } 117 118 public long getMax() { 119 return this.max; 120 } 121 122 public long getMin() { 123 return this.min; 124 } 125 } 126 127 static class TrtUpdateRunnable implements Runnable { 128 129 private TimeRangeTracker trt; 130 private RandomTestData data; 131 public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) { 132 this.trt = trt; 133 this.data = data; 134 } 135 136 @Override 137 public void run() { 138 for (long key : data.keys) { 139 trt.includeTimestamp(key); 140 } 141 } 142 } 143 144 /** 145 * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive 146 * at right range. The data chosen is going to ensure that there are lots collisions, i.e, 147 * some other threads may already update the value while one tries to update min/max value. 148 */ 149 @Test 150 public void testConcurrentIncludeTimestampCorrectness() { 151 RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS]; 152 long min = Long.MAX_VALUE, max = 0; 153 for (int i = 0; i < NUM_OF_THREADS; i ++) { 154 testData[i] = new RandomTestData(); 155 if (testData[i].getMin() < min) { 156 min = testData[i].getMin(); 157 } 158 if (testData[i].getMax() > max) { 159 max = testData[i].getMax(); 160 } 161 } 162 163 TimeRangeTracker trt = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC); 164 165 Thread[] t = new Thread[NUM_OF_THREADS]; 166 for (int i = 0; i < NUM_OF_THREADS; i++) { 167 t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i])); 168 t[i].start(); 169 } 170 171 for (Thread thread : t) { 172 try { 173 thread.join(); 174 } catch (InterruptedException e) { 175 e.printStackTrace(); 176 } 177 } 178 179 assertTrue(min == trt.getMin()); 180 assertTrue(max == trt.getMax()); 181 } 182}