001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.concurrent.ThreadLocalRandom;
021import java.util.concurrent.atomic.AtomicBoolean;
022import java.util.concurrent.atomic.AtomicLong;
023import org.apache.hadoop.hbase.HBaseClassTestRule;
024import org.apache.hadoop.hbase.testclassification.MediumTests;
025import org.apache.hadoop.hbase.testclassification.RegionServerTests;
026import org.junit.Assert;
027import org.junit.ClassRule;
028import org.junit.Test;
029import org.junit.experimental.categories.Category;
030
031/**
032 * This is a hammer test that verifies MultiVersionConcurrencyControl in a multiple writer single
033 * reader scenario.
034 */
035@Category({ RegionServerTests.class, MediumTests.class })
036public class TestMultiVersionConcurrencyControl {
037
038  @ClassRule
039  public static final HBaseClassTestRule CLASS_RULE =
040    HBaseClassTestRule.forClass(TestMultiVersionConcurrencyControl.class);
041
042  static class Writer implements Runnable {
043    final AtomicBoolean finished;
044    final MultiVersionConcurrencyControl mvcc;
045    final AtomicBoolean status;
046
047    Writer(AtomicBoolean finished, MultiVersionConcurrencyControl mvcc, AtomicBoolean status) {
048      this.finished = finished;
049      this.mvcc = mvcc;
050      this.status = status;
051    }
052
053    public boolean failed = false;
054
055    @Override
056    public void run() {
057      while (!finished.get()) {
058        MultiVersionConcurrencyControl.WriteEntry e = mvcc.begin();
059        // System.out.println("Begin write: " + e.getWriteNumber());
060        // 10 usec - 500usec (including 0)
061        int sleepTime = ThreadLocalRandom.current().nextInt(500);
062        // 500 * 1000 = 500,000ns = 500 usec
063        // 1 * 100 = 100ns = 1usec
064        try {
065          if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
066        } catch (InterruptedException e1) {
067        }
068        try {
069          mvcc.completeAndWait(e);
070        } catch (RuntimeException ex) {
071          // got failure
072          System.out.println(ex.toString());
073          ex.printStackTrace();
074          status.set(false);
075          return;
076          // Report failure if possible.
077        }
078      }
079    }
080  }
081
082  @Test
083  public void testParallelism() throws Exception {
084    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
085
086    final AtomicBoolean finished = new AtomicBoolean(false);
087
088    // fail flag for the reader thread
089    final AtomicBoolean readerFailed = new AtomicBoolean(false);
090    final AtomicLong failedAt = new AtomicLong();
091    Runnable reader = new Runnable() {
092      @Override
093      public void run() {
094        long prev = mvcc.getReadPoint();
095        while (!finished.get()) {
096          long newPrev = mvcc.getReadPoint();
097          if (newPrev < prev) {
098            // serious problem.
099            System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
100            readerFailed.set(true);
101            // might as well give up
102            failedAt.set(newPrev);
103            return;
104          }
105        }
106      }
107    };
108
109    // writer thread parallelism.
110    int n = 20;
111    Thread[] writers = new Thread[n];
112    AtomicBoolean[] statuses = new AtomicBoolean[n];
113    Thread readThread = new Thread(reader);
114
115    for (int i = 0; i < n; ++i) {
116      statuses[i] = new AtomicBoolean(true);
117      writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
118      writers[i].start();
119    }
120    readThread.start();
121
122    try {
123      Thread.sleep(10 * 1000);
124    } catch (InterruptedException ex) {
125    }
126
127    finished.set(true);
128
129    readThread.join();
130    for (int i = 0; i < n; ++i) {
131      writers[i].join();
132    }
133
134    // check failure.
135    Assert.assertFalse(readerFailed.get());
136    for (int i = 0; i < n; ++i) {
137      Assert.assertTrue(statuses[i].get());
138    }
139  }
140}