001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
022import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
023
024import java.util.LinkedList;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.hadoop.hbase.util.ClassSize;
032
033
034/**
035 * Manages the read/write consistency. This provides an interface for readers to determine what
036 * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit"
037 * the new writes for readers to read (thus forming atomic transactions).
038 */
039@InterfaceAudience.Private
040public class MultiVersionConcurrencyControl {
041  private static final Logger LOG = LoggerFactory.getLogger(MultiVersionConcurrencyControl.class);
042
043  final AtomicLong readPoint = new AtomicLong(0);
044  final AtomicLong writePoint = new AtomicLong(0);
045  private final Object readWaiters = new Object();
046  /**
047   * Represents no value, or not set.
048   */
049  public static final long NONE = -1;
050
051  // This is the pending queue of writes.
052  //
053  // TODO(eclark): Should this be an array of fixed size to
054  // reduce the number of allocations on the write path?
055  // This could be equal to the number of handlers + a small number.
056  // TODO: St.Ack 20150903 Sounds good to me.
057  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
058
059  public MultiVersionConcurrencyControl() {
060    super();
061  }
062
063  /**
064   * Construct and set read point. Write point is uninitialized.
065   */
066  public MultiVersionConcurrencyControl(long startPoint) {
067    tryAdvanceTo(startPoint, NONE);
068  }
069
070  /**
071   * Step the MVCC forward on to a new read/write basis.
072   * @param newStartPoint
073   */
074  public void advanceTo(long newStartPoint) {
075    while (true) {
076      long seqId = this.getWritePoint();
077      if (seqId >= newStartPoint) {
078        break;
079      }
080      if (this.tryAdvanceTo(newStartPoint, seqId)) {
081        break;
082      }
083    }
084  }
085
086  /**
087   * Step the MVCC forward on to a new read/write basis.
088   * @param newStartPoint Point to move read and write points to.
089   * @param expected If not -1 (#NONE)
090   * @return Returns false if <code>expected</code> is not equal to the
091   * current <code>readPoint</code> or if <code>startPoint</code> is less than current
092   * <code>readPoint</code>
093   */
094  boolean tryAdvanceTo(long newStartPoint, long expected) {
095    synchronized (writeQueue) {
096      long currentRead = this.readPoint.get();
097      long currentWrite = this.writePoint.get();
098      if (currentRead != currentWrite) {
099        throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead +
100          ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
101      }
102      if (expected != NONE && expected != currentRead) {
103        return false;
104      }
105
106      if (newStartPoint < currentRead) {
107        return false;
108      }
109
110      readPoint.set(newStartPoint);
111      writePoint.set(newStartPoint);
112    }
113    return true;
114  }
115
116  /**
117   * Call {@link #begin(Runnable)} with an empty {@link Runnable}.
118   */
119  public WriteEntry begin() {
120    return begin(() -> {});
121  }
122
123  /**
124   * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
125   * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write
126   * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the
127   * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of
128   * the failed write transaction.
129   * <p>
130   * The {@code action} will be executed under the lock which means it can keep the same order with
131   * mvcc.
132   * @see #complete(WriteEntry)
133   * @see #completeAndWait(WriteEntry)
134   */
135  public WriteEntry begin(Runnable action) {
136    synchronized (writeQueue) {
137      long nextWriteNumber = writePoint.incrementAndGet();
138      WriteEntry e = new WriteEntry(nextWriteNumber);
139      writeQueue.add(e);
140      action.run();
141      return e;
142    }
143  }
144
145  /**
146   * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs
147   * to complete.
148   */
149  public void await() {
150    // Add a write and then wait on reads to catch up to it.
151    completeAndWait(begin());
152  }
153
154  /**
155   * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the
156   * read point catches up to our write.
157   *
158   * At the end of this call, the global read point is at least as large as the write point
159   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
160   */
161  public void completeAndWait(WriteEntry e) {
162    if (!complete(e)) {
163      waitForRead(e);
164    }
165  }
166
167  /**
168   * Mark the {@link WriteEntry} as complete and advance the read point as much as possible.
169   * Call this even if the write has FAILED (AFTER backing out the write transaction
170   * changes completely) so we can clean up the outstanding transaction.
171   *
172   * How much is the read point advanced?
173   *
174   * Let S be the set of all write numbers that are completed. Set the read point to the highest
175   * numbered write of S.
176   *
177   * @param writeEntry
178   *
179   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
180   */
181  public boolean complete(WriteEntry writeEntry) {
182    synchronized (writeQueue) {
183      writeEntry.markCompleted();
184      long nextReadValue = NONE;
185      boolean ranOnce = false;
186      while (!writeQueue.isEmpty()) {
187        ranOnce = true;
188        WriteEntry queueFirst = writeQueue.getFirst();
189
190        if (nextReadValue > 0) {
191          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
192            throw new RuntimeException("Invariant in complete violated, nextReadValue="
193                + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
194          }
195        }
196
197        if (queueFirst.isCompleted()) {
198          nextReadValue = queueFirst.getWriteNumber();
199          writeQueue.removeFirst();
200        } else {
201          break;
202        }
203      }
204
205      if (!ranOnce) {
206        throw new RuntimeException("There is no first!");
207      }
208
209      if (nextReadValue > 0) {
210        synchronized (readWaiters) {
211          readPoint.set(nextReadValue);
212          readWaiters.notifyAll();
213        }
214      }
215      return readPoint.get() >= writeEntry.getWriteNumber();
216    }
217  }
218
219  /**
220   * Wait for the global readPoint to advance up to the passed in write entry number.
221   */
222  void waitForRead(WriteEntry e) {
223    boolean interrupted = false;
224    int count = 0;
225    synchronized (readWaiters) {
226      while (readPoint.get() < e.getWriteNumber()) {
227        if (count % 100 == 0 && count > 0) {
228          LOG.warn("STUCK: " + this);
229        }
230        count++;
231        try {
232          readWaiters.wait(10);
233        } catch (InterruptedException ie) {
234          // We were interrupted... finish the loop -- i.e. cleanup --and then
235          // on our way out, reset the interrupt flag.
236          interrupted = true;
237        }
238      }
239    }
240    if (interrupted) {
241      Thread.currentThread().interrupt();
242    }
243  }
244
245  @VisibleForTesting
246  @Override
247  public String toString() {
248    return MoreObjects.toStringHelper(this)
249        .add("readPoint", readPoint)
250        .add("writePoint", writePoint).toString();
251  }
252
253  public long getReadPoint() {
254    return readPoint.get();
255  }
256
257  @VisibleForTesting
258  public long getWritePoint() {
259    return writePoint.get();
260  }
261
262  /**
263   * Write number and whether write has completed given out at start of a write transaction.
264   * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
265   */
266  @InterfaceAudience.Private
267  public static class WriteEntry {
268    private final long writeNumber;
269    private boolean completed = false;
270
271    WriteEntry(long writeNumber) {
272      this.writeNumber = writeNumber;
273    }
274
275    void markCompleted() {
276      this.completed = true;
277    }
278
279    boolean isCompleted() {
280      return this.completed;
281    }
282
283    public long getWriteNumber() {
284      return this.writeNumber;
285    }
286
287    @Override
288    public String toString() {
289      return this.writeNumber + ", " + this.completed;
290    }
291  }
292
293  public static final long FIXED_SIZE = ClassSize.align(
294      ClassSize.OBJECT +
295      2 * Bytes.SIZEOF_LONG +
296      2 * ClassSize.REFERENCE);
297}