001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.text.SimpleDateFormat; 021import java.util.Date; 022import java.util.Map; 023import java.util.concurrent.ConcurrentHashMap; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.ScheduledChore; 027import org.apache.hadoop.hbase.Stoppable; 028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 029import org.apache.hadoop.hbase.util.NonceKey; 030import org.apache.yetus.audience.InterfaceAudience; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * Implementation of nonce manager that stores nonces in a hash map and cleans them up after some 036 * time; if nonce group/client ID is supplied, nonces are stored by client ID. 037 */ 038@InterfaceAudience.Private 039public class ServerNonceManager { 040 public static final String HASH_NONCE_GRACE_PERIOD_KEY = "hbase.server.hashNonce.gracePeriod"; 041 private static final Logger LOG = LoggerFactory.getLogger(ServerNonceManager.class); 042 043 /** 044 * The time to wait in an extremely unlikely case of a conflict with a running op. Only here so 045 * that tests could override it and not wait. 046 */ 047 private int conflictWaitIterationMs = 30000; 048 049 private static final SimpleDateFormat tsFormat = new SimpleDateFormat("HH:mm:ss.SSS"); 050 051 // This object is used to synchronize on in case of collisions, and for cleanup. 052 private static class OperationContext { 053 static final int DONT_PROCEED = 0; 054 static final int PROCEED = 1; 055 static final int WAIT = 2; 056 057 // 0..1 - state, 2..2 - whether anyone is waiting, 3.. - ts of last activity 058 private long data = 0; 059 private static final long STATE_BITS = 3; 060 private static final long WAITING_BIT = 4; 061 private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS; 062 063 private volatile long mvcc; 064 065 @Override 066 public String toString() { 067 return "[state " + getState() + ", hasWait " + hasWait() + ", activity " 068 + tsFormat.format(new Date(getActivityTime())) + "]"; 069 } 070 071 public OperationContext() { 072 setState(WAIT); 073 reportActivity(); 074 } 075 076 public void setState(int state) { 077 this.data = (this.data & ~STATE_BITS) | state; 078 } 079 080 public int getState() { 081 return (int) (this.data & STATE_BITS); 082 } 083 084 public void setHasWait() { 085 this.data = this.data | WAITING_BIT; 086 } 087 088 public boolean hasWait() { 089 return (this.data & WAITING_BIT) == WAITING_BIT; 090 } 091 092 public void reportActivity() { 093 long now = EnvironmentEdgeManager.currentTime(); 094 this.data = (this.data & ALL_FLAG_BITS) | (now << 3); 095 } 096 097 public boolean isExpired(long minRelevantTime) { 098 return getActivityTime() < (minRelevantTime & (~0L >>> 3)); 099 } 100 101 public void setMvcc(long mvcc) { 102 this.mvcc = mvcc; 103 } 104 105 public long getMvcc() { 106 return this.mvcc; 107 } 108 109 private long getActivityTime() { 110 return this.data >>> 3; 111 } 112 } 113 114 /** 115 * Nonces. Approximate overhead per nonce: 64 bytes from hashmap, 32 from two objects (k/v), NK: 116 * 16 bytes (2 longs), OC: 8 bytes (1 long) - so, 120 bytes. With 30min expiration time, 5k 117 * increments/appends per sec., we'd use approximately 1Gb, which is a realistic worst case. If 118 * it's much worse, we could use some sort of memory limit and cleanup. 119 */ 120 private ConcurrentHashMap<NonceKey, OperationContext> nonces = new ConcurrentHashMap<>(); 121 122 private int deleteNonceGracePeriod; 123 124 public ServerNonceManager(Configuration conf) { 125 // Default - 30 minutes. 126 deleteNonceGracePeriod = conf.getInt(HASH_NONCE_GRACE_PERIOD_KEY, 30 * 60 * 1000); 127 if (deleteNonceGracePeriod < 60 * 1000) { 128 LOG.warn("Nonce grace period " + deleteNonceGracePeriod 129 + " is less than a minute; might be too small to be useful"); 130 } 131 } 132 133 public void setConflictWaitIterationMs(int conflictWaitIterationMs) { 134 this.conflictWaitIterationMs = conflictWaitIterationMs; 135 } 136 137 /** 138 * Starts the operation if operation with such nonce has not already succeeded. If the operation 139 * is in progress, waits for it to end and checks whether it has succeeded. 140 * @param group Nonce group. 141 * @param nonce Nonce. 142 * @param stoppable Stoppable that terminates waiting (if any) when the server is stopped. 143 * @return true if the operation has not already succeeded and can proceed; false otherwise. 144 */ 145 public boolean startOperation(long group, long nonce, Stoppable stoppable) 146 throws InterruptedException { 147 if (nonce == HConstants.NO_NONCE) return true; 148 NonceKey nk = new NonceKey(group, nonce); 149 OperationContext ctx = new OperationContext(); 150 while (true) { 151 OperationContext oldResult = nonces.putIfAbsent(nk, ctx); 152 if (oldResult == null) return true; 153 154 // Collision with some operation - should be extremely rare. 155 synchronized (oldResult) { 156 int oldState = oldResult.getState(); 157 LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult); 158 if (oldState != OperationContext.WAIT) { 159 return oldState == OperationContext.PROCEED; // operation ended 160 } 161 oldResult.setHasWait(); 162 oldResult.wait(this.conflictWaitIterationMs); // operation is still active... wait and loop 163 if (stoppable.isStopped()) { 164 throw new InterruptedException("Server stopped"); 165 } 166 } 167 } 168 } 169 170 /** 171 * Ends the operation started by startOperation. 172 * @param group Nonce group. 173 * @param nonce Nonce. 174 * @param success Whether the operation has succeeded. 175 */ 176 public void endOperation(long group, long nonce, boolean success) { 177 if (nonce == HConstants.NO_NONCE) return; 178 NonceKey nk = new NonceKey(group, nonce); 179 OperationContext newResult = nonces.get(nk); 180 assert newResult != null; 181 synchronized (newResult) { 182 assert newResult.getState() == OperationContext.WAIT; 183 // If we failed, other retries can proceed. 184 newResult.setState(success ? OperationContext.DONT_PROCEED : OperationContext.PROCEED); 185 if (success) { 186 newResult.reportActivity(); // Set time to use for cleanup. 187 } else { 188 OperationContext val = nonces.remove(nk); 189 assert val == newResult; 190 } 191 if (newResult.hasWait()) { 192 LOG.debug("Conflict with running op ended: " + nk + ", " + newResult); 193 newResult.notifyAll(); 194 } 195 } 196 } 197 198 /** 199 * Store the write point in OperationContext when the operation succeed. 200 * @param group Nonce group. 201 * @param nonce Nonce. 202 * @param mvcc Write point of the succeed operation. 203 */ 204 public void addMvccToOperationContext(long group, long nonce, long mvcc) { 205 if (nonce == HConstants.NO_NONCE) { 206 return; 207 } 208 NonceKey nk = new NonceKey(group, nonce); 209 OperationContext result = nonces.get(nk); 210 assert result != null; 211 synchronized (result) { 212 result.setMvcc(mvcc); 213 } 214 } 215 216 /** 217 * Return the write point of the previous succeed operation. 218 * @param group Nonce group. 219 * @param nonce Nonce. 220 * @return write point of the previous succeed operation. 221 */ 222 public long getMvccFromOperationContext(long group, long nonce) { 223 if (nonce == HConstants.NO_NONCE) { 224 return Long.MAX_VALUE; 225 } 226 NonceKey nk = new NonceKey(group, nonce); 227 OperationContext result = nonces.get(nk); 228 return result == null ? Long.MAX_VALUE : result.getMvcc(); 229 } 230 231 /** 232 * Reports the operation from WAL during replay. 233 * @param group Nonce group. 234 * @param nonce Nonce. 235 * @param writeTime Entry write time, used to ignore entries that are too old. 236 */ 237 public void reportOperationFromWal(long group, long nonce, long writeTime) { 238 if (nonce == HConstants.NO_NONCE) return; 239 // Give the write time some slack in case the clocks are not synchronized. 240 long now = EnvironmentEdgeManager.currentTime(); 241 if (now > writeTime + (deleteNonceGracePeriod * 1.5)) return; 242 OperationContext newResult = new OperationContext(); 243 newResult.setState(OperationContext.DONT_PROCEED); 244 NonceKey nk = new NonceKey(group, nonce); 245 OperationContext oldResult = nonces.putIfAbsent(nk, newResult); 246 if (oldResult != null) { 247 // Some schemes can have collisions (for example, expiring hashes), so just log it. 248 // We have no idea about the semantics here, so this is the least of many evils. 249 LOG.warn( 250 "Nonce collision during WAL recovery: " + nk + ", " + oldResult + " with " + newResult); 251 } 252 } 253 254 /** 255 * Creates a scheduled chore that is used to clean up old nonces. 256 * @param stoppable Stoppable for the chore. 257 * @return ScheduledChore; the scheduled chore is not started. 258 */ 259 public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) { 260 // By default, it will run every 6 minutes (30 / 5). 261 return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) { 262 @Override 263 protected void chore() { 264 cleanUpOldNonces(); 265 } 266 }; 267 } 268 269 private void cleanUpOldNonces() { 270 long cutoff = EnvironmentEdgeManager.currentTime() - deleteNonceGracePeriod; 271 for (Map.Entry<NonceKey, OperationContext> entry : nonces.entrySet()) { 272 OperationContext oc = entry.getValue(); 273 if (!oc.isExpired(cutoff)) continue; 274 synchronized (oc) { 275 if (oc.getState() == OperationContext.WAIT || !oc.isExpired(cutoff)) continue; 276 nonces.remove(entry.getKey()); 277 } 278 } 279 } 280}