1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22
23 import org.apache.hadoop.hbase.classification.InterfaceAudience;
24 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
25 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
26 import org.apache.htrace.Span;
27
28 /**
29 * A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till
30 * the sync completes.
31 *
32 * <p>Handlers coming in call append, append, append, and then do a flush/sync of
33 * the edits they have appended the WAL before returning. Since sync takes a while to
34 * complete, we give the Handlers back this sync future to wait on until the
35 * actual HDFS sync completes. Meantime this sync future goes across the ringbuffer and into a
36 * sync runner thread; when it completes, it finishes up the future, the handler get or failed
37 * check completes and the Handler can then progress.
38 * <p>
39 * This is just a partial implementation of Future; we just implement get and
40 * failure. Unimplemented methods throw {@link UnsupportedOperationException}.
41 * <p>
42 * There is not a one-to-one correlation between dfs sync invocations and
43 * instances of this class. A single dfs sync call may complete and mark many
44 * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync
45 * call every time a Handler asks for it.
46 * <p>
47 * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even
48 * if it the first time, start the sync, then park the 'hitched' thread on a call to
49 * #get().
50 */
51 @InterfaceAudience.Private
52 class SyncFuture {
53 // Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads
54 // to coordinate on but it did not give any obvious advantage and some issues with order in which
55 // events happen.
56 private static final long NOT_DONE = 0;
57
58 /**
59 * The sequence at which we were added to the ring buffer.
60 */
61 private long ringBufferSequence;
62
63 /**
64 * The sequence that was set in here when we were marked done. Should be equal
65 * or > ringBufferSequence. Put this data member into the NOT_DONE state while this
66 * class is in use. But for the first position on construction, let it be -1 so we can
67 * immediately call {@link #reset(long, Span)} below and it will work.
68 */
69 private long doneSequence = -1;
70
71 /**
72 * If error, the associated throwable. Set when the future is 'done'.
73 */
74 private Throwable throwable = null;
75
76 private Thread t;
77
78 /**
79 * Optionally carry a disconnected scope to the SyncRunner.
80 */
81 private Span span;
82
83 /**
84 * Call this method to clear old usage and get it ready for new deploy. Call
85 * this method even if it is being used for the first time.
86 *
87 * @param sequence sequenceId from this Future's position in the RingBuffer
88 * @return this
89 */
90 synchronized SyncFuture reset(final long sequence) {
91 return reset(sequence, null);
92 }
93
94 /**
95 * Call this method to clear old usage and get it ready for new deploy. Call
96 * this method even if it is being used for the first time.
97 *
98 * @param sequence sequenceId from this Future's position in the RingBuffer
99 * @param span curren span, detached from caller. Don't forget to attach it when
100 * resuming after a call to {@link #get()}.
101 * @return this
102 */
103 synchronized SyncFuture reset(final long sequence, Span span) {
104 if (t != null && t != Thread.currentThread()) throw new IllegalStateException();
105 t = Thread.currentThread();
106 if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread());
107 this.doneSequence = NOT_DONE;
108 this.ringBufferSequence = sequence;
109 this.span = span;
110 this.throwable = null;
111 return this;
112 }
113
114 @Override
115 public synchronized String toString() {
116 return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence;
117 }
118
119 synchronized long getRingBufferSequence() {
120 return this.ringBufferSequence;
121 }
122
123 /**
124 * Retrieve the {@code span} instance from this Future. EventHandler calls
125 * this method to continue the span. Thread waiting on this Future musn't call
126 * this method until AFTER calling {@link #get()} and the future has been
127 * released back to the originating thread.
128 */
129 synchronized Span getSpan() {
130 return this.span;
131 }
132
133 /**
134 * Used to re-attach a {@code span} to the Future. Called by the EventHandler
135 * after a it has completed processing and detached the span from its scope.
136 */
137 synchronized void setSpan(Span span) {
138 this.span = span;
139 }
140
141 /**
142 * @param sequence Sync sequence at which this future 'completed'.
143 * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).
144 * @return True if we successfully marked this outstanding future as completed/done.
145 * Returns false if this future is already 'done' when this method called.
146 */
147 synchronized boolean done(final long sequence, final Throwable t) {
148 if (isDone()) return false;
149 this.throwable = t;
150 if (sequence < this.ringBufferSequence) {
151 // Something badly wrong.
152 if (throwable == null) {
153 this.throwable = new IllegalStateException("sequence=" + sequence +
154 ", ringBufferSequence=" + this.ringBufferSequence);
155 }
156 }
157 // Mark done.
158 this.doneSequence = sequence;
159 // Wake up waiting threads.
160 notify();
161 return true;
162 }
163
164 public boolean cancel(boolean mayInterruptIfRunning) {
165 throw new UnsupportedOperationException();
166 }
167
168 public synchronized long get(long timeout) throws InterruptedException,
169 ExecutionException, TimeoutIOException {
170 final long done = EnvironmentEdgeManager.currentTime() + timeout;
171 while (!isDone()) {
172 wait(1000);
173 if (EnvironmentEdgeManager.currentTime() >= done) {
174 throw new TimeoutIOException("Failed to get sync result after "
175 + timeout + " ms for ringBufferSequence=" + this.ringBufferSequence
176 + ", WAL system stuck?");
177 }
178 }
179 if (this.throwable != null) throw new ExecutionException(this.throwable);
180 return this.doneSequence;
181 }
182
183 public Long get(long timeout, TimeUnit unit)
184 throws InterruptedException, ExecutionException {
185 throw new UnsupportedOperationException();
186 }
187
188 public boolean isCancelled() {
189 throw new UnsupportedOperationException();
190 }
191
192 synchronized boolean isDone() {
193 return this.doneSequence != NOT_DONE;
194 }
195
196 synchronized boolean isThrowable() {
197 return isDone() && getThrowable() != null;
198 }
199
200 synchronized Throwable getThrowable() {
201 return this.throwable;
202 }
203 }