1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.htrace.Span;
25
26 /**
27 * A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till
28 * the sync completes.
29 *
30 * <p>Handlers coming in call append, append, append, and then do a flush/sync of
31 * the edits they have appended the WAL before returning. Since sync takes a while to
32 * complete, we give the Handlers back this sync future to wait on until the
33 * actual HDFS sync completes. Meantime this sync future goes across the ringbuffer and into a
34 * sync runner thread; when it completes, it finishes up the future, the handler get or failed
35 * check completes and the Handler can then progress.
36 * <p>
37 * This is just a partial implementation of Future; we just implement get and
38 * failure. Unimplemented methods throw {@link UnsupportedOperationException}.
39 * <p>
40 * There is not a one-to-one correlation between dfs sync invocations and
41 * instances of this class. A single dfs sync call may complete and mark many
42 * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync
43 * call every time a Handler asks for it.
44 * <p>
45 * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even
46 * if it the first time, start the sync, then park the 'hitched' thread on a call to
47 * #get().
48 */
49 @InterfaceAudience.Private
50 class SyncFuture {
51 // Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads
52 // to coordinate on but it did not give any obvious advantage and some issues with order in which
53 // events happen.
54 private static final long NOT_DONE = 0;
55
56 /**
57 * The sequence at which we were added to the ring buffer.
58 */
59 private long ringBufferSequence;
60
61 /**
62 * The sequence that was set in here when we were marked done. Should be equal
63 * or > ringBufferSequence. Put this data member into the NOT_DONE state while this
64 * class is in use. But for the first position on construction, let it be -1 so we can
65 * immediately call {@link #reset(long, Span)} below and it will work.
66 */
67 private long doneSequence = -1;
68
69 /**
70 * If error, the associated throwable. Set when the future is 'done'.
71 */
72 private Throwable throwable = null;
73
74 private Thread t;
75
76 /**
77 * Optionally carry a disconnected scope to the SyncRunner.
78 */
79 private Span span;
80
81 /**
82 * Call this method to clear old usage and get it ready for new deploy. Call
83 * this method even if it is being used for the first time.
84 *
85 * @param sequence sequenceId from this Future's position in the RingBuffer
86 * @return this
87 */
88 synchronized SyncFuture reset(final long sequence) {
89 return reset(sequence, null);
90 }
91
92 /**
93 * Call this method to clear old usage and get it ready for new deploy. Call
94 * this method even if it is being used for the first time.
95 *
96 * @param sequence sequenceId from this Future's position in the RingBuffer
97 * @param span curren span, detached from caller. Don't forget to attach it when
98 * resuming after a call to {@link #get()}.
99 * @return this
100 */
101 synchronized SyncFuture reset(final long sequence, Span span) {
102 if (t != null && t != Thread.currentThread()) throw new IllegalStateException();
103 t = Thread.currentThread();
104 if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread());
105 this.doneSequence = NOT_DONE;
106 this.ringBufferSequence = sequence;
107 this.span = span;
108 return this;
109 }
110
111 @Override
112 public synchronized String toString() {
113 return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence;
114 }
115
116 synchronized long getRingBufferSequence() {
117 return this.ringBufferSequence;
118 }
119
120 /**
121 * Retrieve the {@code span} instance from this Future. EventHandler calls
122 * this method to continue the span. Thread waiting on this Future musn't call
123 * this method until AFTER calling {@link #get()} and the future has been
124 * released back to the originating thread.
125 */
126 synchronized Span getSpan() {
127 return this.span;
128 }
129
130 /**
131 * Used to re-attach a {@code span} to the Future. Called by the EventHandler
132 * after a it has completed processing and detached the span from its scope.
133 */
134 synchronized void setSpan(Span span) {
135 this.span = span;
136 }
137
138 /**
139 * @param sequence Sync sequence at which this future 'completed'.
140 * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).
141 * @return True if we successfully marked this outstanding future as completed/done.
142 * Returns false if this future is already 'done' when this method called.
143 */
144 synchronized boolean done(final long sequence, final Throwable t) {
145 if (isDone()) return false;
146 this.throwable = t;
147 if (sequence < this.ringBufferSequence) {
148 // Something badly wrong.
149 if (throwable == null) {
150 this.throwable = new IllegalStateException("sequence=" + sequence +
151 ", ringBufferSequence=" + this.ringBufferSequence);
152 }
153 }
154 // Mark done.
155 this.doneSequence = sequence;
156 // Wake up waiting threads.
157 notify();
158 return true;
159 }
160
161 public boolean cancel(boolean mayInterruptIfRunning) {
162 throw new UnsupportedOperationException();
163 }
164
165 public synchronized long get() throws InterruptedException, ExecutionException {
166 while (!isDone()) {
167 wait(1000);
168 }
169 if (this.throwable != null) throw new ExecutionException(this.throwable);
170 return this.doneSequence;
171 }
172
173 public Long get(long timeout, TimeUnit unit)
174 throws InterruptedException, ExecutionException {
175 throw new UnsupportedOperationException();
176 }
177
178 public boolean isCancelled() {
179 throw new UnsupportedOperationException();
180 }
181
182 synchronized boolean isDone() {
183 return this.doneSequence != NOT_DONE;
184 }
185
186 synchronized boolean isThrowable() {
187 return isDone() && getThrowable() != null;
188 }
189
190 synchronized Throwable getThrowable() {
191 return this.throwable;
192 }
193 }