/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Implementation of nonce manager that stores nonces in a hash map and cleans them up after
 * some time; if nonce group/client ID is supplied, nonces are stored by client ID.
 */
@InterfaceAudience.Private
public class ServerNonceManager {
  /** Configuration key for the nonce retention period, in milliseconds. */
  public static final String HASH_NONCE_GRACE_PERIOD_KEY = "hbase.server.hashNonce.gracePeriod";
  private static final Logger LOG = LoggerFactory.getLogger(ServerNonceManager.class);

  /** Pattern used to render the last-activity timestamp in {@link OperationContext#toString()}. */
  private static final String TS_PATTERN = "HH:mm:ss.SSS";

  /** The time to wait in an extremely unlikely case of a conflict with a running op.
   * Only here so that tests could override it and not wait. */
  private int conflictWaitIterationMs = 30000;

  /**
   * Per-nonce state. The instance itself is the monitor that is synchronized on in case of
   * collisions, and for cleanup.
   */
  private static class OperationContext {
    /** The operation already succeeded; a retry with the same nonce must not run again. */
    static final int DONT_PROCEED = 0;
    /** The operation failed; a retry with the same nonce may run. */
    static final int PROCEED = 1;
    /** The operation is still running; a retry has to wait for its outcome. */
    static final int WAIT = 2;

    // Packed word: bits 0..1 - state, bit 2 - whether anyone is waiting,
    // bits 3.. - timestamp of last activity.
    private long data = 0;
    private static final long STATE_BITS = 3;
    private static final long WAITING_BIT = 4;
    private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS;

    /** Write point recorded for the operation via {@link #setMvcc(long)}. */
    private volatile long mvcc;

    @Override
    public String toString() {
      // SimpleDateFormat is not thread-safe and this method can be invoked on different
      // contexts from several threads at once, so a formatter is created per call instead
      // of sharing a static instance.
      return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
          + new SimpleDateFormat(TS_PATTERN).format(new Date(getActivityTime())) + "]";
    }

    /** Creates a context in the {@link #WAIT} state, stamped with the current time. */
    public OperationContext() {
      setState(WAIT);
      reportActivity();
    }

    /** Replaces the two state bits, leaving the waiting flag and the timestamp untouched. */
    public void setState(int state) {
      this.data = (this.data & ~STATE_BITS) | state;
    }

    /** @return one of {@link #DONT_PROCEED}, {@link #PROCEED} or {@link #WAIT}. */
    public int getState() {
      return (int)(this.data & STATE_BITS);
    }

    /** Flags that at least one caller has waited on this context. */
    public void setHasWait() {
      this.data = this.data | WAITING_BIT;
    }

    /** @return true if {@link #setHasWait()} has been called on this context. */
    public boolean hasWait() {
      return (this.data & WAITING_BIT) == WAITING_BIT;
    }

    /** Stores the current time as the last-activity timestamp, keeping the flag bits. */
    public void reportActivity() {
      long now = EnvironmentEdgeManager.currentTime();
      this.data = (this.data & ALL_FLAG_BITS) | (now << 3);
    }

    /**
     * @param minRelevantTime Cutoff timestamp; its top three bits are masked off so that it is
     *   compared in the same 61-bit range as the stored activity time.
     * @return true if the last activity happened strictly before the cutoff.
     */
    public boolean isExpired(long minRelevantTime) {
      return getActivityTime() < (minRelevantTime & (~0L >>> 3));
    }

    public void setMvcc(long mvcc) {
      this.mvcc = mvcc;
    }

    public long getMvcc() {
      return this.mvcc;
    }

    /** @return the last-activity timestamp unpacked from the upper bits of the data word. */
    private long getActivityTime() {
      return this.data >>> 3;
    }
  }

  /**
   * Nonces.
   * Approximate overhead per nonce: 64 bytes from hashmap, 32 from two objects (k/v),
   * NK: 16 bytes (2 longs), OC: 8 bytes (1 long) - so, 120 bytes.
   * With 30min expiration time, 5k increments/appends per sec., we'd use approximately 1Gb,
   * which is a realistic worst case. If it's much worse, we could use some sort of memory
   * limit and cleanup.
   * NOTE(review): OperationContext holds two longs (data and mvcc), so the per-nonce figure
   * above is probably slightly low - confirm before relying on it.
   */
  private final ConcurrentHashMap<NonceKey, OperationContext> nonces = new ConcurrentHashMap<>();

  /** How long, in milliseconds, a finished nonce is retained before cleanup may drop it. */
  private final int deleteNonceGracePeriod;

  /**
   * @param conf Configuration from which {@link #HASH_NONCE_GRACE_PERIOD_KEY} is read;
   *   defaults to 30 minutes when the key is absent.
   */
  public ServerNonceManager(Configuration conf) {
    // Default - 30 minutes.
    deleteNonceGracePeriod = conf.getInt(HASH_NONCE_GRACE_PERIOD_KEY, 30 * 60 * 1000);
    if (deleteNonceGracePeriod < 60 * 1000) {
      LOG.warn("Nonce grace period {} is less than a minute; might be too small to be useful",
          deleteNonceGracePeriod);
    }
  }

  @VisibleForTesting
  public void setConflictWaitIterationMs(int conflictWaitIterationMs) {
    this.conflictWaitIterationMs = conflictWaitIterationMs;
  }

  /**
   * Starts the operation if operation with such nonce has not already succeeded. If the
   * operation is in progress, waits for it to end and checks whether it has succeeded.
   * @param group Nonce group.
   * @param nonce Nonce.
   * @param stoppable Stoppable that terminates waiting (if any) when the server is stopped.
   * @return true if the operation has not already succeeded and can proceed; false otherwise.
   * @throws InterruptedException if the waiting thread is interrupted, or if the stoppable
   *   reports that it is stopped after a wait iteration.
   */
  public boolean startOperation(long group, long nonce, Stoppable stoppable)
      throws InterruptedException {
    if (nonce == HConstants.NO_NONCE) {
      return true;
    }
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext ctx = new OperationContext();
    while (true) {
      OperationContext oldResult = nonces.putIfAbsent(nk, ctx);
      if (oldResult == null) {
        return true; // No previous entry - this caller owns the nonce.
      }

      // Collision with some operation - should be extremely rare.
      synchronized (oldResult) {
        int oldState = oldResult.getState();
        LOG.debug("Conflict detected by nonce: {}, {}", nk, oldResult);
        if (oldState != OperationContext.WAIT) {
          return oldState == OperationContext.PROCEED; // operation ended
        }
        oldResult.setHasWait();
        // Operation is still active... wait and loop. The loop re-runs putIfAbsent because a
        // failed operation removes its entry from the map.
        oldResult.wait(this.conflictWaitIterationMs);
        if (stoppable.isStopped()) {
          throw new InterruptedException("Server stopped");
        }
      }
    }
  }

  /**
   * Ends the operation started by startOperation.
   * @param group Nonce group.
   * @param nonce Nonce.
   * @param success Whether the operation has succeeded.
   */
  public void endOperation(long group, long nonce, boolean success) {
    if (nonce == HConstants.NO_NONCE) {
      return;
    }
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext newResult = nonces.get(nk);
    assert newResult != null;
    synchronized (newResult) {
      assert newResult.getState() == OperationContext.WAIT;
      // If we failed, other retries can proceed.
      newResult.setState(success ? OperationContext.DONT_PROCEED : OperationContext.PROCEED);
      if (success) {
        newResult.reportActivity(); // Set time to use for cleanup.
      } else {
        OperationContext val = nonces.remove(nk);
        assert val == newResult;
      }
      if (newResult.hasWait()) {
        LOG.debug("Conflict with running op ended: {}, {}", nk, newResult);
        newResult.notifyAll();
      }
    }
  }

  /**
   * Store the write point in OperationContext when the operation succeed.
   * @param group Nonce group.
   * @param nonce Nonce.
   * @param mvcc Write point of the succeed operation.
   */
  public void addMvccToOperationContext(long group, long nonce, long mvcc) {
    if (nonce == HConstants.NO_NONCE) {
      return;
    }
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext result = nonces.get(nk);
    assert result != null;
    synchronized (result) {
      result.setMvcc(mvcc);
    }
  }

  /**
   * Return the write point of the previous succeed operation.
   * @param group Nonce group.
   * @param nonce Nonce.
   * @return write point of the previous succeed operation, or {@link Long#MAX_VALUE} when no
   *   nonce is supplied or no context is registered for it.
   */
  public long getMvccFromOperationContext(long group, long nonce) {
    if (nonce == HConstants.NO_NONCE) {
      return Long.MAX_VALUE;
    }
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext result = nonces.get(nk);
    return result == null ? Long.MAX_VALUE : result.getMvcc();
  }

  /**
   * Reports the operation from WAL during replay.
   * @param group Nonce group.
   * @param nonce Nonce.
   * @param writeTime Entry write time, used to ignore entries that are too old.
   */
  public void reportOperationFromWal(long group, long nonce, long writeTime) {
    if (nonce == HConstants.NO_NONCE) {
      return;
    }
    // Give the write time some slack in case the clocks are not synchronized.
    long now = EnvironmentEdgeManager.currentTime();
    if (now > writeTime + (deleteNonceGracePeriod * 1.5)) {
      return;
    }
    // Replayed operations are recorded as already done, so retries will not proceed.
    OperationContext newResult = new OperationContext();
    newResult.setState(OperationContext.DONT_PROCEED);
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext oldResult = nonces.putIfAbsent(nk, newResult);
    if (oldResult != null) {
      // Some schemes can have collisions (for example, expiring hashes), so just log it.
      // We have no idea about the semantics here, so this is the least of many evils.
      LOG.warn("Nonce collision during WAL recovery: {}, {} with {}", nk, oldResult, newResult);
    }
  }

  /**
   * Creates a scheduled chore that is used to clean up old nonces.
   * @param stoppable Stoppable for the chore.
   * @return ScheduledChore; the scheduled chore is not started.
   */
  public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) {
    // By default, it will run every 6 minutes (30 / 5).
    return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) {
      @Override
      protected void chore() {
        cleanUpOldNonces();
      }
    };
  }

  /**
   * Drops every finished nonce whose last activity is older than the grace period. Contexts
   * still in the WAIT state are never removed.
   */
  private void cleanUpOldNonces() {
    long cutoff = EnvironmentEdgeManager.currentTime() - deleteNonceGracePeriod;
    for (Map.Entry<NonceKey, OperationContext> entry : nonces.entrySet()) {
      OperationContext oc = entry.getValue();
      // Cheap unlocked pre-check; the decision is re-validated under the monitor below.
      if (!oc.isExpired(cutoff)) {
        continue;
      }
      synchronized (oc) {
        if (oc.getState() == OperationContext.WAIT || !oc.isExpired(cutoff)) {
          continue;
        }
        // Conditional remove: only evict the key while it still maps to this exact context,
        // so a newer context registered under the same key is never dropped by mistake.
        nonces.remove(entry.getKey(), oc);
      }
    }
  }
}