1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hbase.regionserver.wal; 19 20 import java.util.concurrent.ExecutionException; 21 import java.util.concurrent.TimeUnit; 22 23 import org.apache.hadoop.hbase.classification.InterfaceAudience; 24 import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 25 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 26 import org.apache.htrace.Span; 27 28 /** 29 * A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till 30 * the sync completes. 31 * 32 * <p>Handlers coming in call append, append, append, and then do a flush/sync of 33 * the edits they have appended the WAL before returning. Since sync takes a while to 34 * complete, we give the Handlers back this sync future to wait on until the 35 * actual HDFS sync completes. Meantime this sync future goes across the ringbuffer and into a 36 * sync runner thread; when it completes, it finishes up the future, the handler get or failed 37 * check completes and the Handler can then progress. 38 * <p> 39 * This is just a partial implementation of Future; we just implement get and 40 * failure. Unimplemented methods throw {@link UnsupportedOperationException}. 41 * <p> 42 * There is not a one-to-one correlation between dfs sync invocations and 43 * instances of this class. A single dfs sync call may complete and mark many 44 * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync 45 * call every time a Handler asks for it. 46 * <p> 47 * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even 48 * if it the first time, start the sync, then park the 'hitched' thread on a call to 49 * #get(). 50 */ 51 @InterfaceAudience.Private 52 class SyncFuture { 53 // Implementation notes: I tried using a cyclicbarrier in here for handler and sync threads 54 // to coordinate on but it did not give any obvious advantage and some issues with order in which 55 // events happen. 56 private static final long NOT_DONE = 0; 57 58 /** 59 * The sequence at which we were added to the ring buffer. 60 */ 61 private long ringBufferSequence; 62 63 /** 64 * The sequence that was set in here when we were marked done. Should be equal 65 * or > ringBufferSequence. Put this data member into the NOT_DONE state while this 66 * class is in use. But for the first position on construction, let it be -1 so we can 67 * immediately call {@link #reset(long, Span)} below and it will work. 68 */ 69 private long doneSequence = -1; 70 71 /** 72 * If error, the associated throwable. Set when the future is 'done'. 73 */ 74 private Throwable throwable = null; 75 76 private Thread t; 77 78 /** 79 * Optionally carry a disconnected scope to the SyncRunner. 80 */ 81 private Span span; 82 83 /** 84 * Call this method to clear old usage and get it ready for new deploy. Call 85 * this method even if it is being used for the first time. 86 * 87 * @param sequence sequenceId from this Future's position in the RingBuffer 88 * @return this 89 */ 90 synchronized SyncFuture reset(final long sequence) { 91 return reset(sequence, null); 92 } 93 94 /** 95 * Call this method to clear old usage and get it ready for new deploy. Call 96 * this method even if it is being used for the first time. 97 * 98 * @param sequence sequenceId from this Future's position in the RingBuffer 99 * @param span curren span, detached from caller. Don't forget to attach it when 100 * resuming after a call to {@link #get()}. 101 * @return this 102 */ 103 synchronized SyncFuture reset(final long sequence, Span span) { 104 if (t != null && t != Thread.currentThread()) throw new IllegalStateException(); 105 t = Thread.currentThread(); 106 if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread()); 107 this.doneSequence = NOT_DONE; 108 this.ringBufferSequence = sequence; 109 this.span = span; 110 this.throwable = null; 111 return this; 112 } 113 114 @Override 115 public synchronized String toString() { 116 return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence; 117 } 118 119 synchronized long getRingBufferSequence() { 120 return this.ringBufferSequence; 121 } 122 123 /** 124 * Retrieve the {@code span} instance from this Future. EventHandler calls 125 * this method to continue the span. Thread waiting on this Future musn't call 126 * this method until AFTER calling {@link #get()} and the future has been 127 * released back to the originating thread. 128 */ 129 synchronized Span getSpan() { 130 return this.span; 131 } 132 133 /** 134 * Used to re-attach a {@code span} to the Future. Called by the EventHandler 135 * after a it has completed processing and detached the span from its scope. 136 */ 137 synchronized void setSpan(Span span) { 138 this.span = span; 139 } 140 141 /** 142 * @param sequence Sync sequence at which this future 'completed'. 143 * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error). 144 * @return True if we successfully marked this outstanding future as completed/done. 145 * Returns false if this future is already 'done' when this method called. 146 */ 147 synchronized boolean done(final long sequence, final Throwable t) { 148 if (isDone()) return false; 149 this.throwable = t; 150 if (sequence < this.ringBufferSequence) { 151 // Something badly wrong. 152 if (throwable == null) { 153 this.throwable = new IllegalStateException("sequence=" + sequence + 154 ", ringBufferSequence=" + this.ringBufferSequence); 155 } 156 } 157 // Mark done. 158 this.doneSequence = sequence; 159 // Wake up waiting threads. 160 notify(); 161 return true; 162 } 163 164 public boolean cancel(boolean mayInterruptIfRunning) { 165 throw new UnsupportedOperationException(); 166 } 167 168 public synchronized long get(long timeout) throws InterruptedException, 169 ExecutionException, TimeoutIOException { 170 final long done = EnvironmentEdgeManager.currentTime() + timeout; 171 while (!isDone()) { 172 wait(1000); 173 if (EnvironmentEdgeManager.currentTime() >= done) { 174 throw new TimeoutIOException("Failed to get sync result after " 175 + timeout + " ms for ringBufferSequence=" + this.ringBufferSequence 176 + ", WAL system stuck?"); 177 } 178 } 179 if (this.throwable != null) throw new ExecutionException(this.throwable); 180 return this.doneSequence; 181 } 182 183 public Long get(long timeout, TimeUnit unit) 184 throws InterruptedException, ExecutionException { 185 throw new UnsupportedOperationException(); 186 } 187 188 public boolean isCancelled() { 189 throw new UnsupportedOperationException(); 190 } 191 192 synchronized boolean isDone() { 193 return this.doneSequence != NOT_DONE; 194 } 195 196 synchronized boolean isThrowable() { 197 return isDone() && getThrowable() != null; 198 } 199 200 synchronized Throwable getThrowable() { 201 return this.throwable; 202 } 203 }