001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.regionserver;
020
021import java.text.SimpleDateFormat;
022import java.util.Date;
023import java.util.Map;
024import java.util.concurrent.ConcurrentHashMap;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.hadoop.hbase.ScheduledChore;
029import org.apache.hadoop.hbase.Stoppable;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
034import org.apache.hadoop.hbase.util.NonceKey;
035
036import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
037
038/**
039 * Implementation of nonce manager that stores nonces in a hash map and cleans them up after
040 * some time; if nonce group/client ID is supplied, nonces are stored by client ID.
041 */
042@InterfaceAudience.Private
043public class ServerNonceManager {
044  public static final String HASH_NONCE_GRACE_PERIOD_KEY = "hbase.server.hashNonce.gracePeriod";
045  private static final Logger LOG = LoggerFactory.getLogger(ServerNonceManager.class);
046
047  /** The time to wait in an extremely unlikely case of a conflict with a running op.
048   * Only here so that tests could override it and not wait. */
049  private int conflictWaitIterationMs = 30000;
050
051  private static final SimpleDateFormat tsFormat = new SimpleDateFormat("HH:mm:ss.SSS");
052
053  // This object is used to synchronize on in case of collisions, and for cleanup.
054  private static class OperationContext {
055    static final int DONT_PROCEED = 0;
056    static final int PROCEED = 1;
057    static final int WAIT = 2;
058
059    // 0..1 - state, 2..2 - whether anyone is waiting, 3.. - ts of last activity
060    private long data = 0;
061    private static final long STATE_BITS = 3;
062    private static final long WAITING_BIT = 4;
063    private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS;
064
065    private volatile long mvcc;
066
067    @Override
068    public String toString() {
069      return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
070          + tsFormat.format(new Date(getActivityTime())) + "]";
071    }
072
073    public OperationContext() {
074      setState(WAIT);
075      reportActivity();
076    }
077
078    public void setState(int state) {
079      this.data = (this.data & ~STATE_BITS) | state;
080    }
081
082    public int getState() {
083      return (int)(this.data & STATE_BITS);
084    }
085
086    public void setHasWait() {
087      this.data = this.data | WAITING_BIT;
088    }
089
090    public boolean hasWait() {
091      return (this.data & WAITING_BIT) == WAITING_BIT;
092    }
093
094    public void reportActivity() {
095      long now = EnvironmentEdgeManager.currentTime();
096      this.data = (this.data & ALL_FLAG_BITS) | (now << 3);
097    }
098
099    public boolean isExpired(long minRelevantTime) {
100      return getActivityTime() < (minRelevantTime & (~0L >>> 3));
101    }
102
103    public void setMvcc(long mvcc) {
104      this.mvcc = mvcc;
105    }
106
107    public long getMvcc() {
108      return this.mvcc;
109    }
110
111    private long getActivityTime() {
112      return this.data >>> 3;
113    }
114  }
115
116  /**
117   * Nonces.
118   * Approximate overhead per nonce: 64 bytes from hashmap, 32 from two objects (k/v),
119   * NK: 16 bytes (2 longs), OC: 8 bytes (1 long) - so, 120 bytes.
120   * With 30min expiration time, 5k increments/appends per sec., we'd use approximately 1Gb,
121   * which is a realistic worst case. If it's much worse, we could use some sort of memory
122   * limit and cleanup.
123   */
124  private ConcurrentHashMap<NonceKey, OperationContext> nonces = new ConcurrentHashMap<>();
125
126  private int deleteNonceGracePeriod;
127
128  public ServerNonceManager(Configuration conf) {
129    // Default - 30 minutes.
130    deleteNonceGracePeriod = conf.getInt(HASH_NONCE_GRACE_PERIOD_KEY, 30 * 60 * 1000);
131    if (deleteNonceGracePeriod < 60 * 1000) {
132      LOG.warn("Nonce grace period " + deleteNonceGracePeriod
133          + " is less than a minute; might be too small to be useful");
134    }
135  }
136
137  @VisibleForTesting
138  public void setConflictWaitIterationMs(int conflictWaitIterationMs) {
139    this.conflictWaitIterationMs = conflictWaitIterationMs;
140  }
141
142  /**
143   * Starts the operation if operation with such nonce has not already succeeded. If the
144   * operation is in progress, waits for it to end and checks whether it has succeeded.
145   * @param group Nonce group.
146   * @param nonce Nonce.
147   * @param stoppable Stoppable that terminates waiting (if any) when the server is stopped.
148   * @return true if the operation has not already succeeded and can proceed; false otherwise.
149   */
150  public boolean startOperation(long group, long nonce, Stoppable stoppable)
151      throws InterruptedException {
152    if (nonce == HConstants.NO_NONCE) return true;
153    NonceKey nk = new NonceKey(group, nonce);
154    OperationContext ctx = new OperationContext();
155    while (true) {
156      OperationContext oldResult = nonces.putIfAbsent(nk, ctx);
157      if (oldResult == null) return true;
158
159      // Collision with some operation - should be extremely rare.
160      synchronized (oldResult) {
161        int oldState = oldResult.getState();
162        LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult);
163        if (oldState != OperationContext.WAIT) {
164          return oldState == OperationContext.PROCEED; // operation ended
165        }
166        oldResult.setHasWait();
167        oldResult.wait(this.conflictWaitIterationMs); // operation is still active... wait and loop
168        if (stoppable.isStopped()) {
169          throw new InterruptedException("Server stopped");
170        }
171      }
172    }
173  }
174
175  /**
176   * Ends the operation started by startOperation.
177   * @param group Nonce group.
178   * @param nonce Nonce.
179   * @param success Whether the operation has succeeded.
180   */
181  public void endOperation(long group, long nonce, boolean success) {
182    if (nonce == HConstants.NO_NONCE) return;
183    NonceKey nk = new NonceKey(group, nonce);
184    OperationContext newResult = nonces.get(nk);
185    assert newResult != null;
186    synchronized (newResult) {
187      assert newResult.getState() == OperationContext.WAIT;
188      // If we failed, other retries can proceed.
189      newResult.setState(success ? OperationContext.DONT_PROCEED : OperationContext.PROCEED);
190      if (success) {
191        newResult.reportActivity(); // Set time to use for cleanup.
192      } else {
193        OperationContext val = nonces.remove(nk);
194        assert val == newResult;
195      }
196      if (newResult.hasWait()) {
197        LOG.debug("Conflict with running op ended: " + nk + ", " + newResult);
198        newResult.notifyAll();
199      }
200    }
201  }
202
203  /**
204   * Store the write point in OperationContext when the operation succeed.
205   * @param group Nonce group.
206   * @param nonce Nonce.
207   * @param mvcc Write point of the succeed operation.
208   */
209  public void addMvccToOperationContext(long group, long nonce, long mvcc) {
210    if (nonce == HConstants.NO_NONCE) {
211      return;
212    }
213    NonceKey nk = new NonceKey(group, nonce);
214    OperationContext result = nonces.get(nk);
215    assert result != null;
216    synchronized (result) {
217      result.setMvcc(mvcc);
218    }
219  }
220
221  /**
222   * Return the write point of the previous succeed operation.
223   * @param group Nonce group.
224   * @param nonce Nonce.
225   * @return write point of the previous succeed operation.
226   */
227  public long getMvccFromOperationContext(long group, long nonce) {
228    if (nonce == HConstants.NO_NONCE) {
229      return Long.MAX_VALUE;
230    }
231    NonceKey nk = new NonceKey(group, nonce);
232    OperationContext result = nonces.get(nk);
233    return result == null ? Long.MAX_VALUE : result.getMvcc();
234  }
235
236  /**
237   * Reports the operation from WAL during replay.
238   * @param group Nonce group.
239   * @param nonce Nonce.
240   * @param writeTime Entry write time, used to ignore entries that are too old.
241   */
242  public void reportOperationFromWal(long group, long nonce, long writeTime) {
243    if (nonce == HConstants.NO_NONCE) return;
244    // Give the write time some slack in case the clocks are not synchronized.
245    long now = EnvironmentEdgeManager.currentTime();
246    if (now > writeTime + (deleteNonceGracePeriod * 1.5)) return;
247    OperationContext newResult = new OperationContext();
248    newResult.setState(OperationContext.DONT_PROCEED);
249    NonceKey nk = new NonceKey(group, nonce);
250    OperationContext oldResult = nonces.putIfAbsent(nk, newResult);
251    if (oldResult != null) {
252      // Some schemes can have collisions (for example, expiring hashes), so just log it.
253      // We have no idea about the semantics here, so this is the least of many evils.
254      LOG.warn("Nonce collision during WAL recovery: " + nk
255          + ", " + oldResult + " with " + newResult);
256    }
257  }
258
259  /**
260   * Creates a scheduled chore that is used to clean up old nonces.
261   * @param stoppable Stoppable for the chore.
262   * @return ScheduledChore; the scheduled chore is not started.
263   */
264  public ScheduledChore createCleanupScheduledChore(Stoppable stoppable) {
265    // By default, it will run every 6 minutes (30 / 5).
266    return new ScheduledChore("nonceCleaner", stoppable, deleteNonceGracePeriod / 5) {
267      @Override
268      protected void chore() {
269        cleanUpOldNonces();
270      }
271    };
272  }
273
274  private void cleanUpOldNonces() {
275    long cutoff = EnvironmentEdgeManager.currentTime() - deleteNonceGracePeriod;
276    for (Map.Entry<NonceKey, OperationContext> entry : nonces.entrySet()) {
277      OperationContext oc = entry.getValue();
278      if (!oc.isExpired(cutoff)) continue;
279      synchronized (oc) {
280        if (oc.getState() == OperationContext.WAIT || !oc.isExpired(cutoff)) continue;
281        nonces.remove(entry.getKey());
282      }
283    }
284  }
285}