001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 022import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 023 024import java.util.LinkedList; 025import java.util.concurrent.atomic.AtomicLong; 026 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030import org.apache.hadoop.hbase.util.Bytes; 031import org.apache.hadoop.hbase.util.ClassSize; 032 033 034/** 035 * Manages the read/write consistency. This provides an interface for readers to determine what 036 * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit" 037 * the new writes for readers to read (thus forming atomic transactions). 038 */ 039@InterfaceAudience.Private 040public class MultiVersionConcurrencyControl { 041 private static final Logger LOG = LoggerFactory.getLogger(MultiVersionConcurrencyControl.class); 042 043 final AtomicLong readPoint = new AtomicLong(0); 044 final AtomicLong writePoint = new AtomicLong(0); 045 private final Object readWaiters = new Object(); 046 /** 047 * Represents no value, or not set. 048 */ 049 public static final long NONE = -1; 050 051 // This is the pending queue of writes. 052 // 053 // TODO(eclark): Should this be an array of fixed size to 054 // reduce the number of allocations on the write path? 055 // This could be equal to the number of handlers + a small number. 056 // TODO: St.Ack 20150903 Sounds good to me. 057 private final LinkedList<WriteEntry> writeQueue = new LinkedList<>(); 058 059 public MultiVersionConcurrencyControl() { 060 super(); 061 } 062 063 /** 064 * Construct and set read point. Write point is uninitialized. 065 */ 066 public MultiVersionConcurrencyControl(long startPoint) { 067 tryAdvanceTo(startPoint, NONE); 068 } 069 070 /** 071 * Step the MVCC forward on to a new read/write basis. 072 * @param newStartPoint 073 */ 074 public void advanceTo(long newStartPoint) { 075 while (true) { 076 long seqId = this.getWritePoint(); 077 if (seqId >= newStartPoint) { 078 break; 079 } 080 if (this.tryAdvanceTo(newStartPoint, seqId)) { 081 break; 082 } 083 } 084 } 085 086 /** 087 * Step the MVCC forward on to a new read/write basis. 088 * @param newStartPoint Point to move read and write points to. 089 * @param expected If not -1 (#NONE) 090 * @return Returns false if <code>expected</code> is not equal to the 091 * current <code>readPoint</code> or if <code>startPoint</code> is less than current 092 * <code>readPoint</code> 093 */ 094 boolean tryAdvanceTo(long newStartPoint, long expected) { 095 synchronized (writeQueue) { 096 long currentRead = this.readPoint.get(); 097 long currentWrite = this.writePoint.get(); 098 if (currentRead != currentWrite) { 099 throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead + 100 ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo"); 101 } 102 if (expected != NONE && expected != currentRead) { 103 return false; 104 } 105 106 if (newStartPoint < currentRead) { 107 return false; 108 } 109 110 readPoint.set(newStartPoint); 111 writePoint.set(newStartPoint); 112 } 113 return true; 114 } 115 116 /** 117 * Call {@link #begin(Runnable)} with an empty {@link Runnable}. 118 */ 119 public WriteEntry begin() { 120 return begin(() -> {}); 121 } 122 123 /** 124 * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it 125 * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write 126 * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the 127 * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of 128 * the failed write transaction. 129 * <p> 130 * The {@code action} will be executed under the lock which means it can keep the same order with 131 * mvcc. 132 * @see #complete(WriteEntry) 133 * @see #completeAndWait(WriteEntry) 134 */ 135 public WriteEntry begin(Runnable action) { 136 synchronized (writeQueue) { 137 long nextWriteNumber = writePoint.incrementAndGet(); 138 WriteEntry e = new WriteEntry(nextWriteNumber); 139 writeQueue.add(e); 140 action.run(); 141 return e; 142 } 143 } 144 145 /** 146 * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs 147 * to complete. 148 */ 149 public void await() { 150 // Add a write and then wait on reads to catch up to it. 151 completeAndWait(begin()); 152 } 153 154 /** 155 * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the 156 * read point catches up to our write. 157 * 158 * At the end of this call, the global read point is at least as large as the write point 159 * of the passed in WriteEntry. Thus, the write is visible to MVCC readers. 160 */ 161 public void completeAndWait(WriteEntry e) { 162 if (!complete(e)) { 163 waitForRead(e); 164 } 165 } 166 167 /** 168 * Mark the {@link WriteEntry} as complete and advance the read point as much as possible. 169 * Call this even if the write has FAILED (AFTER backing out the write transaction 170 * changes completely) so we can clean up the outstanding transaction. 171 * 172 * How much is the read point advanced? 173 * 174 * Let S be the set of all write numbers that are completed. Set the read point to the highest 175 * numbered write of S. 176 * 177 * @param writeEntry 178 * 179 * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) 180 */ 181 public boolean complete(WriteEntry writeEntry) { 182 synchronized (writeQueue) { 183 writeEntry.markCompleted(); 184 long nextReadValue = NONE; 185 boolean ranOnce = false; 186 while (!writeQueue.isEmpty()) { 187 ranOnce = true; 188 WriteEntry queueFirst = writeQueue.getFirst(); 189 190 if (nextReadValue > 0) { 191 if (nextReadValue + 1 != queueFirst.getWriteNumber()) { 192 throw new RuntimeException("Invariant in complete violated, nextReadValue=" 193 + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber()); 194 } 195 } 196 197 if (queueFirst.isCompleted()) { 198 nextReadValue = queueFirst.getWriteNumber(); 199 writeQueue.removeFirst(); 200 } else { 201 break; 202 } 203 } 204 205 if (!ranOnce) { 206 throw new RuntimeException("There is no first!"); 207 } 208 209 if (nextReadValue > 0) { 210 synchronized (readWaiters) { 211 readPoint.set(nextReadValue); 212 readWaiters.notifyAll(); 213 } 214 } 215 return readPoint.get() >= writeEntry.getWriteNumber(); 216 } 217 } 218 219 /** 220 * Wait for the global readPoint to advance up to the passed in write entry number. 221 */ 222 void waitForRead(WriteEntry e) { 223 boolean interrupted = false; 224 int count = 0; 225 synchronized (readWaiters) { 226 while (readPoint.get() < e.getWriteNumber()) { 227 if (count % 100 == 0 && count > 0) { 228 LOG.warn("STUCK: " + this); 229 } 230 count++; 231 try { 232 readWaiters.wait(10); 233 } catch (InterruptedException ie) { 234 // We were interrupted... finish the loop -- i.e. cleanup --and then 235 // on our way out, reset the interrupt flag. 236 interrupted = true; 237 } 238 } 239 } 240 if (interrupted) { 241 Thread.currentThread().interrupt(); 242 } 243 } 244 245 @VisibleForTesting 246 @Override 247 public String toString() { 248 return MoreObjects.toStringHelper(this) 249 .add("readPoint", readPoint) 250 .add("writePoint", writePoint).toString(); 251 } 252 253 public long getReadPoint() { 254 return readPoint.get(); 255 } 256 257 @VisibleForTesting 258 public long getWritePoint() { 259 return writePoint.get(); 260 } 261 262 /** 263 * Write number and whether write has completed given out at start of a write transaction. 264 * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait. 265 */ 266 @InterfaceAudience.Private 267 public static class WriteEntry { 268 private final long writeNumber; 269 private boolean completed = false; 270 271 WriteEntry(long writeNumber) { 272 this.writeNumber = writeNumber; 273 } 274 275 void markCompleted() { 276 this.completed = true; 277 } 278 279 boolean isCompleted() { 280 return this.completed; 281 } 282 283 public long getWriteNumber() { 284 return this.writeNumber; 285 } 286 287 @Override 288 public String toString() { 289 return this.writeNumber + ", " + this.completed; 290 } 291 } 292 293 public static final long FIXED_SIZE = ClassSize.align( 294 ClassSize.OBJECT + 295 2 * Bytes.SIZEOF_LONG + 296 2 * ClassSize.REFERENCE); 297}