001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
029
030/**
031 * This is a hammer test that verifies MultiVersionConcurrencyControl in a
032 * multiple writer single reader scenario.
033 */
034@Category({RegionServerTests.class, MediumTests.class})
035public class TestMultiVersionConcurrencyControl {
036
037  @ClassRule
038  public static final HBaseClassTestRule CLASS_RULE =
039      HBaseClassTestRule.forClass(TestMultiVersionConcurrencyControl.class);
040
041  static class Writer implements Runnable {
042    final AtomicBoolean finished;
043    final MultiVersionConcurrencyControl mvcc;
044    final AtomicBoolean status;
045
046    Writer(AtomicBoolean finished, MultiVersionConcurrencyControl mvcc, AtomicBoolean status) {
047      this.finished = finished;
048      this.mvcc = mvcc;
049      this.status = status;
050    }
051
052    private Random rnd = new Random();
053    public boolean failed = false;
054
055    @Override
056    public void run() {
057      while (!finished.get()) {
058        MultiVersionConcurrencyControl.WriteEntry e =
059            mvcc.begin();
060        // System.out.println("Begin write: " + e.getWriteNumber());
061        // 10 usec - 500usec (including 0)
062        int sleepTime = rnd.nextInt(500);
063        // 500 * 1000 = 500,000ns = 500 usec
064        // 1 * 100 = 100ns = 1usec
065        try {
066          if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
067        } catch (InterruptedException e1) {
068        }
069        try {
070          mvcc.completeAndWait(e);
071        } catch (RuntimeException ex) {
072          // got failure
073          System.out.println(ex.toString());
074          ex.printStackTrace();
075          status.set(false);
076          return;
077          // Report failure if possible.
078        }
079      }
080    }
081  }
082
083  public void testParallelism() throws Exception {
084    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
085
086    final AtomicBoolean finished = new AtomicBoolean(false);
087
088    // fail flag for the reader thread
089    final AtomicBoolean readerFailed = new AtomicBoolean(false);
090    final AtomicLong failedAt = new AtomicLong();
091    Runnable reader = new Runnable() {
092      @Override
093      public void run() {
094        long prev = mvcc.getReadPoint();
095        while (!finished.get()) {
096          long newPrev = mvcc.getReadPoint();
097          if (newPrev < prev) {
098            // serious problem.
099            System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
100            readerFailed.set(true);
101            // might as well give up
102            failedAt.set(newPrev);
103            return;
104          }
105        }
106      }
107    };
108
109    // writer thread parallelism.
110    int n = 20;
111    Thread[] writers = new Thread[n];
112    AtomicBoolean[] statuses = new AtomicBoolean[n];
113    Thread readThread = new Thread(reader);
114
115    for (int i = 0; i < n; ++i) {
116      statuses[i] = new AtomicBoolean(true);
117      writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
118      writers[i].start();
119    }
120    readThread.start();
121
122    try {
123      Thread.sleep(10 * 1000);
124    } catch (InterruptedException ex) {
125    }
126
127    finished.set(true);
128
129    readThread.join();
130    for (int i = 0; i < n; ++i) {
131      writers[i].join();
132    }
133
134    // check failure.
135    Assert.assertFalse(readerFailed.get());
136    for (int i = 0; i < n; ++i) {
137      Assert.assertTrue(statuses[i].get());
138    }
139  }
140}