View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.regionserver.wal;
19  
20  import java.util.concurrent.ExecutionException;
21  
22  import org.apache.hadoop.hbase.classification.InterfaceAudience;
23  import org.apache.htrace.Span;
24  
25  /**
26   * A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till the
27   * sync completes.
28   * <p>
29   * Handlers coming in call append, append, append, and then do a flush/sync of the edits they have
30   * appended the WAL before returning. Since sync takes a while to complete, we give the Handlers
31   * back this sync future to wait on until the actual HDFS sync completes. Meantime this sync future
32   * goes across a queue and is handled by a background thread; when it completes, it finishes up the
33   * future, the handler get or failed check completes and the Handler can then progress.
34   * <p>
35   * This is just a partial implementation of Future; we just implement get and failure.
36   * <p>
37   * There is not a one-to-one correlation between dfs sync invocations and instances of this class. A
38   * single dfs sync call may complete and mark many SyncFutures as done; i.e. we batch up sync calls
39   * rather than do a dfs sync call every time a Handler asks for it.
40   * <p>
41   * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even if it the first
42   * time, start the sync, then park the 'hitched' thread on a call to #get().
43   */
44  @InterfaceAudience.Private
45  class SyncFuture {
46    // Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads
47    // to coordinate on but it did not give any obvious advantage and some issues with order in which
48    // events happen.
49    private static final long NOT_DONE = -1L;
50  
51    /**
52     * The transaction id of this operation, monotonically increases.
53     */
54    private long txid;
55  
56    /**
57     * The transaction id that was set in here when we were marked done. Should be equal or > txnId.
58     * Put this data member into the NOT_DONE state while this class is in use.
59     */
60    private long doneTxid;
61  
62    /**
63     * If error, the associated throwable. Set when the future is 'done'.
64     */
65    private Throwable throwable;
66  
67    private Thread t;
68  
69    /**
70     * Optionally carry a disconnected scope to the SyncRunner.
71     */
72    private Span span;
73  
74    SyncFuture(long txid, Span span) {
75      this.t = Thread.currentThread();
76      this.txid = txid;
77      this.span = span;
78      this.doneTxid = NOT_DONE;
79    }
80  
81    /**
82     * Call this method to clear old usage and get it ready for new deploy.
83     * @param txid the new transaction id
84     * @param span current span, detached from caller. Don't forget to attach it when resuming after a
85     *          call to {@link #get()}.
86     * @return this
87     */
88    synchronized SyncFuture reset(final long txid, Span span) {
89      if (t != null && t != Thread.currentThread()) {
90        throw new IllegalStateException();
91      }
92      t = Thread.currentThread();
93      if (!isDone()) {
94        throw new IllegalStateException("" + txid + " " + Thread.currentThread());
95      }
96      this.doneTxid = NOT_DONE;
97      this.txid = txid;
98      this.span = span;
99      return this;
100   }
101 
102   @Override
103   public synchronized String toString() {
104     return "done=" + isDone() + ", txid=" + this.txid;
105   }
106 
107   synchronized long getTxid() {
108     return this.txid;
109   }
110 
111   /**
112    * Retrieve the {@code span} instance from this Future. EventHandler calls this method to continue
113    * the span. Thread waiting on this Future musn't call this method until AFTER calling
114    * {@link #get()} and the future has been released back to the originating thread.
115    */
116   synchronized Span getSpan() {
117     return this.span;
118   }
119 
120   /**
121    * Used to re-attach a {@code span} to the Future. Called by the EventHandler after a it has
122    * completed processing and detached the span from its scope.
123    */
124   synchronized void setSpan(Span span) {
125     this.span = span;
126   }
127 
128   /**
129    * @param txid the transaction id at which this future 'completed'.
130    * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).
131    * @return True if we successfully marked this outstanding future as completed/done. Returns false
132    *         if this future is already 'done' when this method called.
133    */
134   synchronized boolean done(final long txid, final Throwable t) {
135     if (isDone()) {
136       return false;
137     }
138     this.throwable = t;
139     if (txid < this.txid) {
140       // Something badly wrong.
141       if (throwable == null) {
142         this.throwable =
143             new IllegalStateException("done txid=" + txid + ", my txid=" + this.txid);
144       }
145     }
146     // Mark done.
147     this.doneTxid = txid;
148     // Wake up waiting threads.
149     notify();
150     return true;
151   }
152 
153   boolean cancel(boolean mayInterruptIfRunning) {
154     throw new UnsupportedOperationException();
155   }
156 
157   synchronized long get() throws InterruptedException, ExecutionException {
158     while (!isDone()) {
159       wait(1000);
160     }
161     if (this.throwable != null) {
162       throw new ExecutionException(this.throwable);
163     }
164     return this.doneTxid;
165   }
166 
167   synchronized boolean isDone() {
168     return this.doneTxid != NOT_DONE;
169   }
170 
171   synchronized boolean isThrowable() {
172     return isDone() && getThrowable() != null;
173   }
174 
175   synchronized Throwable getThrowable() {
176     return this.throwable;
177   }
178 }