001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.util.Random; 021import java.util.concurrent.atomic.AtomicBoolean; 022import java.util.concurrent.atomic.AtomicLong; 023import org.apache.hadoop.hbase.HBaseClassTestRule; 024import org.apache.hadoop.hbase.testclassification.MediumTests; 025import org.apache.hadoop.hbase.testclassification.RegionServerTests; 026import org.junit.Assert; 027import org.junit.ClassRule; 028import org.junit.Test; 029import org.junit.experimental.categories.Category; 030 031/** 032 * This is a hammer test that verifies MultiVersionConcurrencyControl in a 033 * multiple writer single reader scenario. 034 */ 035@Category({RegionServerTests.class, MediumTests.class}) 036public class TestMultiVersionConcurrencyControl { 037 038 @ClassRule 039 public static final HBaseClassTestRule CLASS_RULE = 040 HBaseClassTestRule.forClass(TestMultiVersionConcurrencyControl.class); 041 042 static class Writer implements Runnable { 043 final AtomicBoolean finished; 044 final MultiVersionConcurrencyControl mvcc; 045 final AtomicBoolean status; 046 047 Writer(AtomicBoolean finished, MultiVersionConcurrencyControl mvcc, AtomicBoolean status) { 048 this.finished = finished; 049 this.mvcc = mvcc; 050 this.status = status; 051 } 052 053 private Random rnd = new Random(); 054 public boolean failed = false; 055 056 @Override 057 public void run() { 058 while (!finished.get()) { 059 MultiVersionConcurrencyControl.WriteEntry e = 060 mvcc.begin(); 061 // System.out.println("Begin write: " + e.getWriteNumber()); 062 // 10 usec - 500usec (including 0) 063 int sleepTime = rnd.nextInt(500); 064 // 500 * 1000 = 500,000ns = 500 usec 065 // 1 * 100 = 100ns = 1usec 066 try { 067 if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000); 068 } catch (InterruptedException e1) { 069 } 070 try { 071 mvcc.completeAndWait(e); 072 } catch (RuntimeException ex) { 073 // got failure 074 System.out.println(ex.toString()); 075 ex.printStackTrace(); 076 status.set(false); 077 return; 078 // Report failure if possible. 079 } 080 } 081 } 082 } 083 084 @Test 085 public void testParallelism() throws Exception { 086 final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); 087 088 final AtomicBoolean finished = new AtomicBoolean(false); 089 090 // fail flag for the reader thread 091 final AtomicBoolean readerFailed = new AtomicBoolean(false); 092 final AtomicLong failedAt = new AtomicLong(); 093 Runnable reader = new Runnable() { 094 @Override 095 public void run() { 096 long prev = mvcc.getReadPoint(); 097 while (!finished.get()) { 098 long newPrev = mvcc.getReadPoint(); 099 if (newPrev < prev) { 100 // serious problem. 101 System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev); 102 readerFailed.set(true); 103 // might as well give up 104 failedAt.set(newPrev); 105 return; 106 } 107 } 108 } 109 }; 110 111 // writer thread parallelism. 112 int n = 20; 113 Thread[] writers = new Thread[n]; 114 AtomicBoolean[] statuses = new AtomicBoolean[n]; 115 Thread readThread = new Thread(reader); 116 117 for (int i = 0; i < n; ++i) { 118 statuses[i] = new AtomicBoolean(true); 119 writers[i] = new Thread(new Writer(finished, mvcc, statuses[i])); 120 writers[i].start(); 121 } 122 readThread.start(); 123 124 try { 125 Thread.sleep(10 * 1000); 126 } catch (InterruptedException ex) { 127 } 128 129 finished.set(true); 130 131 readThread.join(); 132 for (int i = 0; i < n; ++i) { 133 writers[i].join(); 134 } 135 136 // check failure. 137 Assert.assertFalse(readerFailed.get()); 138 for (int i = 0; i < n; ++i) { 139 Assert.assertTrue(statuses[i].get()); 140 } 141 } 142}