001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store;
019
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.List;
023import org.apache.hadoop.hbase.procedure2.Procedure;
024import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker.DeleteState;
025import org.apache.yetus.audience.InterfaceAudience;
026
027import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
028
029/**
030 * A bitmap which can grow/merge with other {@link BitSetNode} (if certain conditions are met).
031 * Boundaries of bitmap are aligned to multiples of {@link BitSetNode#BITS_PER_WORD}. So the range
032 * of a {@link BitSetNode} is from [x * K, y * K) where x and y are integers, y > x and K is
033 * BITS_PER_WORD.
034 * <p/>
035 * We have two main bit sets to describe the state of procedures, the meanings are:
036 *
037 * <pre>
038 *  ----------------------
039 * | modified | deleted |  meaning
040 * |     0    |   0     |  proc exists, but hasn't been updated since last resetUpdates().
041 * |     1    |   0     |  proc was updated (but not deleted).
042 * |     1    |   1     |  proc was deleted.
043 * |     0    |   1     |  proc doesn't exist (maybe never created, maybe deleted in past).
044 * ----------------------
045 * </pre>
046 *
047 * The meaning of modified is that, we have modified the state of the procedure, no matter insert,
048 * update, or delete. And if it is an insert or update, we will set the deleted to 0, if not we will
049 * set the delete to 1.
050 * <p/>
051 * For a non-partial BitSetNode, the initial modified value is 0 and deleted value is 1. For the
052 * partial one, the initial modified value is 0 and the initial deleted value is also 0. In
053 * {@link #unsetPartialFlag()} we will reset the deleted to 1 if it is not modified.
054 */
055@InterfaceAudience.Private
056class BitSetNode {
057  private static final long WORD_MASK = 0xffffffffffffffffL;
058  private static final int ADDRESS_BITS_PER_WORD = 6;
059  private static final int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
060  // The BitSetNode itself has 48 bytes overhead, which is the size of 6 longs, so here we use a max
061  // node size 4, which is 8 longs since we have an array for modified and also an array for
062  // deleted. The assumption here is that most procedures will be deleted soon so we'd better keep
063  // the BitSetNode small.
064  private static final int MAX_NODE_SIZE = 4 << ADDRESS_BITS_PER_WORD;
065
066  /**
067   * Mimics {@link ProcedureStoreTracker#partial}. It will effect how we fill the new deleted bits
068   * when growing.
069   */
070  private boolean partial;
071
072  /**
073   * Set of procedures which have been modified since last {@link #resetModified()}. Useful to track
074   * procedures which have been modified since last WAL write.
075   */
076  private long[] modified;
077
078  /**
079   * Keeps track of procedure ids which belong to this bitmap's range and have been deleted. This
080   * represents global state since it's not reset on WAL rolls.
081   */
082  private long[] deleted;
083  /**
084   * Offset of bitmap i.e. procedure id corresponding to first bit.
085   */
086  private long start;
087
088  public void dump() {
089    System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(), getActiveMinProcId(),
090      getActiveMaxProcId());
091    System.out.println("Modified:");
092    for (int i = 0; i < modified.length; ++i) {
093      for (int j = 0; j < BITS_PER_WORD; ++j) {
094        System.out.print((modified[i] & (1L << j)) != 0 ? "1" : "0");
095      }
096      System.out.println(" " + i);
097    }
098    System.out.println();
099    System.out.println("Delete:");
100    for (int i = 0; i < deleted.length; ++i) {
101      for (int j = 0; j < BITS_PER_WORD; ++j) {
102        System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
103      }
104      System.out.println(" " + i);
105    }
106    System.out.println();
107  }
108
109  public BitSetNode(long procId, boolean partial) {
110    start = alignDown(procId);
111
112    int count = 1;
113    modified = new long[count];
114    deleted = new long[count];
115    if (!partial) {
116      Arrays.fill(deleted, WORD_MASK);
117    }
118
119    this.partial = partial;
120    updateState(procId, false);
121  }
122
123  public BitSetNode(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
124    start = data.getStartId();
125    int size = data.getUpdatedCount();
126    assert size == data.getDeletedCount();
127    modified = new long[size];
128    deleted = new long[size];
129    for (int i = 0; i < size; ++i) {
130      modified[i] = data.getUpdated(i);
131      deleted[i] = data.getDeleted(i);
132    }
133    partial = false;
134  }
135
136  public BitSetNode(BitSetNode other, boolean resetDelete) {
137    this.start = other.start;
138    // The resetDelete will be set to true when building cleanup tracker.
139    // as we will reset deleted flags for all the unmodified bits to 1, the partial flag is useless
140    // so set it to false for not confusing the developers when debugging.
141    this.partial = resetDelete ? false : other.partial;
142    this.modified = other.modified.clone();
143    // The intention here is that, if a procedure is not modified in this tracker, then we do not
144    // need to take care of it, so we will set deleted to true for these bits, i.e, if modified is
145    // 0, then we set deleted to 1, otherwise keep it as is. So here, the equation is
146    // deleted |= ~modified, i.e,
147    if (resetDelete) {
148      this.deleted = new long[other.deleted.length];
149      for (int i = 0; i < this.deleted.length; ++i) {
150        this.deleted[i] |= ~(other.modified[i]);
151      }
152    } else {
153      this.deleted = other.deleted.clone();
154    }
155  }
156
157  public void insertOrUpdate(final long procId) {
158    updateState(procId, false);
159  }
160
161  public void delete(final long procId) {
162    updateState(procId, true);
163  }
164
165  public long getStart() {
166    return start;
167  }
168
169  /**
170   * Inclusive.
171   */
172  public long getEnd() {
173    return start + (modified.length << ADDRESS_BITS_PER_WORD) - 1;
174  }
175
176  public boolean contains(final long procId) {
177    return start <= procId && procId <= getEnd();
178  }
179
180  public DeleteState isDeleted(final long procId) {
181    int bitmapIndex = getBitmapIndex(procId);
182    int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
183    if (wordIndex >= deleted.length) {
184      return DeleteState.MAYBE;
185    }
186    // The left shift of java only takes care of the lowest several bits(5 for int and 6 for long),
187    // so here we can use bitmapIndex directly, without mod 64
188    return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
189  }
190
191  public boolean isModified(long procId) {
192    int bitmapIndex = getBitmapIndex(procId);
193    int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
194    if (wordIndex >= modified.length) {
195      return false;
196    }
197    // The left shift of java only takes care of the lowest several bits(5 for int and 6 for long),
198    // so here we can use bitmapIndex directly, without mod 64
199    return (modified[wordIndex] & (1L << bitmapIndex)) != 0;
200  }
201
202  /**
203   * @return true, if all the procedures has been modified.
204   */
205  public boolean isAllModified() {
206    // TODO: cache the value
207    for (int i = 0; i < modified.length; ++i) {
208      if ((modified[i] | deleted[i]) != WORD_MASK) {
209        return false;
210      }
211    }
212    return true;
213  }
214
215  /**
216   * @return all the active procedure ids in this bit set.
217   */
218  public long[] getActiveProcIds() {
219    List<Long> procIds = new ArrayList<>();
220    for (int wordIndex = 0; wordIndex < modified.length; wordIndex++) {
221      if (deleted[wordIndex] == WORD_MASK || modified[wordIndex] == 0) {
222        // This should be the common case, where most procedures has been deleted.
223        continue;
224      }
225      long baseProcId = getStart() + (wordIndex << ADDRESS_BITS_PER_WORD);
226      for (int i = 0; i < (1 << ADDRESS_BITS_PER_WORD); i++) {
227        long mask = 1L << i;
228        if ((deleted[wordIndex] & mask) == 0 && (modified[wordIndex] & mask) != 0) {
229          procIds.add(baseProcId + i);
230        }
231      }
232    }
233    return procIds.stream().mapToLong(Long::longValue).toArray();
234  }
235
236  /**
237   * @return true, if there are no active procedures in this BitSetNode, else false.
238   */
239  public boolean isEmpty() {
240    // TODO: cache the value
241    for (int i = 0; i < deleted.length; ++i) {
242      if (deleted[i] != WORD_MASK) {
243        return false;
244      }
245    }
246    return true;
247  }
248
249  public void resetModified() {
250    Arrays.fill(modified, 0);
251  }
252
253  public void unsetPartialFlag() {
254    partial = false;
255    for (int i = 0; i < modified.length; ++i) {
256      for (int j = 0; j < BITS_PER_WORD; ++j) {
257        if ((modified[i] & (1L << j)) == 0) {
258          deleted[i] |= (1L << j);
259        }
260      }
261    }
262  }
263
264  /**
265   * Convert to
266   * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode
267   * protobuf.
268   */
269  public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
270    ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
271      ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
272    builder.setStartId(start);
273    for (int i = 0; i < modified.length; ++i) {
274      builder.addUpdated(modified[i]);
275      builder.addDeleted(deleted[i]);
276    }
277    return builder.build();
278  }
279
280  // ========================================================================
281  // Grow/Merge Helpers
282  // ========================================================================
283  public boolean canGrow(long procId) {
284    if (procId <= start) {
285      return getEnd() - procId < MAX_NODE_SIZE;
286    } else {
287      return procId - start < MAX_NODE_SIZE;
288    }
289  }
290
291  public boolean canMerge(BitSetNode rightNode) {
292    // Can just compare 'starts' since boundaries are aligned to multiples of BITS_PER_WORD.
293    assert start < rightNode.start;
294    return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
295  }
296
297  public void grow(long procId) {
298    // make sure you have call canGrow first before calling this method
299    assert canGrow(procId);
300    if (procId < start) {
301      // grow to left
302      long newStart = alignDown(procId);
303      int delta = (int) (start - newStart) >> ADDRESS_BITS_PER_WORD;
304      start = newStart;
305      long[] newModified = new long[modified.length + delta];
306      System.arraycopy(modified, 0, newModified, delta, modified.length);
307      modified = newModified;
308      long[] newDeleted = new long[deleted.length + delta];
309      if (!partial) {
310        for (int i = 0; i < delta; i++) {
311          newDeleted[i] = WORD_MASK;
312        }
313      }
314      System.arraycopy(deleted, 0, newDeleted, delta, deleted.length);
315      deleted = newDeleted;
316    } else {
317      // grow to right
318      long newEnd = alignUp(procId + 1);
319      int delta = (int) (newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
320      int newSize = modified.length + delta;
321      long[] newModified = Arrays.copyOf(modified, newSize);
322      modified = newModified;
323      long[] newDeleted = Arrays.copyOf(deleted, newSize);
324      if (!partial) {
325        for (int i = deleted.length; i < newSize; i++) {
326          newDeleted[i] = WORD_MASK;
327        }
328      }
329      deleted = newDeleted;
330    }
331  }
332
333  public void merge(BitSetNode rightNode) {
334    assert start < rightNode.start;
335    int newSize = (int) (rightNode.getEnd() - start + 1) >> ADDRESS_BITS_PER_WORD;
336    long[] newModified = Arrays.copyOf(modified, newSize);
337    System.arraycopy(rightNode.modified, 0, newModified, newSize - rightNode.modified.length,
338      rightNode.modified.length);
339    long[] newDeleted = Arrays.copyOf(deleted, newSize);
340    System.arraycopy(rightNode.deleted, 0, newDeleted, newSize - rightNode.deleted.length,
341      rightNode.deleted.length);
342    if (!partial) {
343      for (int i = deleted.length, n = newSize - rightNode.deleted.length; i < n; i++) {
344        newDeleted[i] = WORD_MASK;
345      }
346    }
347    modified = newModified;
348    deleted = newDeleted;
349  }
350
351  @Override
352  public String toString() {
353    return "BitSetNode(" + getStart() + "-" + getEnd() + ")";
354  }
355
356  // ========================================================================
357  // Min/Max Helpers
358  // ========================================================================
359  public long getActiveMinProcId() {
360    long minProcId = start;
361    for (int i = 0; i < deleted.length; ++i) {
362      if (deleted[i] == 0) {
363        return minProcId;
364      }
365
366      if (deleted[i] != WORD_MASK) {
367        for (int j = 0; j < BITS_PER_WORD; ++j) {
368          if ((deleted[i] & (1L << j)) == 0) {
369            return minProcId + j;
370          }
371        }
372      }
373
374      minProcId += BITS_PER_WORD;
375    }
376    return Procedure.NO_PROC_ID;
377  }
378
379  public long getActiveMaxProcId() {
380    long maxProcId = getEnd();
381    for (int i = deleted.length - 1; i >= 0; --i) {
382      if (deleted[i] == 0) {
383        return maxProcId;
384      }
385
386      if (deleted[i] != WORD_MASK) {
387        for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
388          if ((deleted[i] & (1L << j)) == 0) {
389            return maxProcId - (BITS_PER_WORD - 1 - j);
390          }
391        }
392      }
393      maxProcId -= BITS_PER_WORD;
394    }
395    return Procedure.NO_PROC_ID;
396  }
397
398  // ========================================================================
399  // Bitmap Helpers
400  // ========================================================================
401  private int getBitmapIndex(final long procId) {
402    return (int) (procId - start);
403  }
404
405  void updateState(long procId, boolean isDeleted) {
406    int bitmapIndex = getBitmapIndex(procId);
407    int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
408    long value = (1L << bitmapIndex);
409
410    modified[wordIndex] |= value;
411    if (isDeleted) {
412      deleted[wordIndex] |= value;
413    } else {
414      deleted[wordIndex] &= ~value;
415    }
416  }
417
418  // ========================================================================
419  // Helpers
420  // ========================================================================
421  /**
422   * @return upper boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
423   */
424  private static long alignUp(final long x) {
425    return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
426  }
427
428  /**
429   * @return lower boundary (aligned to multiple of BITS_PER_WORD) of bitmap range x belongs to.
430   */
431  private static long alignDown(final long x) {
432    return x & -BITS_PER_WORD;
433  }
434}