001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.util.concurrent.ThreadLocalRandom; 023import org.apache.hadoop.hbase.testclassification.MediumTests; 024import org.apache.hadoop.hbase.testclassification.RegionServerTests; 025import org.junit.jupiter.api.Tag; 026import org.junit.jupiter.api.Test; 027 028@Tag(RegionServerTests.TAG) 029@Tag(MediumTests.TAG) 030public class TestSyncTimeRangeTracker extends TestSimpleTimeRangeTracker { 031 032 private static final int NUM_KEYS = 8000000; 033 private static final int NUM_OF_THREADS = 20; 034 035 @Override 036 protected TimeRangeTracker getTimeRangeTracker() { 037 return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC); 038 } 039 040 @Override 041 protected TimeRangeTracker getTimeRangeTracker(long min, long max) { 042 return TimeRangeTracker.create(TimeRangeTracker.Type.SYNC, min, max); 043 } 044 045 /** 046 * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive at right range. 047 * Here we do ten threads each incrementing over 100k at an offset of the thread index; max is 10 048 * * 10k and min is 0. 049 */ 050 @Test 051 public void testArriveAtRightAnswer() throws InterruptedException { 052 final TimeRangeTracker trr = getTimeRangeTracker(); 053 final int threadCount = 10; 054 final int calls = 1000 * 1000; 055 Thread[] threads = new Thread[threadCount]; 056 for (int i = 0; i < threads.length; i++) { 057 Thread t = new Thread("" + i) { 058 @Override 059 public void run() { 060 int offset = Integer.parseInt(getName()); 061 boolean even = offset % 2 == 0; 062 if (even) { 063 for (int i = (offset * calls); i < calls; i++) { 064 trr.includeTimestamp(i); 065 } 066 } else { 067 int base = offset * calls; 068 for (int i = base + calls; i >= base; i--) { 069 trr.includeTimestamp(i); 070 } 071 } 072 } 073 }; 074 t.start(); 075 threads[i] = t; 076 } 077 for (int i = 0; i < threads.length; i++) { 078 threads[i].join(); 079 } 080 081 assertEquals(calls * threadCount, trr.getMax()); 082 assertEquals(0, trr.getMin()); 083 } 084 085 static class RandomTestData { 086 private long[] keys = new long[NUM_KEYS]; 087 private long min = Long.MAX_VALUE; 088 private long max = 0; 089 090 public RandomTestData() { 091 if (ThreadLocalRandom.current().nextInt(NUM_OF_THREADS) % 2 == 0) { 092 for (int i = 0; i < NUM_KEYS; i++) { 093 keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS); 094 if (keys[i] < min) { 095 min = keys[i]; 096 } 097 if (keys[i] > max) { 098 max = keys[i]; 099 } 100 } 101 } else { 102 for (int i = NUM_KEYS - 1; i >= 0; i--) { 103 keys[i] = i + ThreadLocalRandom.current().nextLong(NUM_OF_THREADS); 104 if (keys[i] < min) { 105 min = keys[i]; 106 } 107 if (keys[i] > max) { 108 max = keys[i]; 109 } 110 } 111 } 112 } 113 114 public long getMax() { 115 return this.max; 116 } 117 118 public long getMin() { 119 return this.min; 120 } 121 } 122 123 static class TrtUpdateRunnable implements Runnable { 124 125 private TimeRangeTracker trt; 126 private RandomTestData data; 127 128 public TrtUpdateRunnable(final TimeRangeTracker trt, final RandomTestData data) { 129 this.trt = trt; 130 this.data = data; 131 } 132 133 @Override 134 public void run() { 135 for (long key : data.keys) { 136 trt.includeTimestamp(key); 137 } 138 } 139 } 140 141 /** 142 * Run a bunch of threads against a single TimeRangeTracker and ensure we arrive at right range. 143 * The data chosen is going to ensure that there are lots collisions, i.e, some other threads may 144 * already update the value while one tries to update min/max value. 145 */ 146 @Test 147 public void testConcurrentIncludeTimestampCorrectness() { 148 RandomTestData[] testData = new RandomTestData[NUM_OF_THREADS]; 149 long min = Long.MAX_VALUE, max = 0; 150 for (int i = 0; i < NUM_OF_THREADS; i++) { 151 testData[i] = new RandomTestData(); 152 if (testData[i].getMin() < min) { 153 min = testData[i].getMin(); 154 } 155 if (testData[i].getMax() > max) { 156 max = testData[i].getMax(); 157 } 158 } 159 160 TimeRangeTracker trt = TimeRangeTracker.create(TimeRangeTracker.Type.SYNC); 161 162 Thread[] t = new Thread[NUM_OF_THREADS]; 163 for (int i = 0; i < NUM_OF_THREADS; i++) { 164 t[i] = new Thread(new TrtUpdateRunnable(trt, testData[i])); 165 t[i].start(); 166 } 167 168 for (Thread thread : t) { 169 try { 170 thread.join(); 171 } catch (InterruptedException e) { 172 e.printStackTrace(); 173 } 174 } 175 176 assertEquals(min, trt.getMin()); 177 assertEquals(max, trt.getMax()); 178 } 179}