View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.LinkedHashSet;
23  import java.util.concurrent.atomic.AtomicLong;
24  
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.util.Bytes;
27  import org.apache.hadoop.hbase.util.ClassSize;
28  
29  /**
30   * Manages the read/write consistency within memstore. This provides
31   * an interface for readers to determine what entries to ignore, and
32   * a mechanism for writers to obtain new write numbers, then "commit"
33   * the new writes for readers to read (thus forming atomic transactions).
34   */
35  @InterfaceAudience.Private
36  public class MultiVersionConsistencyControl {
37    static final long NO_WRITE_NUMBER = 0;
38    private volatile long memstoreRead = 0;
39    private final Object readWaiters = new Object();
40  
41    // This is the pending queue of writes.
42    private final LinkedHashSet<WriteEntry> writeQueue =
43        new LinkedHashSet<WriteEntry>();
44  
45    /**
46     * Default constructor. Initializes the memstoreRead/Write points to 0.
47     */
48    public MultiVersionConsistencyControl() {
49    }
50  
51    /**
52     * Initializes the memstoreRead/Write points appropriately.
53     * @param startPoint
54     */
55    public void initialize(long startPoint) {
56      synchronized (writeQueue) {
57        writeQueue.clear();
58        memstoreRead = startPoint;
59      }
60    }
61  
62    /**
63     *
64     * @param initVal The value we used initially and expected it'll be reset later
65     * @return WriteEntry instance.
66     */
67    WriteEntry beginMemstoreInsert() {
68      return beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER);
69    }
70  
71    /**
72     * Get a mvcc write number before an actual one(its log sequence Id) being assigned
73     * @param sequenceId
74     * @return long a faked write number which is bigger enough not to be seen by others before a real
75     *         one is assigned
76     */
77    public static long getPreAssignedWriteNumber(AtomicLong sequenceId) {
78      // the 1 billion is just an arbitrary big number to guard no scanner will reach it before
79      // current MVCC completes. Theoretically the bump only needs to be 2 * the number of handlers
80      // because each handler could increment sequence num twice and max concurrent in-flight
81      // transactions is the number of RPC handlers.
82      // We can't use Long.MAX_VALUE because we still want to maintain the ordering when multiple
83      // changes touch same row key.
84      // If for any reason, the bumped value isn't reset due to failure situations, we'll reset
85      // curSeqNum to NO_WRITE_NUMBER in order NOT to advance memstore read point at all.
86      // St.Ack 20150901 Where is the reset to NO_WRITE_NUMBER done?
87      return sequenceId.incrementAndGet() + 1000000000;
88    }
89  
90    /**
91     * This function starts a MVCC transaction with current region's log change sequence number. Since
92     * we set change sequence number when flushing current change to WAL(late binding), the flush
93     * order may differ from the order to start a MVCC transaction. For example, a change begins a
94     * MVCC firstly may complete later than a change which starts MVCC at a later time. Therefore, we
95     * add a safe bumper to the passed in sequence number to start a MVCC so that no other concurrent
96     * transactions will reuse the number till current MVCC completes(success or fail). The "faked"
97     * big number is safe because we only need it to prevent current change being seen and the number
98     * will be reset to real sequence number(set in log sync) right before we complete a MVCC in order
99     * for MVCC to align with flush sequence.
100    * @param curSeqNum
101    * @return WriteEntry a WriteEntry instance with the passed in curSeqNum
102    */
103   public WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum) {
104     return beginMemstoreInsertWithSeqNum(curSeqNum, false);
105   }
106 
107   private WriteEntry beginMemstoreInsertWithSeqNum(long curSeqNum, boolean complete) {
108     WriteEntry e = new WriteEntry(curSeqNum);
109     if (complete) {
110       e.markCompleted();
111     }
112     synchronized (writeQueue) {
113       writeQueue.add(e);
114       return e;
115     }
116   }
117 
118   /**
119    * Complete a {@link WriteEntry} that was created by
120    * {@link #beginMemstoreInsertWithSeqNum(long)}. At the end of this call, the global read
121    * point is at least as large as the write point of the passed in WriteEntry. Thus, the write is
122    * visible to MVCC readers.
123    * @throws IOException
124    */
125   public void completeMemstoreInsertWithSeqNum(WriteEntry e, SequenceId seqId)
126       throws IOException {
127     if(e == null) return;
128     if (seqId != null) {
129       e.setWriteNumber(seqId.getSequenceId());
130     } else {
131       // set the value to NO_WRITE_NUMBER in order NOT to advance memstore readpoint inside
132       // function beginMemstoreInsertWithSeqNum in case of failures
133       e.setWriteNumber(NO_WRITE_NUMBER);
134     }
135     waitForPreviousTransactionsComplete(e);
136   }
137 
138   /**
139    * Cancel a write insert that failed.
140    * Removes the write entry without advancing read point or without interfering with write
141    * entries queued behind us. It is like #advanceMemstore(WriteEntry) only this method
142    * will move the read point to the sequence id that is in WriteEntry even if it ridiculous (see
143    * the trick in HRegion where we call {@link #getPreAssignedWriteNumber(AtomicLong)} just to mark
144    * it as for special handling).
145    * @param writeEntry Failed attempt at write. Does cleanup.
146    */
147   public void cancelMemstoreInsert(WriteEntry writeEntry) {
148     // I'm not clear on how this voodoo all works but setting write number to -1 does NOT advance
149     // readpoint and gets my little writeEntry completed and removed from queue of outstanding
150     // events which seems right.  St.Ack 20150901.
151     writeEntry.setWriteNumber(NO_WRITE_NUMBER);
152     advanceMemstore(writeEntry);
153   }
154 
155   /**
156    * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}. At the
157    * end of this call, the global read point is at least as large as the write point of the passed
158    * in WriteEntry. Thus, the write is visible to MVCC readers.
159    */
160   public void completeMemstoreInsert(WriteEntry e) {
161     waitForPreviousTransactionsComplete(e);
162   }
163 
164   /**
165    * Mark the {@link WriteEntry} as complete and advance the read point as
166    * much as possible.
167    *
168    * How much is the read point advanced?
169    * Let S be the set of all write numbers that are completed and where all previous write numbers
170    * are also completed.  Then, the read point is advanced to the supremum of S.
171    *
172    * @param e
173    * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
174    */
175   boolean advanceMemstore(WriteEntry e) {
176     long nextReadValue = -1;
177     synchronized (writeQueue) {
178       e.markCompleted();
179 
180       while (!writeQueue.isEmpty()) {
181         WriteEntry queueFirst = writeQueue.iterator().next();
182         if (queueFirst.isCompleted()) {
183           // Using Max because Edit complete in WAL sync order not arriving order
184           nextReadValue = Math.max(nextReadValue, queueFirst.getWriteNumber());
185           writeQueue.remove(queueFirst);
186         } else {
187           break;
188         }
189       }
190 
191       if (nextReadValue > memstoreRead) {
192         memstoreRead = nextReadValue;
193       }
194 
195       // notify waiters on writeQueue before return
196       writeQueue.notifyAll();
197     }
198 
199     if (nextReadValue > 0) {
200       synchronized (readWaiters) {
201         readWaiters.notifyAll();
202       }
203     }
204 
205     if (memstoreRead >= e.getWriteNumber()) {
206       return true;
207     }
208     return false;
209   }
210 
211   /**
212    * Advances the current read point to be given seqNum if it is smaller than
213    * that.
214    */
215   void advanceMemstoreReadPointIfNeeded(long seqNum) {
216     synchronized (writeQueue) {
217       if (this.memstoreRead < seqNum) {
218         memstoreRead = seqNum;
219       }
220     }
221   }
222 
223   /**
224    * Wait for all previous MVCC transactions complete
225    */
226   public void waitForPreviousTransactionsComplete() {
227     WriteEntry w = beginMemstoreInsertWithSeqNum(NO_WRITE_NUMBER, true);
228     waitForPreviousTransactionsComplete(w);
229   }
230 
231   public void waitForPreviousTransactionsComplete(WriteEntry waitedEntry) {
232     boolean interrupted = false;
233     WriteEntry w = waitedEntry;
234     w.markCompleted();
235 
236     try {
237       WriteEntry firstEntry = null;
238       do {
239         synchronized (writeQueue) {
240           if (writeQueue.isEmpty()) {
241             break;
242           }
243           firstEntry = writeQueue.iterator().next();
244           if (firstEntry == w) {
245             // all previous in-flight transactions are done
246             break;
247           }
248           // WriteEntry already was removed from the queue by another handler
249           if (!writeQueue.contains(w)) {
250             break;
251           }
252           try {
253             writeQueue.wait(0);
254           } catch (InterruptedException ie) {
255             // We were interrupted... finish the loop -- i.e. cleanup --and then
256             // on our way out, reset the interrupt flag.
257             interrupted = true;
258             break;
259           }
260         }
261       } while (firstEntry != null);
262     } finally {
263       advanceMemstore(w);
264     }
265     if (interrupted) {
266       Thread.currentThread().interrupt();
267     }
268   }
269 
270   public long memstoreReadPoint() {
271     return memstoreRead;
272   }
273 
274   public static class WriteEntry {
275     private long writeNumber;
276     private volatile boolean completed = false;
277 
278     WriteEntry(long writeNumber) {
279       this.writeNumber = writeNumber;
280     }
281     void markCompleted() {
282       this.completed = true;
283     }
284     boolean isCompleted() {
285       return this.completed;
286     }
287     long getWriteNumber() {
288       return this.writeNumber;
289     }
290     void setWriteNumber(long val){
291       this.writeNumber = val;
292     }
293   }
294 
295   public static final long FIXED_SIZE = ClassSize.align(
296       ClassSize.OBJECT +
297       2 * Bytes.SIZEOF_LONG +
298       2 * ClassSize.REFERENCE);
299 
300 }