View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import java.util.concurrent.ExecutionException;
21  import java.util.concurrent.TimeUnit;
22  
23  import org.apache.hadoop.hbase.classification.InterfaceAudience;
24  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
25  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
26  import org.apache.htrace.Span;
27  
28  /**
29   * A Future on a filesystem sync call.  It given to a client or 'Handler' for it to wait on till
30   * the sync completes.
31   *
32   * <p>Handlers coming in call append, append, append, and then do a flush/sync of
33   * the edits they have appended the WAL before returning. Since sync takes a while to
34   * complete, we give the Handlers back this sync future to wait on until the
35   * actual HDFS sync completes. Meantime this sync future goes across the ringbuffer and into a
36   * sync runner thread; when it completes, it finishes up the future, the handler get or failed
37   * check completes and the Handler can then progress.
38   * <p>
39   * This is just a partial implementation of Future; we just implement get and
40   * failure.  Unimplemented methods throw {@link UnsupportedOperationException}.
41   * <p>
42   * There is not a one-to-one correlation between dfs sync invocations and
43   * instances of this class. A single dfs sync call may complete and mark many
44   * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync
45   * call every time a Handler asks for it.
46   * <p>
47   * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even
48   * if it the first time, start the sync, then park the 'hitched' thread on a call to
49   * #get().
50   */
51  @InterfaceAudience.Private
52  class SyncFuture {
53    // Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads
54    // to coordinate on but it did not give any obvious advantage and some issues with order in which
55    // events happen.
56    private static final long NOT_DONE = 0;
57  
58    /**
59     * The sequence at which we were added to the ring buffer.
60     */
61    private long ringBufferSequence;
62  
63    /**
64     * The sequence that was set in here when we were marked done. Should be equal
65     * or > ringBufferSequence.  Put this data member into the NOT_DONE state while this
66     * class is in use.  But for the first position on construction, let it be -1 so we can
67     * immediately call {@link #reset(long, Span)} below and it will work.
68     */
69    private long doneSequence = -1;
70  
71    /**
72     * If error, the associated throwable. Set when the future is 'done'.
73     */
74    private Throwable throwable = null;
75  
76    private Thread t;
77  
78    /**
79     * Optionally carry a disconnected scope to the SyncRunner.
80     */
81    private Span span;
82  
83    /**
84     * Call this method to clear old usage and get it ready for new deploy. Call
85     * this method even if it is being used for the first time.
86     *
87     * @param sequence sequenceId from this Future's position in the RingBuffer
88     * @return this
89     */
90    synchronized SyncFuture reset(final long sequence) {
91      return reset(sequence, null);
92    }
93  
94    /**
95     * Call this method to clear old usage and get it ready for new deploy. Call
96     * this method even if it is being used for the first time.
97     *
98     * @param sequence sequenceId from this Future's position in the RingBuffer
99     * @param span curren span, detached from caller. Don't forget to attach it when
100    *             resuming after a call to {@link #get()}.
101    * @return this
102    */
103   synchronized SyncFuture reset(final long sequence, Span span) {
104     if (t != null && t != Thread.currentThread()) throw new IllegalStateException();
105     t = Thread.currentThread();
106     if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread());
107     this.doneSequence = NOT_DONE;
108     this.ringBufferSequence = sequence;
109     this.span = span;
110     this.throwable = null;
111     return this;
112   }
113 
114   @Override
115   public synchronized String toString() {
116     return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence;
117   }
118 
119   synchronized long getRingBufferSequence() {
120     return this.ringBufferSequence;
121   }
122 
123   /**
124    * Retrieve the {@code span} instance from this Future. EventHandler calls
125    * this method to continue the span. Thread waiting on this Future musn't call
126    * this method until AFTER calling {@link #get()} and the future has been
127    * released back to the originating thread.
128    */
129   synchronized Span getSpan() {
130     return this.span;
131   }
132 
133   /**
134    * Used to re-attach a {@code span} to the Future. Called by the EventHandler
135    * after a it has completed processing and detached the span from its scope.
136    */
137   synchronized void setSpan(Span span) {
138     this.span = span;
139   }
140 
141   /**
142    * @param sequence Sync sequence at which this future 'completed'.
143    * @param t Can be null.  Set if we are 'completing' on error (and this 't' is the error).
144    * @return True if we successfully marked this outstanding future as completed/done.
145    * Returns false if this future is already 'done' when this method called.
146    */
147   synchronized boolean done(final long sequence, final Throwable t) {
148     if (isDone()) return false;
149     this.throwable = t;
150     if (sequence < this.ringBufferSequence) {
151       // Something badly wrong.
152       if (throwable == null) {
153         this.throwable = new IllegalStateException("sequence=" + sequence +
154           ", ringBufferSequence=" + this.ringBufferSequence);
155       }
156     }
157     // Mark done.
158     this.doneSequence = sequence;
159     // Wake up waiting threads.
160     notify();
161     return true;
162   }
163 
164   public boolean cancel(boolean mayInterruptIfRunning) {
165     throw new UnsupportedOperationException();
166   }
167 
168   public synchronized long get(long timeout) throws InterruptedException,
169       ExecutionException, TimeoutIOException {
170     final long done = EnvironmentEdgeManager.currentTime() + timeout;
171     while (!isDone()) {
172       wait(1000);
173       if (EnvironmentEdgeManager.currentTime() >= done) {
174         throw new TimeoutIOException("Failed to get sync result after "
175             + timeout + " ms for ringBufferSequence=" + this.ringBufferSequence
176             + ", WAL system stuck?");
177       }
178     }
179     if (this.throwable != null) throw new ExecutionException(this.throwable);
180     return this.doneSequence;
181   }
182 
183   public Long get(long timeout, TimeUnit unit)
184   throws InterruptedException, ExecutionException {
185     throw new UnsupportedOperationException();
186   }
187 
188   public boolean isCancelled() {
189     throw new UnsupportedOperationException();
190   }
191 
192   synchronized boolean isDone() {
193     return this.doneSequence != NOT_DONE;
194   }
195 
196   synchronized boolean isThrowable() {
197     return isDone() && getThrowable() != null;
198   }
199 
200   synchronized Throwable getThrowable() {
201     return this.throwable;
202   }
203 }