View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.util.LinkedList;
23  
24  import org.apache.hadoop.hbase.util.Bytes;
25  import org.apache.hadoop.hbase.util.ClassSize;
26  
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.commons.logging.Log;
29  
30  /**
31   * Manages the read/write consistency within memstore. This provides
32   * an interface for readers to determine what entries to ignore, and
33   * a mechanism for writers to obtain new write numbers, then "commit"
34   * the new writes for readers to read (thus forming atomic transactions).
35   */
36  public class MultiVersionConsistencyControl {
37    private volatile long memstoreRead = 0;
38    private volatile long memstoreWrite = 0;
39  
40    private final Object readWaiters = new Object();
41  
42    // This is the pending queue of writes.
43    private final LinkedList<WriteEntry> writeQueue =
44        new LinkedList<WriteEntry>();
45  
46    private static final ThreadLocal<Long> perThreadReadPoint =
47        new ThreadLocal<Long>() {
48         @Override
49        protected
50         Long initialValue() {
51           return Long.MAX_VALUE;
52         }
53    };
54  
55    /**
56     * Default constructor. Initializes the memstoreRead/Write points to 0.
57     */
58    public MultiVersionConsistencyControl() {
59      this.memstoreRead = this.memstoreWrite = 0;
60    }
61  
62    /**
63     * Initializes the memstoreRead/Write points appropriately.
64     * @param startPoint
65     */
66    public void initialize(long startPoint) {
67      synchronized (writeQueue) {
68        if (this.memstoreWrite != this.memstoreRead) {
69          throw new RuntimeException("Already used this mvcc. Too late to initialize");
70        }
71  
72        this.memstoreRead = this.memstoreWrite = startPoint;
73      }
74    }
75  
76    /**
77     * Get this thread's read point. Used primarily by the memstore scanner to
78     * know which values to skip (ie: have not been completed/committed to
79     * memstore).
80     */
81    public static long getThreadReadPoint() {
82        return perThreadReadPoint.get();
83    }
84  
85    /**
86     * Set the thread read point to the given value. The thread MVCC
87     * is used by the Memstore scanner so it knows which values to skip.
88     * Give it a value of 0 if you want everything.
89     */
90    public static void setThreadReadPoint(long readPoint) {
91      perThreadReadPoint.set(readPoint);
92    }
93  
94    /**
95     * Set the thread MVCC read point to whatever the current read point is in
96     * this particular instance of MVCC.  Returns the new thread read point value.
97     */
98    public static long resetThreadReadPoint(MultiVersionConsistencyControl mvcc) {
99      perThreadReadPoint.set(mvcc.memstoreReadPoint());
100     return getThreadReadPoint();
101   }
102 
103   /**
104    * Set the thread MVCC read point to 0 (include everything).
105    */
106   public static void resetThreadReadPoint() {
107     perThreadReadPoint.set(0L);
108   }
109 
110   public WriteEntry beginMemstoreInsert() {
111     synchronized (writeQueue) {
112       long nextWriteNumber = ++memstoreWrite;
113       WriteEntry e = new WriteEntry(nextWriteNumber);
114       writeQueue.add(e);
115       return e;
116     }
117   }
118 
119   public void completeMemstoreInsert(WriteEntry e) {
120     advanceMemstore(e);
121     waitForRead(e);
122   }
123 
124   boolean advanceMemstore(WriteEntry e) {
125     synchronized (writeQueue) {
126       e.markCompleted();
127 
128       long nextReadValue = -1;
129       boolean ranOnce=false;
130       while (!writeQueue.isEmpty()) {
131         ranOnce=true;
132         WriteEntry queueFirst = writeQueue.getFirst();
133 
134         if (nextReadValue > 0) {
135           if (nextReadValue+1 != queueFirst.getWriteNumber()) {
136             throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
137                 + nextReadValue + " next: " + queueFirst.getWriteNumber());
138           }
139         }
140 
141         if (queueFirst.isCompleted()) {
142           nextReadValue = queueFirst.getWriteNumber();
143           writeQueue.removeFirst();
144         } else {
145           break;
146         }
147       }
148 
149       if (!ranOnce) {
150         throw new RuntimeException("never was a first");
151       }
152 
153       if (nextReadValue > 0) {
154         synchronized (readWaiters) {
155           memstoreRead = nextReadValue;
156           readWaiters.notifyAll();
157         }
158       }
159       if (memstoreRead >= e.getWriteNumber()) {
160         return true;
161       }
162       return false;
163     }
164   }
165 
166   /**
167    * Wait for the global readPoint to advance upto
168    * the specified transaction number.
169    */
170   public void waitForRead(WriteEntry e) {
171     boolean interrupted = false;
172     synchronized (readWaiters) {
173       while (memstoreRead < e.getWriteNumber()) {
174         try {
175           readWaiters.wait(0);
176         } catch (InterruptedException ie) {
177           // We were interrupted... finish the loop -- i.e. cleanup --and then
178           // on our way out, reset the interrupt flag.
179           interrupted = true;
180         }
181       }
182     }
183     if (interrupted) Thread.currentThread().interrupt();
184   }
185 
186   public long memstoreReadPoint() {
187     return memstoreRead;
188   }
189 
190 
191   public static class WriteEntry {
192     private long writeNumber;
193     private boolean completed = false;
194     WriteEntry(long writeNumber) {
195       this.writeNumber = writeNumber;
196     }
197     void markCompleted() {
198       this.completed = true;
199     }
200     boolean isCompleted() {
201       return this.completed;
202     }
203     long getWriteNumber() {
204       return this.writeNumber;
205     }
206   }
207 
208   public static final long FIXED_SIZE = ClassSize.align(
209       ClassSize.OBJECT +
210       2 * Bytes.SIZEOF_LONG +
211       2 * ClassSize.REFERENCE);
212 
213 }