001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.util.LinkedList;
021import java.util.Optional;
022import java.util.concurrent.atomic.AtomicLong;
023import org.apache.hadoop.hbase.util.Bytes;
024import org.apache.hadoop.hbase.util.ClassSize;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
030import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects.ToStringHelper;
031
032/**
033 * Manages the read/write consistency. This provides an interface for readers to determine what
034 * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit" the new
035 * writes for readers to read (thus forming atomic transactions).
036 */
037@InterfaceAudience.Private
038public class MultiVersionConcurrencyControl {
039  private static final Logger LOG = LoggerFactory.getLogger(MultiVersionConcurrencyControl.class);
040  private static final long READPOINT_ADVANCE_WAIT_TIME = 10L;
041
042  final String regionName;
043  final AtomicLong readPoint = new AtomicLong(0);
044  final AtomicLong writePoint = new AtomicLong(0);
045  private final Object readWaiters = new Object();
046  /**
047   * Represents no value, or not set.
048   */
049  public static final long NONE = -1;
050
051  // This is the pending queue of writes.
052  //
053  // TODO(eclark): Should this be an array of fixed size to
054  // reduce the number of allocations on the write path?
055  // This could be equal to the number of handlers + a small number.
056  // TODO: St.Ack 20150903 Sounds good to me.
057  private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();
058
059  public MultiVersionConcurrencyControl() {
060    this(null);
061  }
062
063  public MultiVersionConcurrencyControl(String regionName) {
064    this.regionName = regionName;
065  }
066
067  /**
068   * Construct and set read point. Write point is uninitialized.
069   */
070  public MultiVersionConcurrencyControl(long startPoint) {
071    this(null);
072    tryAdvanceTo(startPoint, NONE);
073  }
074
075  /**
076   * Step the MVCC forward on to a new read/write basis.
077   */
078  public void advanceTo(long newStartPoint) {
079    while (true) {
080      long seqId = this.getWritePoint();
081      if (seqId >= newStartPoint) {
082        break;
083      }
084      if (this.tryAdvanceTo(newStartPoint, seqId)) {
085        break;
086      }
087    }
088  }
089
090  /**
091   * Step the MVCC forward on to a new read/write basis.
092   * @param newStartPoint Point to move read and write points to.
093   * @param expected      If not -1 (#NONE)
094   * @return Returns false if <code>expected</code> is not equal to the current
095   *         <code>readPoint</code> or if <code>startPoint</code> is less than current
096   *         <code>readPoint</code>
097   */
098  boolean tryAdvanceTo(long newStartPoint, long expected) {
099    synchronized (writeQueue) {
100      long currentRead = this.readPoint.get();
101      long currentWrite = this.writePoint.get();
102      if (currentRead != currentWrite) {
103        throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead
104          + ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo");
105      }
106      if (expected != NONE && expected != currentRead) {
107        return false;
108      }
109
110      if (newStartPoint < currentRead) {
111        return false;
112      }
113
114      readPoint.set(newStartPoint);
115      writePoint.set(newStartPoint);
116    }
117    return true;
118  }
119
120  /**
121   * Call {@link #begin(Runnable)} with an empty {@link Runnable}.
122   */
123  public WriteEntry begin() {
124    return begin(() -> {
125    });
126  }
127
128  /**
129   * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it
130   * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write
131   * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the
132   * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of
133   * the failed write transaction.
134   * <p>
135   * The {@code action} will be executed under the lock which means it can keep the same order with
136   * mvcc.
137   * @see #complete(WriteEntry)
138   * @see #completeAndWait(WriteEntry)
139   */
140  public WriteEntry begin(Runnable action) {
141    synchronized (writeQueue) {
142      long nextWriteNumber = writePoint.incrementAndGet();
143      WriteEntry e = new WriteEntry(nextWriteNumber);
144      writeQueue.add(e);
145      action.run();
146      return e;
147    }
148  }
149
150  /**
151   * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs to
152   * complete.
153   */
154  public void await() {
155    // Add a write and then wait on reads to catch up to it.
156    completeAndWait(begin());
157  }
158
159  /**
160   * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the read
161   * point catches up to our write. At the end of this call, the global read point is at least as
162   * large as the write point of the passed in WriteEntry. Thus, the write is visible to MVCC
163   * readers.
164   */
165  public void completeAndWait(WriteEntry e) {
166    if (!complete(e)) {
167      waitForRead(e);
168    }
169  }
170
171  /**
172   * Mark the {@link WriteEntry} as complete and advance the read point as much as possible. Call
173   * this even if the write has FAILED (AFTER backing out the write transaction changes completely)
174   * so we can clean up the outstanding transaction. How much is the read point advanced? Let S be
175   * the set of all write numbers that are completed. Set the read point to the highest numbered
176   * write of S.
177   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
178   */
179  public boolean complete(WriteEntry writeEntry) {
180    synchronized (writeQueue) {
181      writeEntry.markCompleted();
182      long nextReadValue = NONE;
183      boolean ranOnce = false;
184      while (!writeQueue.isEmpty()) {
185        ranOnce = true;
186        WriteEntry queueFirst = writeQueue.getFirst();
187
188        if (nextReadValue > 0) {
189          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
190            throw new RuntimeException("Invariant in complete violated, nextReadValue="
191              + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
192          }
193        }
194
195        if (queueFirst.isCompleted()) {
196          nextReadValue = queueFirst.getWriteNumber();
197          writeQueue.removeFirst();
198          queueFirst.runCompletionAction();
199        } else {
200          break;
201        }
202      }
203
204      if (!ranOnce) {
205        throw new RuntimeException("There is no first!");
206      }
207
208      if (nextReadValue > 0) {
209        synchronized (readWaiters) {
210          readPoint.set(nextReadValue);
211          readWaiters.notifyAll();
212        }
213      }
214      return readPoint.get() >= writeEntry.getWriteNumber();
215    }
216  }
217
218  /**
219   * Wait for the global readPoint to advance up to the passed in write entry number.
220   */
221  void waitForRead(WriteEntry e) {
222    boolean interrupted = false;
223    int count = 0;
224    synchronized (readWaiters) {
225      while (readPoint.get() < e.getWriteNumber()) {
226        if (count % 100 == 0 && count > 0) {
227          long totalWaitTillNow = READPOINT_ADVANCE_WAIT_TIME * count;
228          LOG.warn("STUCK for : " + totalWaitTillNow + " millis. " + this);
229        }
230        count++;
231        try {
232          readWaiters.wait(READPOINT_ADVANCE_WAIT_TIME);
233        } catch (InterruptedException ie) {
234          // We were interrupted... finish the loop -- i.e. cleanup --and then
235          // on our way out, reset the interrupt flag.
236          interrupted = true;
237        }
238      }
239    }
240    if (interrupted) {
241      Thread.currentThread().interrupt();
242    }
243  }
244
245  @Override
246  public String toString() {
247    ToStringHelper helper =
248      MoreObjects.toStringHelper(this).add("readPoint", readPoint).add("writePoint", writePoint);
249    if (this.regionName != null) {
250      helper.add("regionName", this.regionName);
251    }
252    return helper.toString();
253  }
254
255  public long getReadPoint() {
256    return readPoint.get();
257  }
258
259  public long getWritePoint() {
260    return writePoint.get();
261  }
262
263  /**
264   * Write number and whether write has completed given out at start of a write transaction. Every
265   * created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
266   */
267  @InterfaceAudience.Private
268  public static final class WriteEntry {
269    private final long writeNumber;
270    private boolean completed = false;
271    /**
272     * Will be called after completion, i.e, when being removed from the
273     * {@link MultiVersionConcurrencyControl#writeQueue}.
274     */
275    private Optional<Runnable> completionAction = Optional.empty();
276
277    private WriteEntry(long writeNumber) {
278      this.writeNumber = writeNumber;
279    }
280
281    private void markCompleted() {
282      this.completed = true;
283    }
284
285    private boolean isCompleted() {
286      return this.completed;
287    }
288
289    public void attachCompletionAction(Runnable action) {
290      assert !completionAction.isPresent();
291      completionAction = Optional.of(action);
292    }
293
294    private void runCompletionAction() {
295      completionAction.ifPresent(Runnable::run);
296    }
297
298    public Optional<Runnable> getCompletionAction() {
299      return completionAction;
300    }
301
302    public long getWriteNumber() {
303      return this.writeNumber;
304    }
305
306    @Override
307    public String toString() {
308      return this.writeNumber + ", " + this.completed;
309    }
310  }
311
312  public static final long FIXED_SIZE =
313    ClassSize.align(ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG + 2 * ClassSize.REFERENCE);
314}