001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.util.LinkedList;
022import java.util.concurrent.atomic.AtomicLong;
023import org.apache.hadoop.hbase.util.Bytes;
024import org.apache.hadoop.hbase.util.ClassSize;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
030import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects.ToStringHelper;
031
032/**
033 * Manages the read/write consistency. This provides an interface for readers to determine what
034 * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit"
035 * the new writes for readers to read (thus forming atomic transactions).
036 */
037@InterfaceAudience.Private
038public class MultiVersionConcurrencyControl {
039  private static final Logger LOG = LoggerFactory.getLogger(MultiVersionConcurrencyControl.class);
040  private static final long READPOINT_ADVANCE_WAIT_TIME = 10L;
041
042  final String regionName;
043  final AtomicLong readPoint = new AtomicLong(0);
044  final AtomicLong writePoint = new AtomicLong(0);
045  private final Object readWaiters = new Object();
046  /**
047   * Represents no value, or not set.
048   */
049  public static final long NONE = -1;
050
051  // This is the pending queue of writes.
052  //
053  // TODO(eclark): Should this be an array of fixed size to
054  // reduce the number of allocations on the write path?
055  // This could be equal to the number of handlers + a small number.
056  // TODO: St.Ack 20150903 Sounds good to me.
057  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
058
059  public MultiVersionConcurrencyControl() {
060    this(null);
061  }
062
063  public MultiVersionConcurrencyControl(String regionName) {
064    this.regionName = regionName;
065  }
066
067  /**
068   * Construct and set read point. Write point is uninitialized.
069   */
070  public MultiVersionConcurrencyControl(long startPoint) {
071    this(null);
072    tryAdvanceTo(startPoint, NONE);
073  }
074
075  /**
076   * Step the MVCC forward on to a new read/write basis.
077   * @param newStartPoint
078   */
079  public void advanceTo(long newStartPoint) {
080    while (true) {
081      long seqId = this.getWritePoint();
082      if (seqId >= newStartPoint) {
083        break;
084      }
085      if (this.tryAdvanceTo(newStartPoint, seqId)) {
086        break;
087      }
088    }
089  }
090
091  /**
092   * Step the MVCC forward on to a new read/write basis.
093   * @param newStartPoint Point to move read and write points to.
094   * @param expected If not -1 (#NONE)
095   * @return Returns false if <code>expected</code> is not equal to the
096   * current <code>readPoint</code> or if <code>startPoint</code> is less than current
097   * <code>readPoint</code>
098   */
099  boolean tryAdvanceTo(long newStartPoint, long expected) {
100    synchronized (writeQueue) {
101      long currentRead = this.readPoint.get();
102      long currentWrite = this.writePoint.get();
103      if (currentRead != currentWrite) {
104        throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
105          ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
106      }
107      if (expected != NONE && expected != currentRead) {
108        return false;
109      }
110
111      if (newStartPoint < currentRead) {
112        return false;
113      }
114
115      readPoint.set(newStartPoint);
116      writePoint.set(newStartPoint);
117    }
118    return true;
119  }
120
121  /**
122   * Call {@link #begin(Runnable)} with an empty {@link Runnable}.
123   */
124  public WriteEntry begin() {
125    return begin(() -> {});
126  }
127
128  /**
129   * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
130   * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write
131   * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the
132   * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of
133   * the failed write transaction.
134   * <p>
135   * The {@code action} will be executed under the lock which means it can keep the same order with
136   * mvcc.
137   * @see #complete(WriteEntry)
138   * @see #completeAndWait(WriteEntry)
139   */
140  public WriteEntry begin(Runnable action) {
141    synchronized (writeQueue) {
142      long nextWriteNumber = writePoint.incrementAndGet();
143      WriteEntry e = new WriteEntry(nextWriteNumber);
144      writeQueue.add(e);
145      action.run();
146      return e;
147    }
148  }
149
150  /**
151   * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs
152   * to complete.
153   */
154  public void await() {
155    // Add a write and then wait on reads to catch up to it.
156    completeAndWait(begin());
157  }
158
159  /**
160   * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the
161   * read point catches up to our write.
162   *
163   * At the end of this call, the global read point is at least as large as the write point
164   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
165   */
166  public void completeAndWait(WriteEntry e) {
167    if (!complete(e)) {
168      waitForRead(e);
169    }
170  }
171
172  /**
173   * Mark the {@link WriteEntry} as complete and advance the read point as much as possible.
174   * Call this even if the write has FAILED (AFTER backing out the write transaction
175   * changes completely) so we can clean up the outstanding transaction.
176   *
177   * How much is the read point advanced?
178   *
179   * Let S be the set of all write numbers that are completed. Set the read point to the highest
180   * numbered write of S.
181   *
182   * @param writeEntry
183   *
184   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
185   */
186  public boolean complete(WriteEntry writeEntry) {
187    synchronized (writeQueue) {
188      writeEntry.markCompleted();
189      long nextReadValue = NONE;
190      boolean ranOnce = false;
191      while (!writeQueue.isEmpty()) {
192        ranOnce = true;
193        WriteEntry queueFirst = writeQueue.getFirst();
194
195        if (nextReadValue > 0) {
196          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
197            throw new RuntimeException("Invariant in complete violated, nextReadValue="
198                + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
199          }
200        }
201
202        if (queueFirst.isCompleted()) {
203          nextReadValue = queueFirst.getWriteNumber();
204          writeQueue.removeFirst();
205        } else {
206          break;
207        }
208      }
209
210      if (!ranOnce) {
211        throw new RuntimeException("There is no first!");
212      }
213
214      if (nextReadValue > 0) {
215        synchronized (readWaiters) {
216          readPoint.set(nextReadValue);
217          readWaiters.notifyAll();
218        }
219      }
220      return readPoint.get() >= writeEntry.getWriteNumber();
221    }
222  }
223
224  /**
225   * Wait for the global readPoint to advance up to the passed in write entry number.
226   */
227  void waitForRead(WriteEntry e) {
228    boolean interrupted = false;
229    int count = 0;
230    synchronized (readWaiters) {
231      while (readPoint.get() < e.getWriteNumber()) {
232        if (count % 100 == 0 && count > 0) {
233          long totalWaitTillNow = READPOINT_ADVANCE_WAIT_TIME * count;
234          LOG.warn("STUCK for : " + totalWaitTillNow + " millis. " + this);
235        }
236        count++;
237        try {
238          readWaiters.wait(READPOINT_ADVANCE_WAIT_TIME);
239        } catch (InterruptedException ie) {
240          // We were interrupted... finish the loop -- i.e. cleanup --and then
241          // on our way out, reset the interrupt flag.
242          interrupted = true;
243        }
244      }
245    }
246    if (interrupted) {
247      Thread.currentThread().interrupt();
248    }
249  }
250
251  @Override
252  public String toString() {
253    ToStringHelper helper = MoreObjects.toStringHelper(this).add("readPoint", readPoint)
254        .add("writePoint", writePoint);
255    if (this.regionName != null) {
256      helper.add("regionName", this.regionName);
257    }
258    return helper.toString();
259  }
260
261  public long getReadPoint() {
262    return readPoint.get();
263  }
264
265  public long getWritePoint() {
266    return writePoint.get();
267  }
268
269  /**
270   * Write number and whether write has completed given out at start of a write transaction.
271   * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
272   */
273  @InterfaceAudience.Private
274  public static class WriteEntry {
275    private final long writeNumber;
276    private boolean completed = false;
277
278    WriteEntry(long writeNumber) {
279      this.writeNumber = writeNumber;
280    }
281
282    void markCompleted() {
283      this.completed = true;
284    }
285
286    boolean isCompleted() {
287      return this.completed;
288    }
289
290    public long getWriteNumber() {
291      return this.writeNumber;
292    }
293
294    @Override
295    public String toString() {
296      return this.writeNumber + ", " + this.completed;
297    }
298  }
299
300  public static final long FIXED_SIZE = ClassSize.align(
301      ClassSize.OBJECT +
302      2 * Bytes.SIZEOF_LONG +
303      2 * ClassSize.REFERENCE);
304}