001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.util.LinkedList; 022import java.util.concurrent.atomic.AtomicLong; 023import org.apache.hadoop.hbase.util.Bytes; 024import org.apache.hadoop.hbase.util.ClassSize; 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; 030import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects.ToStringHelper; 031 032/** 033 * Manages the read/write consistency. This provides an interface for readers to determine what 034 * entries to ignore, and a mechanism for writers to obtain new write numbers, then "commit" 035 * the new writes for readers to read (thus forming atomic transactions). 036 */ 037@InterfaceAudience.Private 038public class MultiVersionConcurrencyControl { 039 private static final Logger LOG = LoggerFactory.getLogger(MultiVersionConcurrencyControl.class); 040 private static final long READPOINT_ADVANCE_WAIT_TIME = 10L; 041 042 final String regionName; 043 final AtomicLong readPoint = new AtomicLong(0); 044 final AtomicLong writePoint = new AtomicLong(0); 045 private final Object readWaiters = new Object(); 046 /** 047 * Represents no value, or not set. 048 */ 049 public static final long NONE = -1; 050 051 // This is the pending queue of writes. 052 // 053 // TODO(eclark): Should this be an array of fixed size to 054 // reduce the number of allocations on the write path? 055 // This could be equal to the number of handlers + a small number. 056 // TODO: St.Ack 20150903 Sounds good to me. 057 private final LinkedList<WriteEntry> writeQueue = new LinkedList<>(); 058 059 public MultiVersionConcurrencyControl() { 060 this(null); 061 } 062 063 public MultiVersionConcurrencyControl(String regionName) { 064 this.regionName = regionName; 065 } 066 067 /** 068 * Construct and set read point. Write point is uninitialized. 069 */ 070 public MultiVersionConcurrencyControl(long startPoint) { 071 this(null); 072 tryAdvanceTo(startPoint, NONE); 073 } 074 075 /** 076 * Step the MVCC forward on to a new read/write basis. 077 * @param newStartPoint 078 */ 079 public void advanceTo(long newStartPoint) { 080 while (true) { 081 long seqId = this.getWritePoint(); 082 if (seqId >= newStartPoint) { 083 break; 084 } 085 if (this.tryAdvanceTo(newStartPoint, seqId)) { 086 break; 087 } 088 } 089 } 090 091 /** 092 * Step the MVCC forward on to a new read/write basis. 093 * @param newStartPoint Point to move read and write points to. 094 * @param expected If not -1 (#NONE) 095 * @return Returns false if <code>expected</code> is not equal to the 096 * current <code>readPoint</code> or if <code>startPoint</code> is less than current 097 * <code>readPoint</code> 098 */ 099 boolean tryAdvanceTo(long newStartPoint, long expected) { 100 synchronized (writeQueue) { 101 long currentRead = this.readPoint.get(); 102 long currentWrite = this.writePoint.get(); 103 if (currentRead != currentWrite) { 104 throw new RuntimeException("Already used this mvcc; currentRead=" + currentRead + 105 ", currentWrite=" + currentWrite + "; too late to tryAdvanceTo"); 106 } 107 if (expected != NONE && expected != currentRead) { 108 return false; 109 } 110 111 if (newStartPoint < currentRead) { 112 return false; 113 } 114 115 readPoint.set(newStartPoint); 116 writePoint.set(newStartPoint); 117 } 118 return true; 119 } 120 121 /** 122 * Call {@link #begin(Runnable)} with an empty {@link Runnable}. 123 */ 124 public WriteEntry begin() { 125 return begin(() -> {}); 126 } 127 128 /** 129 * Start a write transaction. Create a new {@link WriteEntry} with a new write number and add it 130 * to our queue of ongoing writes. Return this WriteEntry instance. To complete the write 131 * transaction and wait for it to be visible, call {@link #completeAndWait(WriteEntry)}. If the 132 * write failed, call {@link #complete(WriteEntry)} so we can clean up AFTER removing ALL trace of 133 * the failed write transaction. 134 * <p> 135 * The {@code action} will be executed under the lock which means it can keep the same order with 136 * mvcc. 137 * @see #complete(WriteEntry) 138 * @see #completeAndWait(WriteEntry) 139 */ 140 public WriteEntry begin(Runnable action) { 141 synchronized (writeQueue) { 142 long nextWriteNumber = writePoint.incrementAndGet(); 143 WriteEntry e = new WriteEntry(nextWriteNumber); 144 writeQueue.add(e); 145 action.run(); 146 return e; 147 } 148 } 149 150 /** 151 * Wait until the read point catches up to the write point; i.e. wait on all outstanding mvccs 152 * to complete. 153 */ 154 public void await() { 155 // Add a write and then wait on reads to catch up to it. 156 completeAndWait(begin()); 157 } 158 159 /** 160 * Complete a {@link WriteEntry} that was created by {@link #begin()} then wait until the 161 * read point catches up to our write. 162 * 163 * At the end of this call, the global read point is at least as large as the write point 164 * of the passed in WriteEntry. Thus, the write is visible to MVCC readers. 165 */ 166 public void completeAndWait(WriteEntry e) { 167 if (!complete(e)) { 168 waitForRead(e); 169 } 170 } 171 172 /** 173 * Mark the {@link WriteEntry} as complete and advance the read point as much as possible. 174 * Call this even if the write has FAILED (AFTER backing out the write transaction 175 * changes completely) so we can clean up the outstanding transaction. 176 * 177 * How much is the read point advanced? 178 * 179 * Let S be the set of all write numbers that are completed. Set the read point to the highest 180 * numbered write of S. 181 * 182 * @param writeEntry 183 * 184 * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) 185 */ 186 public boolean complete(WriteEntry writeEntry) { 187 synchronized (writeQueue) { 188 writeEntry.markCompleted(); 189 long nextReadValue = NONE; 190 boolean ranOnce = false; 191 while (!writeQueue.isEmpty()) { 192 ranOnce = true; 193 WriteEntry queueFirst = writeQueue.getFirst(); 194 195 if (nextReadValue > 0) { 196 if (nextReadValue + 1 != queueFirst.getWriteNumber()) { 197 throw new RuntimeException("Invariant in complete violated, nextReadValue=" 198 + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber()); 199 } 200 } 201 202 if (queueFirst.isCompleted()) { 203 nextReadValue = queueFirst.getWriteNumber(); 204 writeQueue.removeFirst(); 205 } else { 206 break; 207 } 208 } 209 210 if (!ranOnce) { 211 throw new RuntimeException("There is no first!"); 212 } 213 214 if (nextReadValue > 0) { 215 synchronized (readWaiters) { 216 readPoint.set(nextReadValue); 217 readWaiters.notifyAll(); 218 } 219 } 220 return readPoint.get() >= writeEntry.getWriteNumber(); 221 } 222 } 223 224 /** 225 * Wait for the global readPoint to advance up to the passed in write entry number. 226 */ 227 void waitForRead(WriteEntry e) { 228 boolean interrupted = false; 229 int count = 0; 230 synchronized (readWaiters) { 231 while (readPoint.get() < e.getWriteNumber()) { 232 if (count % 100 == 0 && count > 0) { 233 long totalWaitTillNow = READPOINT_ADVANCE_WAIT_TIME * count; 234 LOG.warn("STUCK for : " + totalWaitTillNow + " millis. " + this); 235 } 236 count++; 237 try { 238 readWaiters.wait(READPOINT_ADVANCE_WAIT_TIME); 239 } catch (InterruptedException ie) { 240 // We were interrupted... finish the loop -- i.e. cleanup --and then 241 // on our way out, reset the interrupt flag. 242 interrupted = true; 243 } 244 } 245 } 246 if (interrupted) { 247 Thread.currentThread().interrupt(); 248 } 249 } 250 251 @Override 252 public String toString() { 253 ToStringHelper helper = MoreObjects.toStringHelper(this).add("readPoint", readPoint) 254 .add("writePoint", writePoint); 255 if (this.regionName != null) { 256 helper.add("regionName", this.regionName); 257 } 258 return helper.toString(); 259 } 260 261 public long getReadPoint() { 262 return readPoint.get(); 263 } 264 265 public long getWritePoint() { 266 return writePoint.get(); 267 } 268 269 /** 270 * Write number and whether write has completed given out at start of a write transaction. 271 * Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait. 272 */ 273 @InterfaceAudience.Private 274 public static class WriteEntry { 275 private final long writeNumber; 276 private boolean completed = false; 277 278 WriteEntry(long writeNumber) { 279 this.writeNumber = writeNumber; 280 } 281 282 void markCompleted() { 283 this.completed = true; 284 } 285 286 boolean isCompleted() { 287 return this.completed; 288 } 289 290 public long getWriteNumber() { 291 return this.writeNumber; 292 } 293 294 @Override 295 public String toString() { 296 return this.writeNumber + ", " + this.completed; 297 } 298 } 299 300 public static final long FIXED_SIZE = ClassSize.align( 301 ClassSize.OBJECT + 302 2 * Bytes.SIZEOF_LONG + 303 2 * ClassSize.REFERENCE); 304}