View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.LinkedList;
23  import java.util.concurrent.atomic.AtomicLong;
24  
25  import org.apache.hadoop.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.util.Bytes;
27  import org.apache.hadoop.hbase.util.ClassSize;
28  
29  /**
30   * Manages the read/write consistency within memstore. This provides
31   * an interface for readers to determine what entries to ignore, and
32   * a mechanism for writers to obtain new write numbers, then "commit"
33   * the new writes for readers to read (thus forming atomic transactions).
34   */
35  @InterfaceAudience.Private
36  public class MultiVersionConsistencyControl {
37    private static final long NO_WRITE_NUMBER = 0;
38    private volatile long memstoreRead = 0; 
39    private final Object readWaiters = new Object();
40  
41    // This is the pending queue of writes.
42    private final LinkedList<WriteEntry> writeQueue =
43        new LinkedList<WriteEntry>();
44    
45    /**
46     * Default constructor. Initializes the memstoreRead/Write points to 0.
47     */
48    public MultiVersionConsistencyControl() {
49    }
50  
51    /**
52     * Initializes the memstoreRead/Write points appropriately.
53     * @param startPoint
54     */
55    public void initialize(long startPoint) {
56      synchronized (writeQueue) {
57        writeQueue.clear();
58        memstoreRead = startPoint;
59      }
60    }
61  
62    /**
63     * 
64     * @param initVal The value we used initially and expected it'll be reset later
65     * @return WriteEntry instance.
66     */
67    WriteEntry beginMemstoreInsert() {
68      return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
69    }
70    
71    /**
72     * Get a mvcc write number before an actual one(its log sequence Id) being assigned
73     * @param sequenceId
74     * @return long a faked write number which is bigger enough not to be seen by others before a real
75     *         one is assigned
76     */
77    public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
78      // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
79      // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
80      // because each handler could increment sequence num twice and max concurrent in-flight
81      // transactions is the number of RPC handlers.
82      // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
83      // changes touch same row key
84      // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
85      // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
86      return sequenceId.incrementAndGet() + 1000000000;  
87    }
88    
89    /**
90     * This function starts a MVCC transaction with current region's log change sequence number. Since
91     * we set change sequence number when flushing current change to WAL(late binding), the flush
92     * order may differ from the order to start a MVCC transaction. For example, a change begins a
93     * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
94     * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
95     * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
96     * big number is safe because we only need it to prevent current change being seen and the number
97     * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
98     * for MVCC to align with flush sequence.
99     * @param curSeqNum
100    * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
101    */
102   public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
103     WriteEntry e = new WriteEntry(curSeqNum);
104     synchronized (writeQueue) {
105       writeQueue.add(e);
106       return e;
107     }
108   }
109 
110   /**
111    * Complete a {@link WriteEntry} that was created by
112    * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
113    * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
114    * visible to MVCC readers.
115    * @throws IOException
116    */
117   public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceNumber seqNum)
118       throws IOException {
119     if(e == null) return;
120     if (seqNum != null) {
121       e.setWriteNumber(seqNum.getSequenceNumber());
122     } else {
123       // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
124       // function beginMemstoreInsertWithSeqNum in case of failures
125       e.setWriteNumber(NO_WRITE_NUMBER);
126     }
127     waitForPreviousTransactionsComplete(e);
128   }
129   
130   /**
131    * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
132    * end of this call, the global read point is at least as large as the write point of the passed
133    * in WriteEntry. Thus, the write is visible to MVCC readers.
134    */
135   public void completeMemstoreInsert(WriteEntry e) {
136     waitForPreviousTransactionsComplete(e);
137   }
138 
139   /**
140    * Mark the {@link WriteEntry} as complete and advance the read point as
141    * much as possible.
142    *
143    * How much is the read point advanced?
144    * Let S be the set of all write numbers that are completed and where all previous write numbers
145    * are also completed.  Then, the read point is advanced to the supremum of S.
146    *
147    * @param e
148    * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
149    */
150   boolean advanceMemstore(WriteEntry e) {
151     long nextReadValue = -1;
152     synchronized (writeQueue) {
153       e.markCompleted();
154 
155       while (!writeQueue.isEmpty()) {
156         WriteEntry queueFirst = writeQueue.getFirst();
157         if (queueFirst.isCompleted()) {
158           // Using Max because Edit complete in WAL sync order not arriving order
159           nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
160           writeQueue.removeFirst();
161         } else {
162           break;
163         }
164       }
165 
166       if (nextReadValue > memstoreRead) {
167         memstoreRead = nextReadValue;
168       }
169 
170       // notify waiters on writeQueue before return
171       writeQueue.notifyAll();
172     }
173 
174     if (nextReadValue > 0) {
175       synchronized (readWaiters) {
176         readWaiters.notifyAll();
177       }
178     }
179 
180     if (memstoreRead >= e.getWriteNumber()) {
181       return true;
182     }
183     return false;
184   }
185 
186   /**
187    * Wait for all previous MVCC transactions complete
188    */
189   public void waitForPreviousTransactionsComplete() {
190     WriteEntry w = beginMemstoreInsert();
191     waitForPreviousTransactionsComplete(w);
192   }
193   
194   public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
195     boolean interrupted = false;
196     WriteEntry w = waitedEntry;
197 
198     try {
199       WriteEntry firstEntry = null;
200       do {
201         synchronized (writeQueue) {
202           // writeQueue won't be empty at this point, the following is just a safety check
203           if (writeQueue.isEmpty()) {
204             break;
205           }
206           firstEntry = writeQueue.getFirst();
207           if (firstEntry == w) {
208             // all previous in-flight transactions are done
209             break;
210           }
211           try {
212             writeQueue.wait(0);
213           } catch (InterruptedException ie) {
214             // We were interrupted... finish the loop -- i.e. cleanup --and then
215             // on our way out, reset the interrupt flag.
216             interrupted = true;
217             break;
218           }
219         }
220       } while (firstEntry != null);
221     } finally {
222       if (w != null) {
223         advanceMemstore(w);
224       }
225     }
226     if (interrupted) {
227       Thread.currentThread().interrupt();
228     }
229   }
230 
231   public long memstoreReadPoint() {
232     return memstoreRead;
233   }
234 
235   public static class WriteEntry {
236     private long writeNumber;
237     private boolean completed = false;
238 
239     WriteEntry(long writeNumber) {
240       this.writeNumber = writeNumber;
241     }
242     void markCompleted() {
243       this.completed = true;
244     }
245     boolean isCompleted() {
246       return this.completed;
247     }
248     long getWriteNumber() {
249       return this.writeNumber;
250     }
251     void setWriteNumber(long val){
252       this.writeNumber = val;
253     }
254   }
255 
256   public static final long FIXED_SIZE = ClassSize.align(
257       ClassSize.OBJECT +
258       2 * Bytes.SIZEOF_LONG +
259       2 * ClassSize.REFERENCE);
260 
261 }