001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.wal; 019 020import java.util.concurrent.ExecutionException; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.locks.Condition; 023import java.util.concurrent.locks.ReentrantLock; 024import org.apache.hadoop.hbase.exceptions.TimeoutIOException; 025import org.apache.yetus.audience.InterfaceAudience; 026 027/** 028 * A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till the 029 * sync completes. 030 * <p> 031 * Handlers coming in call append, append, append, and then do a flush/sync of the edits they have 032 * appended the WAL before returning. Since sync takes a while to complete, we give the Handlers 033 * back this sync future to wait on until the actual HDFS sync completes. Meantime this sync future 034 * goes across a queue and is handled by a background thread; when it completes, it finishes up the 035 * future, the handler get or failed check completes and the Handler can then progress. 036 * <p> 037 * This is just a partial implementation of Future; we just implement get and failure. 038 * <p> 039 * There is not a one-to-one correlation between dfs sync invocations and instances of this class. A 040 * single dfs sync call may complete and mark many SyncFutures as done; i.e. we batch up sync calls 041 * rather than do a dfs sync call every time a Handler asks for it. 042 * <p> 043 * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even if it the first 044 * time, start the sync, then park the 'hitched' thread on a call to #get(). 045 */ 046@InterfaceAudience.Private 047class SyncFuture { 048 049 private static final long NOT_DONE = -1L; 050 private Thread t; 051 052 /** 053 * Lock protecting the thread-safe fields. 054 */ 055 private final ReentrantLock doneLock; 056 057 /** 058 * Condition to wait on for client threads. 059 */ 060 private final Condition doneCondition; 061 062 /* 063 * Fields below are protected by {@link SyncFuture#doneLock}. 064 */ 065 066 /** 067 * The transaction id that was set in here when we were marked done. Should be equal or > txnId. 068 * Put this data member into the NOT_DONE state while this class is in use. 069 */ 070 private long doneTxid; 071 072 /** 073 * If error, the associated throwable. Set when the future is 'done'. 074 */ 075 private Throwable throwable; 076 077 /* 078 * Fields below are created once at reset() and accessed without any lock. Should be ok as they 079 * are immutable for this instance of sync future until it is reset. 080 */ 081 082 /** 083 * The transaction id of this operation, monotonically increases. 084 */ 085 private long txid; 086 087 private boolean forceSync; 088 089 SyncFuture() { 090 this.doneLock = new ReentrantLock(); 091 this.doneCondition = doneLock.newCondition(); 092 } 093 094 /** 095 * Call this method to clear old usage and get it ready for new deploy. 096 * @param txid the new transaction id 097 */ 098 SyncFuture reset(long txid, boolean forceSync) { 099 if (t != null && t != Thread.currentThread()) { 100 throw new IllegalStateException(); 101 } 102 t = Thread.currentThread(); 103 if (!isDone()) { 104 throw new IllegalStateException("" + txid + " " + Thread.currentThread()); 105 } 106 this.doneTxid = NOT_DONE; 107 this.forceSync = forceSync; 108 this.txid = txid; 109 this.throwable = null; 110 return this; 111 } 112 113 @Override 114 public String toString() { 115 return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() + " threadName=" 116 + t.getName(); 117 } 118 119 long getTxid() { 120 return this.txid; 121 } 122 123 boolean isForceSync() { 124 return forceSync; 125 } 126 127 /** 128 * Returns the thread that owned this sync future, use with caution as we return the reference to 129 * the actual thread object. 130 * @return the associated thread instance. 131 */ 132 Thread getThread() { 133 return t; 134 } 135 136 /** 137 * @param txid the transaction id at which this future 'completed'. 138 * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error). 139 * @return True if we successfully marked this outstanding future as completed/done. Returns false 140 * if this future is already 'done' when this method called. 141 */ 142 boolean done(final long txid, final Throwable t) { 143 doneLock.lock(); 144 try { 145 if (doneTxid != NOT_DONE) { 146 return false; 147 } 148 this.throwable = t; 149 if (txid < this.txid) { 150 // Something badly wrong. 151 if (throwable == null) { 152 this.throwable = 153 new IllegalStateException("done txid=" + txid + ", my txid=" + this.txid); 154 } 155 } 156 // Mark done. 157 this.doneTxid = txid; 158 doneCondition.signalAll(); 159 return true; 160 } finally { 161 doneLock.unlock(); 162 } 163 } 164 165 long get(long timeoutNs) throws InterruptedException, ExecutionException, TimeoutIOException { 166 doneLock.lock(); 167 try { 168 while (doneTxid == NOT_DONE) { 169 if (!doneCondition.await(timeoutNs, TimeUnit.NANOSECONDS)) { 170 throw new TimeoutIOException( 171 "Failed to get sync result after " + TimeUnit.NANOSECONDS.toMillis(timeoutNs) 172 + " ms for txid=" + this.txid + ", WAL system stuck?"); 173 } 174 } 175 if (this.throwable != null) { 176 throw new ExecutionException(this.throwable); 177 } 178 return this.doneTxid; 179 } finally { 180 doneLock.unlock(); 181 } 182 } 183 184 boolean isDone() { 185 doneLock.lock(); 186 try { 187 return this.doneTxid != NOT_DONE; 188 } finally { 189 doneLock.unlock(); 190 } 191 } 192 193 Throwable getThrowable() { 194 doneLock.lock(); 195 try { 196 if (doneTxid == NOT_DONE) { 197 return null; 198 } 199 return this.throwable; 200 } finally { 201 doneLock.unlock(); 202 } 203 } 204}