1 /** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, software 14 * distributed under the License is distributed on an "AS IS" BASIS, 15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 16 * See the License for the specific language governing permissions and 17 * limitations under the License. 18 */ 19 package org.apache.hadoop.hbase.regionserver; 20 21 import java.io.IOException; 22 import java.util.LinkedHashSet; 23 import java.util.concurrent.atomic.AtomicLong; 24 25 import org.apache.hadoop.hbase.classification.InterfaceAudience; 26 import org.apache.hadoop.hbase.util.Bytes; 27 import org.apache.hadoop.hbase.util.ClassSize; 28 29 /** 30 * Manages the read/write consistency within memstore. This provides 31 * an interface for readers to determine what entries to ignore, and 32 * a mechanism for writers to obtain new write numbers, then "commit" 33 * the new writes for readers to read (thus forming atomic transactions). 34 */ 35 @InterfaceAudience.Private 36 public class MultiVersionConsistencyControl { 37 static final long NO_WRITE_NUMBER = 0; 38 private volatile long memstoreRead = 0; 39 private final Object readWaiters = new Object(); 40 41 // This is the pending queue of writes. 42 private final LinkedHashSet<WriteEntry> writeQueue = 43 new LinkedHashSet<WriteEntry>(); 44 45 /** 46 * Default constructor. Initializes the memstoreRead/Write points to 0. 47 */ 48 public MultiVersionConsistencyControl() { 49 } 50 51 /** 52 * Initializes the memstoreRead/Write points appropriately. 53 * @param startPoint 54 */ 55 public void initialize(long startPoint) { 56 synchronized (writeQueue) { 57 writeQueue.clear(); 58 memstoreRead = startPoint; 59 } 60 } 61 62 /** 63 * 64 * @param initVal The value we used initially and expected it'll be reset later 65 * @return WriteEntry instance. 66 */ 67 WriteEntry beginMemstoreInsert() { 68 return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER); 69 } 70 71 /** 72 * Get a mvcc write number before an actual one(its log sequence Id) being assigned 73 * @param sequenceId 74 * @return long a faked write number which is bigger enough not to be seen by others before a real 75 * one is assigned 76 */ 77 public static long getPreAssignedWriteNumber(AtomicLong sequenceId) { 78 // the 1 billion is just an arbitrary big number to guard no scanner will reach it before 79 // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers 80 // because each handler could increment sequence num twice and max concurrent in-flight 81 // transactions is the number of RPC handlers. 82 // We can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple 83 // changes touch same row key. 84 // If for any reason, the bumped value isn't reset due to failure situations, we'll reset 85 // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all. 86 // St.Ack 20150901 Where is the reset to NO_WRITE_NUMBER done? 87 return sequenceId.incrementAndGet() + 1000000000; 88 } 89 90 /** 91 * This function starts a MVCC transaction with current region's log change sequence number. Since 92 * we set change sequence number when flushing current change to WAL(late binding), the flush 93 * order may differ from the order to start a MVCC transaction. For example, a change begins a 94 * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we 95 * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent 96 * transactions will reuse the number till current MVCC completes(success or fail). The "faked" 97 * big number is safe because we only need it to prevent current change being seen and the number 98 * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order 99 * for MVCC to align with flush sequence. 100 * @param curSeqNum 101 * @return WriteEntry a WriteEntry instance with the passed in curSeqNum 102 */ 103 public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) { 104 return beginMemstoreInsertWithSeqNum(curSeqNum, false); 105 } 106 107 private WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum, boolean complete) { 108 WriteEntry e = new WriteEntry(curSeqNum); 109 if (complete) { 110 e.markCompleted(); 111 } 112 synchronized (writeQueue) { 113 writeQueue.add(e); 114 return e; 115 } 116 } 117 118 /** 119 * Complete a {@link WriteEntry} that was created by 120 * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read 121 * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is 122 * visible to MVCC readers. 123 * @throws IOException 124 */ 125 public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId) 126 throws IOException { 127 if(e == null) return; 128 if (seqId != null) { 129 e.setWriteNumber(seqId.getSequenceId()); 130 } else { 131 // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside 132 // function beginMemstoreInsertWithSeqNum in case of failures 133 e.setWriteNumber(NO_WRITE_NUMBER); 134 } 135 waitForPreviousTransactionsComplete(e); 136 } 137 138 /** 139 * Cancel a write insert that failed. 140 * Removes the write entry without advancing read point or without interfering with write 141 * entries queued behind us. It is like #advanceMemstore(WriteEntry) only this method 142 * will move the read point to the sequence id that is in WriteEntry even if it ridiculous (see 143 * the trick in HRegion where we call {@link #getPreAssignedWriteNumber(AtomicLong)} just to mark 144 * it as for special handling). 145 * @param writeEntry Failed attempt at write. Does cleanup. 146 */ 147 public void cancelMemstoreInsert(WriteEntry writeEntry) { 148 // I'm not clear on how this voodoo all works but setting write number to -1 does NOT advance 149 // readpoint and gets my little writeEntry completed and removed from queue of outstanding 150 // events which seems right. St.Ack 20150901. 151 writeEntry.setWriteNumber(NO_WRITE_NUMBER); 152 advanceMemstore(writeEntry); 153 } 154 155 /** 156 * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the 157 * end of this call, the global read point is at least as large as the write point of the passed 158 * in WriteEntry. Thus, the write is visible to MVCC readers. 159 */ 160 public void completeMemstoreInsert(WriteEntry e) { 161 waitForPreviousTransactionsComplete(e); 162 } 163 164 /** 165 * Mark the {@link WriteEntry} as complete and advance the read point as 166 * much as possible. 167 * 168 * How much is the read point advanced? 169 * Let S be the set of all write numbers that are completed and where all previous write numbers 170 * are also completed. Then, the read point is advanced to the supremum of S. 171 * 172 * @param e 173 * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber) 174 */ 175 boolean advanceMemstore(WriteEntry e) { 176 long nextReadValue = -1; 177 synchronized (writeQueue) { 178 e.markCompleted(); 179 180 while (!writeQueue.isEmpty()) { 181 WriteEntry queueFirst = writeQueue.iterator().next(); 182 if (queueFirst.isCompleted()) { 183 // Using Max because Edit complete in WAL sync order not arriving order 184 nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber()); 185 writeQueue.remove(queueFirst); 186 } else { 187 break; 188 } 189 } 190 191 if (nextReadValue > memstoreRead) { 192 memstoreRead = nextReadValue; 193 } 194 195 // notify waiters on writeQueue before return 196 writeQueue.notifyAll(); 197 } 198 199 if (nextReadValue > 0) { 200 synchronized (readWaiters) { 201 readWaiters.notifyAll(); 202 } 203 } 204 205 if (memstoreRead >= e.getWriteNumber()) { 206 return true; 207 } 208 return false; 209 } 210 211 /** 212 * Advances the current read point to be given seqNum if it is smaller than 213 * that. 214 */ 215 void advanceMemstoreReadPointIfNeeded(long seqNum) { 216 synchronized (writeQueue) { 217 if (this.memstoreRead < seqNum) { 218 memstoreRead = seqNum; 219 } 220 } 221 } 222 223 /** 224 * Wait for all previous MVCC transactions complete 225 */ 226 public void waitForPreviousTransactionsComplete() { 227 WriteEntry w = beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER, true); 228 waitForPreviousTransactionsComplete(w); 229 } 230 231 public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) { 232 boolean interrupted = false; 233 WriteEntry w = waitedEntry; 234 w.markCompleted(); 235 236 try { 237 WriteEntry firstEntry = null; 238 do { 239 synchronized (writeQueue) { 240 if (writeQueue.isEmpty()) { 241 break; 242 } 243 firstEntry = writeQueue.iterator().next(); 244 if (firstEntry == w) { 245 // all previous in-flight transactions are done 246 break; 247 } 248 // WriteEntry already was removed from the queue by another handler 249 if (!writeQueue.contains(w)) { 250 break; 251 } 252 try { 253 writeQueue.wait(0); 254 } catch (InterruptedException ie) { 255 // We were interrupted... finish the loop -- i.e. cleanup --and then 256 // on our way out, reset the interrupt flag. 257 interrupted = true; 258 break; 259 } 260 } 261 } while (firstEntry != null); 262 } finally { 263 advanceMemstore(w); 264 } 265 if (interrupted) { 266 Thread.currentThread().interrupt(); 267 } 268 } 269 270 public long memstoreReadPoint() { 271 return memstoreRead; 272 } 273 274 public static class WriteEntry { 275 private long writeNumber; 276 private volatile boolean completed = false; 277 278 WriteEntry(long writeNumber) { 279 this.writeNumber = writeNumber; 280 } 281 void markCompleted() { 282 this.completed = true; 283 } 284 boolean isCompleted() { 285 return this.completed; 286 } 287 long getWriteNumber() { 288 return this.writeNumber; 289 } 290 void setWriteNumber(long val){ 291 this.writeNumber = val; 292 } 293 } 294 295 public static final long FIXED_SIZE = ClassSize.align( 296 ClassSize.OBJECT + 297 2 * Bytes.SIZEOF_LONG + 298 2 * ClassSize.REFERENCE); 299 300 }