001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.text.SimpleDateFormat;
021import java.util.Date;
022import java.util.Map;
023import java.util.concurrent.ConcurrentHashMap;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HConstants;
026import org.apache.hadoop.hbase.ScheduledChore;
027import org.apache.hadoop.hbase.Stoppable;
028import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
029import org.apache.hadoop.hbase.util.NonceKey;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * Implementation of nonce manager that stores nonces in a hash map and cleans them up after some
036 * time; if nonce group/client ID is supplied, nonces are stored by client ID.
037 */
038@InterfaceAudience.Private
039public class ServerNonceManager {
040  public static final String HASH_NONCE_GRACE_PERIOD_KEY = "hbase.server.hashNonce.gracePeriod";
041  private static final Logger LOG = LoggerFactory.getLogger(ServerNonceManager.class);
042
043  /**
044   * The time to wait in an extremely unlikely case of a conflict with a running op. Only here so
045   * that tests could override it and not wait.
046   */
047  private int conflictWaitIterationMs = 30000;
048
049  private static final SimpleDateFormat tsFormat = new SimpleDateFormat("HH:mm:ss.SSS");
050
051  // This object is used to synchronize on in case of collisions, and for cleanup.
052  private static class OperationContext {
053    static final int DONT_PROCEED = 0;
054    static final int PROCEED = 1;
055    static final int WAIT = 2;
056
057    // 0..1 - state, 2..2 - whether anyone is waiting, 3.. - ts of last activity
058    private long data = 0;
059    private static final long STATE_BITS = 3;
060    private static final long WAITING_BIT = 4;
061    private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS;
062
063    private volatile long mvcc;
064
065    @Override
066    public String toString() {
067      return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
068        + tsFormat.format(new Date(getActivityTime())) + "]";
069    }
070
071    public OperationContext() {
072      setState(WAIT);
073      reportActivity();
074    }
075
076    public void setState(int state) {
077      this.data = (this.data & ~STATE_BITS) | state;
078    }
079
080    public int getState() {
081      return (int) (this.data & STATE_BITS);
082    }
083
084    public void setHasWait() {
085      this.data = this.data | WAITING_BIT;
086    }
087
088    public boolean hasWait() {
089      return (this.data & WAITING_BIT) == WAITING_BIT;
090    }
091
092    public void reportActivity() {
093      long now = EnvironmentEdgeManager.currentTime();
094      this.data = (this.data & ALL_FLAG_BITS) | (now << 3);
095    }
096
097    public boolean isExpired(long minRelevantTime) {
098      return getActivityTime() < (minRelevantTime & (~0L >>> 3));
099    }
100
101    public void setMvcc(long mvcc) {
102      this.mvcc = mvcc;
103    }
104
105    public long getMvcc() {
106      return this.mvcc;
107    }
108
109    private long getActivityTime() {
110      return this.data >>> 3;
111    }
112  }
113
114  /**
115   * Nonces. Approximate overhead per nonce: 64 bytes from hashmap, 32 from two objects (k/v), NK:
116   * 16 bytes (2 longs), OC: 8 bytes (1 long) - so, 120 bytes. With 30min expiration time, 5k
117   * increments/appends per sec., we'd use approximately 1Gb, which is a realistic worst case. If
118   * it's much worse, we could use some sort of memory limit and cleanup.
119   */
120  private ConcurrentHashMap<NonceKey, OperationContext> nonces = new ConcurrentHashMap<>();
121
122  private int deleteNonceGracePeriod;
123
124  public ServerNonceManager(Configuration conf) {
125    // Default - 30 minutes.
126    deleteNonceGracePeriod = conf.getInt(HASH_NONCE_GRACE_PERIOD_KEY, 30 * 60 * 1000);
127    if (deleteNonceGracePeriod < 60 * 1000) {
128      LOG.warn("Nonce grace period " + deleteNonceGracePeriod
129        + " is less than a minute; might be too small to be useful");
130    }
131  }
132
133  public void setConflictWaitIterationMs(int conflictWaitIterationMs) {
134    this.conflictWaitIterationMs = conflictWaitIterationMs;
135  }
136
137  /**
138   * Starts the operation if operation with such nonce has not already succeeded. If the operation
139   * is in progress, waits for it to end and checks whether it has succeeded.
140   * @param group     Nonce group.
141   * @param nonce     Nonce.
142   * @param stoppable Stoppable that terminates waiting (if any) when the server is stopped.
143   * @return true if the operation has not already succeeded and can proceed; false otherwise.
144   */
145  public boolean startOperation(long group, long nonce, Stoppable stoppable)
146    throws InterruptedException {
147    if (nonce == HConstants.NO_NONCE) return true;
148    NonceKey nk = new NonceKey(group, nonce);
149    OperationContext ctx = new OperationContext();
150    while (true) {
151      OperationContext oldResult = nonces.putIfAbsent(nk, ctx);
152      if (oldResult == null) return true;
153
154      // Collision with some operation - should be extremely rare.
155      synchronized (oldResult) {
156        int oldState = oldResult.getState();
157        LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult);
158        if (oldState != OperationContext.WAIT) {
159          return oldState == OperationContext.PROCEED; // operation ended
160        }
161        oldResult.setHasWait();
162        oldResult.wait(this.conflictWaitIterationMs); // operation is still active... wait and loop
163        if (stoppable.isStopped()) {
164          throw new InterruptedException("Server stopped");
165        }
166      }
167    }
168  }
169
170  /**
171   * Ends the operation started by startOperation.
172   * @param group   Nonce group.
173   * @param nonce   Nonce.
174   * @param success Whether the operation has succeeded.
175   */
176  public void endOperation(long group, long nonce, boolean success) {
177    if (nonce == HConstants.NO_NONCE) return;
178    NonceKey nk = new NonceKey(group, nonce);
179    OperationContext newResult = nonces.get(nk);
180    assert newResult != null;
181    synchronized (newResult) {
182      assert newResult.getState() == OperationContext.WAIT;
183      // If we failed, other retries can proceed.
184      newResult.setState(success ? OperationContext.DONT_PROCEED : OperationContext.PROCEED);
185      if (success) {
186        newResult.reportActivity(); // Set time to use for cleanup.
187      } else {
188        OperationContext val = nonces.remove(nk);
189        assert val == newResult;
190      }
191      if (newResult.hasWait()) {
192        LOG.debug("Conflict with running op ended: " + nk + ", " + newResult);
193        newResult.notifyAll();
194      }
195    }
196  }
197
198  /**
199   * Store the write point in OperationContext when the operation succeed.
200   * @param group Nonce group.
201   * @param nonce Nonce.
202   * @param mvcc  Write point of the succeed operation.
203   */
204  public void addMvccToOperationContext(long group, long nonce, long mvcc) {
205    if (nonce == HConstants.NO_NONCE) {
206      return;
207    }
208    NonceKey nk = new NonceKey(group, nonce);
209    OperationContext result = nonces.get(nk);
210    assert result != null;
211    synchronized (result) {
212      result.setMvcc(mvcc);
213    }
214  }
215
216  /**
217   * Return the write point of the previous succeed operation.
218   * @param group Nonce group.
219   * @param nonce Nonce.
220   * @return write point of the previous succeed operation.
221   */
222  public long getMvccFromOperationContext(long group, long nonce) {
223    if (nonce == HConstants.NO_NONCE) {
224      return Long.MAX_VALUE;
225    }
226    NonceKey nk = new NonceKey(group, nonce);
227    OperationContext result = nonces.get(nk);
228    return result == null ? Long.MAX_VALUE : result.getMvcc();
229  }
230
231  /**
232   * Reports the operation from WAL during replay.
233   * @param group     Nonce group.
234   * @param nonce     Nonce.
235   * @param writeTime Entry write time, used to ignore entries that are too old.
236   */
237  public void reportOperationFromWal(long group, long nonce, long writeTime) {
238    if (nonce == HConstants.NO_NONCE) return;
239    // Give the write time some slack in case the clocks are not synchronized.
240    long now = EnvironmentEdgeManager.currentTime();
241    if (now > writeTime + (deleteNonceGracePeriod * 1.5)) return;
242    OperationContext newResult = new OperationContext();
243    newResult.setState(OperationContext.DONT_PROCEED);
244    NonceKey nk = new NonceKey(group, nonce);
245    OperationContext oldResult = nonces.putIfAbsent(nk, newResult);
246    if (oldResult != null) {
247      // Some schemes can have collisions (for example, expiring hashes), so just log it.
248      // We have no idea about the semantics here, so this is the least of many evils.
249      LOG.warn(
250        "Nonce collision during WAL recovery: " + nk + ", " + oldResult + " with " + newResult);
251    }
252  }
253
254  /**
255   * Creates a scheduled chore that is used to clean up old nonces.
256   * @param stoppable Stoppable for the chore.
257   * @return ScheduledChore; the scheduled chore is not started.
258   */
259  public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) {
260    // By default, it will run every 6 minutes (30 / 5).
261    return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) {
262      @Override
263      protected void chore() {
264        cleanUpOldNonces();
265      }
266    };
267  }
268
269  private void cleanUpOldNonces() {
270    long cutoff = EnvironmentEdgeManager.currentTime() - deleteNonceGracePeriod;
271    for (Map.Entry<NonceKey, OperationContext> entry : nonces.entrySet()) {
272      OperationContext oc = entry.getValue();
273      if (!oc.isExpired(cutoff)) continue;
274      synchronized (oc) {
275        if (oc.getState() == OperationContext.WAIT || !oc.isExpired(cutoff)) continue;
276        nonces.remove(entry.getKey());
277      }
278    }
279  }
280}