1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver;
20
21 import java.text.SimpleDateFormat;
22 import java.util.Date;
23 import java.util.Map;
24 import java.util.concurrent.ConcurrentHashMap;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.ScheduledChore;
31 import org.apache.hadoop.hbase.Stoppable;
32 import org.apache.hadoop.hbase.classification.InterfaceAudience;
33 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
34 import org.apache.hadoop.hbase.util.NonceKey;
35
36 import com.google.common.annotations.VisibleForTesting;
37
38
39
40
41
42 @InterfaceAudience.Private
43 public class ServerNonceManager {
44 public static final String HASH_NONCE_GRACE_PERIOD_KEY = "hbase.server.hashNonce.gracePeriod";
45 private static final Log LOG = LogFactory.getLog(ServerNonceManager.class);
46
47
48
49 private int conflictWaitIterationMs = 30000;
50
51 private static final SimpleDateFormat tsFormat = new SimpleDateFormat("HH:mm:ss.SSS");
52
53
54 private static class OperationContext {
55 static final int DONT_PROCEED = 0;
56 static final int PROCEED = 1;
57 static final int WAIT = 2;
58
59
60 private long data = 0;
61 private static final long STATE_BITS = 3;
62 private static final long WAITING_BIT = 4;
63 private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS;
64
65 @Override
66 public String toString() {
67 return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
68 + tsFormat.format(new Date(getActivityTime())) + "]";
69 }
70
71 public OperationContext() {
72 setState(WAIT);
73 reportActivity();
74 }
75
76 public void setState(int state) {
77 this.data = (this.data & ~STATE_BITS) | state;
78 }
79
80 public int getState() {
81 return (int)(this.data & STATE_BITS);
82 }
83
84 public void setHasWait() {
85 this.data = this.data | WAITING_BIT;
86 }
87
88 public boolean hasWait() {
89 return (this.data & WAITING_BIT) == WAITING_BIT;
90 }
91
92 public void reportActivity() {
93 long now = EnvironmentEdgeManager.currentTime();
94 this.data = (this.data & ALL_FLAG_BITS) | (now << 3);
95 }
96
97 public boolean isExpired(long minRelevantTime) {
98 return getActivityTime() < (minRelevantTime & (~0l >>> 3));
99 }
100
101 private long getActivityTime() {
102 return this.data >>> 3;
103 }
104 }
105
106
107
108
109
110
111
112
113
114 private ConcurrentHashMap<NonceKey, OperationContext> nonces =
115 new ConcurrentHashMap<NonceKey, OperationContext>();
116
117 private int deleteNonceGracePeriod;
118
119 public ServerNonceManager(Configuration conf) {
120
121 deleteNonceGracePeriod = conf.getInt(HASH_NONCE_GRACE_PERIOD_KEY, 30 * 60 * 1000);
122 if (deleteNonceGracePeriod < 60 * 1000) {
123 LOG.warn("Nonce grace period " + deleteNonceGracePeriod
124 + " is less than a minute; might be too small to be useful");
125 }
126 }
127
128 @VisibleForTesting
129 public void setConflictWaitIterationMs(int conflictWaitIterationMs) {
130 this.conflictWaitIterationMs = conflictWaitIterationMs;
131 }
132
133
134
135
136
137
138
139
140
141 public boolean startOperation(long group, long nonce, Stoppable stoppable)
142 throws InterruptedException {
143 if (nonce == HConstants.NO_NONCE) return true;
144 NonceKey nk = new NonceKey(group, nonce);
145 OperationContext ctx = new OperationContext();
146 while (true) {
147 OperationContext oldResult = nonces.putIfAbsent(nk, ctx);
148 if (oldResult == null) return true;
149
150
151 synchronized (oldResult) {
152 int oldState = oldResult.getState();
153 LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult);
154 if (oldState != OperationContext.WAIT) {
155 return oldState == OperationContext.PROCEED;
156 }
157 oldResult.setHasWait();
158 oldResult.wait(this.conflictWaitIterationMs);
159 if (stoppable.isStopped()) {
160 throw new InterruptedException("Server stopped");
161 }
162 }
163 }
164 }
165
166
167
168
169
170
171
172 public void endOperation(long group, long nonce, boolean success) {
173 if (nonce == HConstants.NO_NONCE) return;
174 NonceKey nk = new NonceKey(group, nonce);
175 OperationContext newResult = nonces.get(nk);
176 assert newResult != null;
177 synchronized (newResult) {
178 assert newResult.getState() == OperationContext.WAIT;
179
180 newResult.setState(success ? OperationContext.DONT_PROCEED : OperationContext.PROCEED);
181 if (success) {
182 newResult.reportActivity();
183 } else {
184 OperationContext val = nonces.remove(nk);
185 assert val == newResult;
186 }
187 if (newResult.hasWait()) {
188 LOG.debug("Conflict with running op ended: " + nk + ", " + newResult);
189 newResult.notifyAll();
190 }
191 }
192 }
193
194
195
196
197
198
199
200 public void reportOperationFromWal(long group, long nonce, long writeTime) {
201 if (nonce == HConstants.NO_NONCE) return;
202
203 long now = EnvironmentEdgeManager.currentTime();
204 if (now > writeTime + (deleteNonceGracePeriod * 1.5)) return;
205 OperationContext newResult = new OperationContext();
206 newResult.setState(OperationContext.DONT_PROCEED);
207 NonceKey nk = new NonceKey(group, nonce);
208 OperationContext oldResult = nonces.putIfAbsent(nk, newResult);
209 if (oldResult != null) {
210
211
212 LOG.warn("Nonce collision during WAL recovery: " + nk
213 + ", " + oldResult + " with " + newResult);
214 }
215 }
216
217
218
219
220
221
222 public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) {
223
224 return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) {
225 @Override
226 protected void chore() {
227 cleanUpOldNonces();
228 }
229 };
230 }
231
232 private void cleanUpOldNonces() {
233 long cutoff = EnvironmentEdgeManager.currentTime() - deleteNonceGracePeriod;
234 for (Map.Entry<NonceKey, OperationContext> entry : nonces.entrySet()) {
235 OperationContext oc = entry.getValue();
236 if (!oc.isExpired(cutoff)) continue;
237 synchronized (oc) {
238 if (oc.getState() == OperationContext.WAIT || !oc.isExpired(cutoff)) continue;
239 nonces.remove(entry.getKey());
240 }
241 }
242 }
243 }