001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.util.concurrent.ThreadLocalRandom;
024import java.util.concurrent.atomic.AtomicBoolean;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.hbase.testclassification.MediumTests;
027import org.apache.hadoop.hbase.testclassification.RegionServerTests;
028import org.junit.jupiter.api.Tag;
029import org.junit.jupiter.api.Test;
030
031/**
032 * This is a hammer test that verifies MultiVersionConcurrencyControl in a multiple writer single
033 * reader scenario.
034 */
035@Tag(RegionServerTests.TAG)
036@Tag(MediumTests.TAG)
037public class TestMultiVersionConcurrencyControl {
038
039  static class Writer implements Runnable {
040    final AtomicBoolean finished;
041    final MultiVersionConcurrencyControl mvcc;
042    final AtomicBoolean status;
043
044    Writer(AtomicBoolean finished, MultiVersionConcurrencyControl mvcc, AtomicBoolean status) {
045      this.finished = finished;
046      this.mvcc = mvcc;
047      this.status = status;
048    }
049
050    public boolean failed = false;
051
052    @Override
053    public void run() {
054      while (!finished.get()) {
055        MultiVersionConcurrencyControl.WriteEntry e = mvcc.begin();
056        // System.out.println("Begin write: " + e.getWriteNumber());
057        // 10 usec - 500usec (including 0)
058        int sleepTime = ThreadLocalRandom.current().nextInt(500);
059        // 500 * 1000 = 500,000ns = 500 usec
060        // 1 * 100 = 100ns = 1usec
061        try {
062          if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
063        } catch (InterruptedException e1) {
064        }
065        try {
066          mvcc.completeAndWait(e);
067        } catch (RuntimeException ex) {
068          // got failure
069          System.out.println(ex.toString());
070          ex.printStackTrace();
071          status.set(false);
072          return;
073          // Report failure if possible.
074        }
075      }
076    }
077  }
078
079  @Test
080  public void testParallelism() throws Exception {
081    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
082
083    final AtomicBoolean finished = new AtomicBoolean(false);
084
085    // fail flag for the reader thread
086    final AtomicBoolean readerFailed = new AtomicBoolean(false);
087    final AtomicLong failedAt = new AtomicLong();
088    Runnable reader = new Runnable() {
089      @Override
090      public void run() {
091        long prev = mvcc.getReadPoint();
092        while (!finished.get()) {
093          long newPrev = mvcc.getReadPoint();
094          if (newPrev < prev) {
095            // serious problem.
096            System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
097            readerFailed.set(true);
098            // might as well give up
099            failedAt.set(newPrev);
100            return;
101          }
102        }
103      }
104    };
105
106    // writer thread parallelism.
107    int n = 20;
108    Thread[] writers = new Thread[n];
109    AtomicBoolean[] statuses = new AtomicBoolean[n];
110    Thread readThread = new Thread(reader);
111
112    for (int i = 0; i < n; ++i) {
113      statuses[i] = new AtomicBoolean(true);
114      writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
115      writers[i].start();
116    }
117    readThread.start();
118
119    try {
120      Thread.sleep(10 * 1000);
121    } catch (InterruptedException ex) {
122    }
123
124    finished.set(true);
125
126    readThread.join();
127    for (int i = 0; i < n; ++i) {
128      writers[i].join();
129    }
130
131    // check failure.
132    assertFalse(readerFailed.get());
133    for (int i = 0; i < n; ++i) {
134      assertTrue(statuses[i].get());
135    }
136  }
137}