View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.LinkedList;
23  import java.util.concurrent.atomic.AtomicLong;
24  
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.util.Bytes;
27  import org.apache.hadoop.hbase.util.ClassSize;
28  
29  /**
30   * Manages the read/write consistency within memstore. This provides
31   * an interface for readers to determine what entries to ignore, and
32   * a mechanism for writers to obtain new write numbers, then "commit"
33   * the new writes for readers to read (thus forming atomic transactions).
34   */
35  @InterfaceAudience.Private
36  public class MultiVersionConsistencyControl {
37    private static final long NO_WRITE_NUMBER = 0;
38    private volatile long memstoreRead = 0;
39    private final Object readWaiters = new Object();
40  
41    // This is the pending queue of writes.
42    private final LinkedList<WriteEntry> writeQueue =
43        new LinkedList<WriteEntry>();
44  
45    /**
46     * Default constructor. Initializes the memstoreRead/Write points to 0.
47     */
48    public MultiVersionConsistencyControl() {
49    }
50  
51    /**
52     * Initializes the memstoreRead/Write points appropriately.
53     * @param startPoint
54     */
55    public void initialize(long startPoint) {
56      synchronized (writeQueue) {
57        writeQueue.clear();
58        memstoreRead = startPoint;
59      }
60    }
61  
62    /**
63     *
64     * @param initVal The value we used initially and expected it'll be reset later
65     * @return WriteEntry instance.
66     */
67    WriteEntry beginMemstoreInsert() {
68      return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
69    }
70  
71    /**
72     * Get a mvcc write number before an actual one(its log sequence Id) being assigned
73     * @param sequenceId
74     * @return long a faked write number which is bigger enough not to be seen by others before a real
75     *         one is assigned
76     */
77    public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
78      // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
79      // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
80      // because each handler could increment sequence num twice and max concurrent in-flight
81      // transactions is the number of RPC handlers.
82      // we can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
83      // changes touch same row key
84      // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
85      // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all
86      return sequenceId.incrementAndGet() + 1000000000;
87    }
88  
89    /**
90     * This function starts a MVCC transaction with current region's log change sequence number. Since
91     * we set change sequence number when flushing current change to WAL(late binding), the flush
92     * order may differ from the order to start a MVCC transaction. For example, a change begins a
93     * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
94     * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
95     * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
96     * big number is safe because we only need it to prevent current change being seen and the number
97     * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
98     * for MVCC to align with flush sequence.
99     * @param curSeqNum
100    * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
101    */
102   public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
103     WriteEntry e = new WriteEntry(curSeqNum);
104     synchronized (writeQueue) {
105       writeQueue.add(e);
106       return e;
107     }
108   }
109 
110   /**
111    * Complete a {@link WriteEntry} that was created by
112    * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
113    * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
114    * visible to MVCC readers.
115    * @throws IOException
116    */
117   public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
118       throws IOException {
119     if(e == null) return;
120     if (seqId != null) {
121       e.setWriteNumber(seqId.getSequenceId());
122     } else {
123       // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
124       // function beginMemstoreInsertWithSeqNum in case of failures
125       e.setWriteNumber(NO_WRITE_NUMBER);
126     }
127     waitForPreviousTransactionsComplete(e);
128   }
129 
130   /**
131    * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
132    * end of this call, the global read point is at least as large as the write point of the passed
133    * in WriteEntry. Thus, the write is visible to MVCC readers.
134    */
135   public void completeMemstoreInsert(WriteEntry e) {
136     waitForPreviousTransactionsComplete(e);
137   }
138 
139   /**
140    * Mark the {@link WriteEntry} as complete and advance the read point as
141    * much as possible.
142    *
143    * How much is the read point advanced?
144    * Let S be the set of all write numbers that are completed and where all previous write numbers
145    * are also completed.  Then, the read point is advanced to the supremum of S.
146    *
147    * @param e
148    * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
149    */
150   boolean advanceMemstore(WriteEntry e) {
151     long nextReadValue = -1;
152     synchronized (writeQueue) {
153       e.markCompleted();
154 
155       while (!writeQueue.isEmpty()) {
156         WriteEntry queueFirst = writeQueue.getFirst();
157         if (queueFirst.isCompleted()) {
158           // Using Max because Edit complete in WAL sync order not arriving order
159           nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
160           writeQueue.removeFirst();
161         } else {
162           break;
163         }
164       }
165 
166       if (nextReadValue > memstoreRead) {
167         memstoreRead = nextReadValue;
168       }
169 
170       // notify waiters on writeQueue before return
171       writeQueue.notifyAll();
172     }
173 
174     if (nextReadValue > 0) {
175       synchronized (readWaiters) {
176         readWaiters.notifyAll();
177       }
178     }
179 
180     if (memstoreRead >= e.getWriteNumber()) {
181       return true;
182     }
183     return false;
184   }
185 
186   /**
187    * Advances the current read point to be given seqNum if it is smaller than
188    * that.
189    */
190   void advanceMemstoreReadPointIfNeeded(long seqNum) {
191     synchronized (writeQueue) {
192       if (this.memstoreRead < seqNum) {
193         memstoreRead = seqNum;
194       }
195     }
196   }
197 
198   /**
199    * Wait for all previous MVCC transactions complete
200    */
201   public void waitForPreviousTransactionsComplete() {
202     WriteEntry w = beginMemstoreInsert();
203     waitForPreviousTransactionsComplete(w);
204   }
205 
206   public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
207     boolean interrupted = false;
208     WriteEntry w = waitedEntry;
209 
210     try {
211       WriteEntry firstEntry = null;
212       do {
213         synchronized (writeQueue) {
214           // writeQueue won't be empty at this point, the following is just a safety check
215           if (writeQueue.isEmpty()) {
216             break;
217           }
218           firstEntry = writeQueue.getFirst();
219           if (firstEntry == w) {
220             // all previous in-flight transactions are done
221             break;
222           }
223           try {
224             writeQueue.wait(0);
225           } catch (InterruptedException ie) {
226             // We were interrupted... finish the loop -- i.e. cleanup --and then
227             // on our way out, reset the interrupt flag.
228             interrupted = true;
229             break;
230           }
231         }
232       } while (firstEntry != null);
233     } finally {
234       if (w != null) {
235         advanceMemstore(w);
236       }
237     }
238     if (interrupted) {
239       Thread.currentThread().interrupt();
240     }
241   }
242 
243   public long memstoreReadPoint() {
244     return memstoreRead;
245   }
246 
247   public static class WriteEntry {
248     private long writeNumber;
249     private volatile boolean completed = false;
250 
251     WriteEntry(long writeNumber) {
252       this.writeNumber = writeNumber;
253     }
254     void markCompleted() {
255       this.completed = true;
256     }
257     boolean isCompleted() {
258       return this.completed;
259     }
260     long getWriteNumber() {
261       return this.writeNumber;
262     }
263     void setWriteNumber(long val){
264       this.writeNumber = val;
265     }
266   }
267 
268   public static final long FIXED_SIZE = ClassSize.align(
269       ClassSize.OBJECT +
270       2 * Bytes.SIZEOF_LONG +
271       2 * ClassSize.REFERENCE);
272 
273 }