001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.Random;
021import java.util.concurrent.atomic.AtomicBoolean;
022import java.util.concurrent.atomic.AtomicLong;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.testclassification.MediumTests;
025import org.apache.hadoop.hbase.testclassification.RegionServerTests;
026import org.junit.Assert;
027import org.junit.ClassRule;
028import org.junit.Test;
029import org.junit.experimental.categories.Category;
030
031/**
032 * This is a hammer test that verifies MultiVersionConcurrencyControl in a
033 * multiple writer single reader scenario.
034 */
035@Category({RegionServerTests.class, MediumTests.class})
036public class TestMultiVersionConcurrencyControl {
037
038  @ClassRule
039  public static final HBaseClassTestRule CLASS_RULE =
040      HBaseClassTestRule.forClass(TestMultiVersionConcurrencyControl.class);
041
042  static class Writer implements Runnable {
043    final AtomicBoolean finished;
044    final MultiVersionConcurrencyControl mvcc;
045    final AtomicBoolean status;
046
047    Writer(AtomicBoolean finished, MultiVersionConcurrencyControl mvcc, AtomicBoolean status) {
048      this.finished = finished;
049      this.mvcc = mvcc;
050      this.status = status;
051    }
052
053    private Random rnd = new Random();
054    public boolean failed = false;
055
056    @Override
057    public void run() {
058      while (!finished.get()) {
059        MultiVersionConcurrencyControl.WriteEntry e =
060            mvcc.begin();
061        // System.out.println("Begin write: " + e.getWriteNumber());
062        // 10 usec - 500usec (including 0)
063        int sleepTime = rnd.nextInt(500);
064        // 500 * 1000 = 500,000ns = 500 usec
065        // 1 * 100 = 100ns = 1usec
066        try {
067          if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
068        } catch (InterruptedException e1) {
069        }
070        try {
071          mvcc.completeAndWait(e);
072        } catch (RuntimeException ex) {
073          // got failure
074          System.out.println(ex.toString());
075          ex.printStackTrace();
076          status.set(false);
077          return;
078          // Report failure if possible.
079        }
080      }
081    }
082  }
083
084  @Test
085  public void testParallelism() throws Exception {
086    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
087
088    final AtomicBoolean finished = new AtomicBoolean(false);
089
090    // fail flag for the reader thread
091    final AtomicBoolean readerFailed = new AtomicBoolean(false);
092    final AtomicLong failedAt = new AtomicLong();
093    Runnable reader = new Runnable() {
094      @Override
095      public void run() {
096        long prev = mvcc.getReadPoint();
097        while (!finished.get()) {
098          long newPrev = mvcc.getReadPoint();
099          if (newPrev < prev) {
100            // serious problem.
101            System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
102            readerFailed.set(true);
103            // might as well give up
104            failedAt.set(newPrev);
105            return;
106          }
107        }
108      }
109    };
110
111    // writer thread parallelism.
112    int n = 20;
113    Thread[] writers = new Thread[n];
114    AtomicBoolean[] statuses = new AtomicBoolean[n];
115    Thread readThread = new Thread(reader);
116
117    for (int i = 0; i < n; ++i) {
118      statuses[i] = new AtomicBoolean(true);
119      writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
120      writers[i].start();
121    }
122    readThread.start();
123
124    try {
125      Thread.sleep(10 * 1000);
126    } catch (InterruptedException ex) {
127    }
128
129    finished.set(true);
130
131    readThread.join();
132    for (int i = 0; i < n; ++i) {
133      writers[i].join();
134    }
135
136    // check failure.
137    Assert.assertFalse(readerFailed.get());
138    for (int i = 0; i < n; ++i) {
139      Assert.assertTrue(statuses[i].get());
140    }
141  }
142}