001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.time.Instant; 021import java.time.ZoneId; 022import java.time.format.DateTimeFormatter; 023import java.util.Map; 024import java.util.concurrent.ConcurrentHashMap; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.ScheduledChore; 028import org.apache.hadoop.hbase.Stoppable; 029import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 030import org.apache.hadoop.hbase.util.NonceKey; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * Implementation of nonce manager that stores nonces in a hash map and cleans them up after some 037 * time; if nonce group/client ID is supplied, nonces are stored by client ID. 038 */ 039@InterfaceAudience.Private 040public class ServerNonceManager { 041 public static final String HASH_NONCE_GRACE_PERIOD_KEY = "hbase.server.hashNonce.gracePeriod"; 042 private static final Logger LOG = LoggerFactory.getLogger(ServerNonceManager.class); 043 044 /** 045 * The time to wait in an extremely unlikely case of a conflict with a running op. Only here so 046 * that tests could override it and not wait. 047 */ 048 private int conflictWaitIterationMs = 30000; 049 050 private static final DateTimeFormatter TS_FORMAT = 051 DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneId.systemDefault()); 052 053 // This object is used to synchronize on in case of collisions, and for cleanup. 054 private static class OperationContext { 055 static final int DONT_PROCEED = 0; 056 static final int PROCEED = 1; 057 static final int WAIT = 2; 058 059 // 0..1 - state, 2..2 - whether anyone is waiting, 3.. - ts of last activity 060 private long data = 0; 061 private static final long STATE_BITS = 3; 062 private static final long WAITING_BIT = 4; 063 private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS; 064 065 private volatile long mvcc; 066 067 @Override 068 public String toString() { 069 return "[state " + getState() + ", hasWait " + hasWait() + ", activity " 070 + TS_FORMAT.format(Instant.ofEpochMilli(getActivityTime())) + "]"; 071 } 072 073 public OperationContext() { 074 setState(WAIT); 075 reportActivity(); 076 } 077 078 public void setState(int state) { 079 this.data = (this.data & ~STATE_BITS) | state; 080 } 081 082 public int getState() { 083 return (int) (this.data & STATE_BITS); 084 } 085 086 public void setHasWait() { 087 this.data = this.data | WAITING_BIT; 088 } 089 090 public boolean hasWait() { 091 return (this.data & WAITING_BIT) == WAITING_BIT; 092 } 093 094 public void reportActivity() { 095 long now = EnvironmentEdgeManager.currentTime(); 096 this.data = (this.data & ALL_FLAG_BITS) | (now << 3); 097 } 098 099 public boolean isExpired(long minRelevantTime) { 100 return getActivityTime() < (minRelevantTime & (~0L >>> 3)); 101 } 102 103 public void setMvcc(long mvcc) { 104 this.mvcc = mvcc; 105 } 106 107 public long getMvcc() { 108 return this.mvcc; 109 } 110 111 private long getActivityTime() { 112 return this.data >>> 3; 113 } 114 } 115 116 /** 117 * Nonces. Approximate overhead per nonce: 64 bytes from hashmap, 32 from two objects (k/v), NK: 118 * 16 bytes (2 longs), OC: 8 bytes (1 long) - so, 120 bytes. With 30min expiration time, 5k 119 * increments/appends per sec., we'd use approximately 1Gb, which is a realistic worst case. If 120 * it's much worse, we could use some sort of memory limit and cleanup. 121 */ 122 private ConcurrentHashMap<NonceKey, OperationContext> nonces = new ConcurrentHashMap<>(); 123 124 private int deleteNonceGracePeriod; 125 126 public ServerNonceManager(Configuration conf) { 127 // Default - 30 minutes. 128 deleteNonceGracePeriod = conf.getInt(HASH_NONCE_GRACE_PERIOD_KEY, 30 * 60 * 1000); 129 if (deleteNonceGracePeriod < 60 * 1000) { 130 LOG.warn("Nonce grace period " + deleteNonceGracePeriod 131 + " is less than a minute; might be too small to be useful"); 132 } 133 } 134 135 public void setConflictWaitIterationMs(int conflictWaitIterationMs) { 136 this.conflictWaitIterationMs = conflictWaitIterationMs; 137 } 138 139 /** 140 * Starts the operation if operation with such nonce has not already succeeded. If the operation 141 * is in progress, waits for it to end and checks whether it has succeeded. 142 * @param group Nonce group. 143 * @param nonce Nonce. 144 * @param stoppable Stoppable that terminates waiting (if any) when the server is stopped. 145 * @return true if the operation has not already succeeded and can proceed; false otherwise. 146 */ 147 public boolean startOperation(long group, long nonce, Stoppable stoppable) 148 throws InterruptedException { 149 if (nonce == HConstants.NO_NONCE) return true; 150 NonceKey nk = new NonceKey(group, nonce); 151 OperationContext ctx = new OperationContext(); 152 while (true) { 153 OperationContext oldResult = nonces.putIfAbsent(nk, ctx); 154 if (oldResult == null) return true; 155 156 // Collision with some operation - should be extremely rare. 157 synchronized (oldResult) { 158 int oldState = oldResult.getState(); 159 LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult); 160 if (oldState != OperationContext.WAIT) { 161 return oldState == OperationContext.PROCEED; // operation ended 162 } 163 oldResult.setHasWait(); 164 oldResult.wait(this.conflictWaitIterationMs); // operation is still active... wait and loop 165 if (stoppable.isStopped()) { 166 throw new InterruptedException("Server stopped"); 167 } 168 } 169 } 170 } 171 172 /** 173 * Ends the operation started by startOperation. 174 * @param group Nonce group. 175 * @param nonce Nonce. 176 * @param success Whether the operation has succeeded. 177 */ 178 public void endOperation(long group, long nonce, boolean success) { 179 if (nonce == HConstants.NO_NONCE) return; 180 NonceKey nk = new NonceKey(group, nonce); 181 OperationContext newResult = nonces.get(nk); 182 assert newResult != null; 183 synchronized (newResult) { 184 assert newResult.getState() == OperationContext.WAIT; 185 // If we failed, other retries can proceed. 186 newResult.setState(success ? OperationContext.DONT_PROCEED : OperationContext.PROCEED); 187 if (success) { 188 newResult.reportActivity(); // Set time to use for cleanup. 189 } else { 190 OperationContext val = nonces.remove(nk); 191 assert val == newResult; 192 } 193 if (newResult.hasWait()) { 194 LOG.debug("Conflict with running op ended: " + nk + ", " + newResult); 195 newResult.notifyAll(); 196 } 197 } 198 } 199 200 /** 201 * Store the write point in OperationContext when the operation succeed. 202 * @param group Nonce group. 203 * @param nonce Nonce. 204 * @param mvcc Write point of the succeed operation. 205 */ 206 public void addMvccToOperationContext(long group, long nonce, long mvcc) { 207 if (nonce == HConstants.NO_NONCE) { 208 return; 209 } 210 NonceKey nk = new NonceKey(group, nonce); 211 OperationContext result = nonces.get(nk); 212 assert result != null; 213 synchronized (result) { 214 result.setMvcc(mvcc); 215 } 216 } 217 218 /** 219 * Return the write point of the previous succeed operation. 220 * @param group Nonce group. 221 * @param nonce Nonce. 222 * @return write point of the previous succeed operation. 223 */ 224 public long getMvccFromOperationContext(long group, long nonce) { 225 if (nonce == HConstants.NO_NONCE) { 226 return Long.MAX_VALUE; 227 } 228 NonceKey nk = new NonceKey(group, nonce); 229 OperationContext result = nonces.get(nk); 230 return result == null ? Long.MAX_VALUE : result.getMvcc(); 231 } 232 233 /** 234 * Reports the operation from WAL during replay. 235 * @param group Nonce group. 236 * @param nonce Nonce. 237 * @param writeTime Entry write time, used to ignore entries that are too old. 238 */ 239 public void reportOperationFromWal(long group, long nonce, long writeTime) { 240 if (nonce == HConstants.NO_NONCE) return; 241 // Give the write time some slack in case the clocks are not synchronized. 242 long now = EnvironmentEdgeManager.currentTime(); 243 if (now > writeTime + (deleteNonceGracePeriod * 1.5)) return; 244 OperationContext newResult = new OperationContext(); 245 newResult.setState(OperationContext.DONT_PROCEED); 246 NonceKey nk = new NonceKey(group, nonce); 247 OperationContext oldResult = nonces.putIfAbsent(nk, newResult); 248 if (oldResult != null) { 249 // Some schemes can have collisions (for example, expiring hashes), so just log it. 250 // We have no idea about the semantics here, so this is the least of many evils. 251 LOG.warn( 252 "Nonce collision during WAL recovery: " + nk + ", " + oldResult + " with " + newResult); 253 } 254 } 255 256 /** 257 * Creates a scheduled chore that is used to clean up old nonces. 258 * @param stoppable Stoppable for the chore. 259 * @return ScheduledChore; the scheduled chore is not started. 260 */ 261 public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) { 262 // By default, it will run every 6 minutes (30 / 5). 263 return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) { 264 @Override 265 protected void chore() { 266 cleanUpOldNonces(); 267 } 268 }; 269 } 270 271 private void cleanUpOldNonces() { 272 long cutoff = EnvironmentEdgeManager.currentTime() - deleteNonceGracePeriod; 273 for (Map.Entry<NonceKey, OperationContext> entry : nonces.entrySet()) { 274 OperationContext oc = entry.getValue(); 275 if (!oc.isExpired(cutoff)) continue; 276 synchronized (oc) { 277 if (oc.getState() == OperationContext.WAIT || !oc.isExpired(cutoff)) continue; 278 nonces.remove(entry.getKey()); 279 } 280 } 281 } 282}