View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.util.LinkedList;
22  
23  import org.apache.hadoop.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.util.Bytes;
25  import org.apache.hadoop.hbase.util.ClassSize;
26  
27  /**
28   * Manages the read/write consistency within memstore. This provides
29   * an interface for readers to determine what entries to ignore, and
30   * a mechanism for writers to obtain new write numbers, then "commit"
31   * the new writes for readers to read (thus forming atomic transactions).
32   */
33  @InterfaceAudience.Private
34  public class MultiVersionConsistencyControl {
35    private volatile long memstoreRead = 0;
36    private volatile long memstoreWrite = 0;
37  
38    private final Object readWaiters = new Object();
39  
40    // This is the pending queue of writes.
41    private final LinkedList<WriteEntry> writeQueue =
42        new LinkedList<WriteEntry>();
43  
44    /**
45     * Default constructor. Initializes the memstoreRead/Write points to 0.
46     */
47    public MultiVersionConsistencyControl() {
48      this.memstoreRead = this.memstoreWrite = 0;
49    }
50  
51    /**
52     * Initializes the memstoreRead/Write points appropriately.
53     * @param startPoint
54     */
55    public void initialize(long startPoint) {
56      synchronized (writeQueue) {
57        if (this.memstoreWrite != this.memstoreRead) {
58          throw new RuntimeException("Already used this mvcc. Too late to initialize");
59        }
60  
61        this.memstoreRead = this.memstoreWrite = startPoint;
62      }
63    }
64  
65    /**
66     * Generate and return a {@link WriteEntry} with a new write number.
67     * To complete the WriteEntry and wait for it to be visible,
68     * call {@link #completeMemstoreInsert(WriteEntry)}.
69     */
70    public WriteEntry beginMemstoreInsert() {
71      synchronized (writeQueue) {
72        long nextWriteNumber = ++memstoreWrite;
73        WriteEntry e = new WriteEntry(nextWriteNumber);
74        writeQueue.add(e);
75        return e;
76      }
77    }
78  
79    /**
80     * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
81     *
82     * At the end of this call, the global read point is at least as large as the write point
83     * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
84     */
85    public void completeMemstoreInsert(WriteEntry e) {
86      advanceMemstore(e);
87      waitForRead(e);
88    }
89  
90    /**
91     * Mark the {@link WriteEntry} as complete and advance the read point as
92     * much as possible.
93     *
94     * How much is the read point advanced?
95     * Let S be the set of all write numbers that are completed and where all previous write numbers
96     * are also completed.  Then, the read point is advanced to the supremum of S.
97     *
98     * @param e
99     * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
100    */
101   boolean advanceMemstore(WriteEntry e) {
102     synchronized (writeQueue) {
103       e.markCompleted();
104 
105       long nextReadValue = -1;
106       boolean ranOnce=false;
107       while (!writeQueue.isEmpty()) {
108         ranOnce=true;
109         WriteEntry queueFirst = writeQueue.getFirst();
110 
111         if (nextReadValue > 0) {
112           if (nextReadValue+1 != queueFirst.getWriteNumber()) {
113             throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
114                 + nextReadValue + " next: " + queueFirst.getWriteNumber());
115           }
116         }
117 
118         if (queueFirst.isCompleted()) {
119           nextReadValue = queueFirst.getWriteNumber();
120           writeQueue.removeFirst();
121         } else {
122           break;
123         }
124       }
125 
126       if (!ranOnce) {
127         throw new RuntimeException("never was a first");
128       }
129 
130       if (nextReadValue > 0) {
131         synchronized (readWaiters) {
132           memstoreRead = nextReadValue;
133           readWaiters.notifyAll();
134         }
135       }
136       if (memstoreRead >= e.getWriteNumber()) {
137         return true;
138       }
139       return false;
140     }
141   }
142 
143   /**
144    * Wait for the global readPoint to advance upto
145    * the specified transaction number.
146    */
147   public void waitForRead(WriteEntry e) {
148     boolean interrupted = false;
149     synchronized (readWaiters) {
150       while (memstoreRead < e.getWriteNumber()) {
151         try {
152           readWaiters.wait(0);
153         } catch (InterruptedException ie) {
154           // We were interrupted... finish the loop -- i.e. cleanup --and then
155           // on our way out, reset the interrupt flag.
156           interrupted = true;
157         }
158       }
159     }
160     if (interrupted) Thread.currentThread().interrupt();
161   }
162 
163   public long memstoreReadPoint() {
164     return memstoreRead;
165   }
166 
167 
168   public static class WriteEntry {
169     private long writeNumber;
170     private boolean completed = false;
171     WriteEntry(long writeNumber) {
172       this.writeNumber = writeNumber;
173     }
174     void markCompleted() {
175       this.completed = true;
176     }
177     boolean isCompleted() {
178       return this.completed;
179     }
180     long getWriteNumber() {
181       return this.writeNumber;
182     }
183   }
184 
185   public static final long FIXED_SIZE = ClassSize.align(
186       ClassSize.OBJECT +
187       2 * Bytes.SIZEOF_LONG +
188       2 * ClassSize.REFERENCE);
189 
190 }