001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.util.LinkedList; 021import java.util.Optional; 022import java.util.concurrent.atomic.AtomicLong; 023import org.apache.hadoop.hbase.util.Bytes; 024import org.apache.hadoop.hbase.util.ClassSize; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 030import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects.ToStringHelper; 031 032/** 033 * Manages the read/write consistency. This provides an interface for readers to determine what 034 * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit" the new 035 * writes for readers to read (thus forming atomic transactions). 036 */ 037@InterfaceAudience.Private 038public class MultiVersionConcurrencyControl { 039 private static final Logger LOG = LoggerFactory.getLogger(MultiVersionConcurrencyControl.class); 040 private static final long READPOINT_ADVANCE_WAIT_TIME = 10L; 041 042 final String regionName; 043 final AtomicLong readPoint = new AtomicLong(0); 044 final AtomicLong writePoint = new AtomicLong(0); 045 private final Object readWaiters = new Object(); 046 /** 047 * Represents no value, or not set. 048 */ 049 public static final long NONE = -1; 050 051 // This is the pending queue of writes. 052 // 053 // TODO(eclark): Should this be an array of fixed size to 054 // reduce the number of allocations on the write path? 055 // This could be equal to the number of handlers + a small number. 056 // TODO: St.Ack 20150903 Sounds good to me. 057 private final LinkedList<WriteEntry> writeQueue = new LinkedList<>(); 058 059 public MultiVersionConcurrencyControl() { 060 this(null); 061 } 062 063 public MultiVersionConcurrencyControl(String regionName) { 064 this.regionName = regionName; 065 } 066 067 /** 068 * Construct and set read point. Write point is uninitialized. 069 */ 070 public MultiVersionConcurrencyControl(long startPoint) { 071 this(null); 072 tryAdvanceTo(startPoint, NONE); 073 } 074 075 /** 076 * Step the MVCC forward on to a new read/write basis. 077 */ 078 public void advanceTo(long newStartPoint) { 079 while (true) { 080 long seqId = this.getWritePoint(); 081 if (seqId >= newStartPoint) { 082 break; 083 } 084 if (this.tryAdvanceTo(newStartPoint, seqId)) { 085 break; 086 } 087 } 088 } 089 090 /** 091 * Step the MVCC forward on to a new read/write basis. 092 * @param newStartPoint Point to move read and write points to. 093 * @param expected If not -1 (#NONE) 094 * @return Returns false if <code>expected</code> is not equal to the current 095 * <code>readPoint</code> or if <code>startPoint</code> is less than current 096 * <code>readPoint</code> 097 */ 098 boolean tryAdvanceTo(long newStartPoint, long expected) { 099 synchronized (writeQueue) { 100 long currentRead = this.readPoint.get(); 101 long currentWrite = this.writePoint.get(); 102 if (currentRead != currentWrite) { 103 throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead 104 + ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo"); 105 } 106 if (expected != NONE && expected != currentRead) { 107 return false; 108 } 109 110 if (newStartPoint < currentRead) { 111 return false; 112 } 113 114 readPoint.set(newStartPoint); 115 writePoint.set(newStartPoint); 116 } 117 return true; 118 } 119 120 /** 121 * Call {@link #begin(Runnable)} with an empty {@link Runnable}. 122 */ 123 public WriteEntry begin() { 124 return begin(() -> { 125 }); 126 } 127 128 /** 129 * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it 130 * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write 131 * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the 132 * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of 133 * the failed write transaction. 134 * <p> 135 * The {@code action} will be executed under the lock which means it can keep the same order with 136 * mvcc. 137 * @see #complete(WriteEntry) 138 * @see #completeAndWait(WriteEntry) 139 */ 140 public WriteEntry begin(Runnable action) { 141 synchronized (writeQueue) { 142 long nextWriteNumber = writePoint.incrementAndGet(); 143 WriteEntry e = new WriteEntry(nextWriteNumber); 144 writeQueue.add(e); 145 action.run(); 146 return e; 147 } 148 } 149 150 /** 151 * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs to 152 * complete. 153 */ 154 public void await() { 155 // Add a write and then wait on reads to catch up to it. 156 completeAndWait(begin()); 157 } 158 159 /** 160 * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the read 161 * point catches up to our write. At the end of this call, the global read point is at least as 162 * large as the write point of the passed in WriteEntry. Thus, the write is visible to MVCC 163 * readers. 164 */ 165 public void completeAndWait(WriteEntry e) { 166 if (!complete(e)) { 167 waitForRead(e); 168 } 169 } 170 171 /** 172 * Mark the {@link WriteEntry} as complete and advance the read point as much as possible. Call 173 * this even if the write has FAILED (AFTER backing out the write transaction changes completely) 174 * so we can clean up the outstanding transaction. How much is the read point advanced? Let S be 175 * the set of all write numbers that are completed. Set the read point to the highest numbered 176 * write of S. 177 * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) 178 */ 179 public boolean complete(WriteEntry writeEntry) { 180 synchronized (writeQueue) { 181 writeEntry.markCompleted(); 182 long nextReadValue = NONE; 183 boolean ranOnce = false; 184 while (!writeQueue.isEmpty()) { 185 ranOnce = true; 186 WriteEntry queueFirst = writeQueue.getFirst(); 187 188 if (nextReadValue > 0) { 189 if (nextReadValue + 1 != queueFirst.getWriteNumber()) { 190 throw new RuntimeException("Invariant in complete violated, nextReadValue=" 191 + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber()); 192 } 193 } 194 195 if (queueFirst.isCompleted()) { 196 nextReadValue = queueFirst.getWriteNumber(); 197 writeQueue.removeFirst(); 198 queueFirst.runCompletionAction(); 199 } else { 200 break; 201 } 202 } 203 204 if (!ranOnce) { 205 throw new RuntimeException("There is no first!"); 206 } 207 208 if (nextReadValue > 0) { 209 synchronized (readWaiters) { 210 readPoint.set(nextReadValue); 211 readWaiters.notifyAll(); 212 } 213 } 214 return readPoint.get() >= writeEntry.getWriteNumber(); 215 } 216 } 217 218 /** 219 * Wait for the global readPoint to advance up to the passed in write entry number. 220 */ 221 void waitForRead(WriteEntry e) { 222 boolean interrupted = false; 223 int count = 0; 224 synchronized (readWaiters) { 225 while (readPoint.get() < e.getWriteNumber()) { 226 if (count % 100 == 0 && count > 0) { 227 long totalWaitTillNow = READPOINT_ADVANCE_WAIT_TIME * count; 228 LOG.warn("STUCK for : " + totalWaitTillNow + " millis. " + this); 229 } 230 count++; 231 try { 232 readWaiters.wait(READPOINT_ADVANCE_WAIT_TIME); 233 } catch (InterruptedException ie) { 234 // We were interrupted... finish the loop -- i.e. cleanup --and then 235 // on our way out, reset the interrupt flag. 236 interrupted = true; 237 } 238 } 239 } 240 if (interrupted) { 241 Thread.currentThread().interrupt(); 242 } 243 } 244 245 @Override 246 public String toString() { 247 ToStringHelper helper = 248 MoreObjects.toStringHelper(this).add("readPoint", readPoint).add("writePoint", writePoint); 249 if (this.regionName != null) { 250 helper.add("regionName", this.regionName); 251 } 252 return helper.toString(); 253 } 254 255 public long getReadPoint() { 256 return readPoint.get(); 257 } 258 259 public long getWritePoint() { 260 return writePoint.get(); 261 } 262 263 /** 264 * Write number and whether write has completed given out at start of a write transaction. Every 265 * created WriteEntry must be completed by calling mvcc#complete or #completeAndWait. 266 */ 267 @InterfaceAudience.Private 268 public static final class WriteEntry { 269 private final long writeNumber; 270 private boolean completed = false; 271 /** 272 * Will be called after completion, i.e, when being removed from the 273 * {@link MultiVersionConcurrencyControl#writeQueue}. 274 */ 275 private Optional<Runnable> completionAction = Optional.empty(); 276 277 private WriteEntry(long writeNumber) { 278 this.writeNumber = writeNumber; 279 } 280 281 private void markCompleted() { 282 this.completed = true; 283 } 284 285 private boolean isCompleted() { 286 return this.completed; 287 } 288 289 public void attachCompletionAction(Runnable action) { 290 assert !completionAction.isPresent(); 291 completionAction = Optional.of(action); 292 } 293 294 private void runCompletionAction() { 295 completionAction.ifPresent(Runnable::run); 296 } 297 298 public Optional<Runnable> getCompletionAction() { 299 return completionAction; 300 } 301 302 public long getWriteNumber() { 303 return this.writeNumber; 304 } 305 306 @Override 307 public String toString() { 308 return this.writeNumber + ", " + this.completed; 309 } 310 } 311 312 public static final long FIXED_SIZE = 313 ClassSize.align(ClassSize.OBJECT + 2 * Bytes.SIZEOF_LONG + 2 * ClassSize.REFERENCE); 314}