001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.util.Random; 021import java.util.concurrent.atomic.AtomicBoolean; 022import java.util.concurrent.atomic.AtomicLong; 023import org.apache.hadoop.hbase.HBaseClassTestRule; 024import org.apache.hadoop.hbase.testclassification.MediumTests; 025import org.apache.hadoop.hbase.testclassification.RegionServerTests; 026import org.junit.Assert; 027import org.junit.ClassRule; 028import org.junit.experimental.categories.Category; 029 030/** 031 * This is a hammer test that verifies MultiVersionConcurrencyControl in a 032 * multiple writer single reader scenario. 033 */ 034@Category({RegionServerTests.class, MediumTests.class}) 035public class TestMultiVersionConcurrencyControl { 036 037 @ClassRule 038 public static final HBaseClassTestRule CLASS_RULE = 039 HBaseClassTestRule.forClass(TestMultiVersionConcurrencyControl.class); 040 041 static class Writer implements Runnable { 042 final AtomicBoolean finished; 043 final MultiVersionConcurrencyControl mvcc; 044 final AtomicBoolean status; 045 046 Writer(AtomicBoolean finished, MultiVersionConcurrencyControl mvcc, AtomicBoolean status) { 047 this.finished = finished; 048 this.mvcc = mvcc; 049 this.status = status; 050 } 051 052 private Random rnd = new Random(); 053 public boolean failed = false; 054 055 @Override 056 public void run() { 057 while (!finished.get()) { 058 MultiVersionConcurrencyControl.WriteEntry e = 059 mvcc.begin(); 060 // System.out.println("Begin write: " + e.getWriteNumber()); 061 // 10 usec - 500usec (including 0) 062 int sleepTime = rnd.nextInt(500); 063 // 500 * 1000 = 500,000ns = 500 usec 064 // 1 * 100 = 100ns = 1usec 065 try { 066 if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000); 067 } catch (InterruptedException e1) { 068 } 069 try { 070 mvcc.completeAndWait(e); 071 } catch (RuntimeException ex) { 072 // got failure 073 System.out.println(ex.toString()); 074 ex.printStackTrace(); 075 status.set(false); 076 return; 077 // Report failure if possible. 078 } 079 } 080 } 081 } 082 083 public void testParallelism() throws Exception { 084 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 085 086 final AtomicBoolean finished = new AtomicBoolean(false); 087 088 // fail flag for the reader thread 089 final AtomicBoolean readerFailed = new AtomicBoolean(false); 090 final AtomicLong failedAt = new AtomicLong(); 091 Runnable reader = new Runnable() { 092 @Override 093 public void run() { 094 long prev = mvcc.getReadPoint(); 095 while (!finished.get()) { 096 long newPrev = mvcc.getReadPoint(); 097 if (newPrev < prev) { 098 // serious problem. 099 System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev); 100 readerFailed.set(true); 101 // might as well give up 102 failedAt.set(newPrev); 103 return; 104 } 105 } 106 } 107 }; 108 109 // writer thread parallelism. 110 int n = 20; 111 Thread[] writers = new Thread[n]; 112 AtomicBoolean[] statuses = new AtomicBoolean[n]; 113 Thread readThread = new Thread(reader); 114 115 for (int i = 0; i < n; ++i) { 116 statuses[i] = new AtomicBoolean(true); 117 writers[i] = new Thread(new Writer(finished, mvcc, statuses[i])); 118 writers[i].start(); 119 } 120 readThread.start(); 121 122 try { 123 Thread.sleep(10 * 1000); 124 } catch (InterruptedException ex) { 125 } 126 127 finished.set(true); 128 129 readThread.join(); 130 for (int i = 0; i < n; ++i) { 131 writers[i].join(); 132 } 133 134 // check failure. 135 Assert.assertFalse(readerFailed.get()); 136 for (int i = 0; i < n; ++i) { 137 Assert.assertTrue(statuses[i].get()); 138 } 139 } 140}