View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.store;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.util.Iterator;
25  import java.util.Map;
26  import java.util.TreeMap;
27  
28  import org.apache.hadoop.hbase.classification.InterfaceAudience;
29  import org.apache.hadoop.hbase.classification.InterfaceStability;
30  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
31  
32  /**
33   * Keeps track of live procedures.
34   *
35   * It can be used by the ProcedureStore to identify which procedures are already
36   * deleted/completed to avoid the deserialization step on restart.
37   */
38  @InterfaceAudience.Private
39  @InterfaceStability.Evolving
40  public class ProcedureStoreTracker {
41    private final TreeMap<Long, BitSetNode> map = new TreeMap<Long, BitSetNode>();
42  
43    private boolean keepDeletes = false;
44    private boolean partial = false;
45  
46    private long minUpdatedProcId = Long.MAX_VALUE;
47    private long maxUpdatedProcId = Long.MIN_VALUE;
48  
49    public enum DeleteState { YES, NO, MAYBE }
50  
51    public static class BitSetNode {
52      private final static long WORD_MASK = 0xffffffffffffffffL;
53      private final static int ADDRESS_BITS_PER_WORD = 6;
54      private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
55      private final static int MAX_NODE_SIZE = 1 << ADDRESS_BITS_PER_WORD;
56  
57      private final boolean partial;
58      private long[] updated;
59      private long[] deleted;
60      private long start;
61  
62      public void dump() {
63        System.out.printf("%06d:%06d min=%d max=%d%n", getStart(), getEnd(),
64          getMinProcId(), getMaxProcId());
65        System.out.println("Update:");
66        for (int i = 0; i < updated.length; ++i) {
67          for (int j = 0; j < BITS_PER_WORD; ++j) {
68            System.out.print((updated[i] & (1L << j)) != 0 ? "1" : "0");
69          }
70          System.out.println(" " + i);
71        }
72        System.out.println();
73        System.out.println("Delete:");
74        for (int i = 0; i < deleted.length; ++i) {
75          for (int j = 0; j < BITS_PER_WORD; ++j) {
76            System.out.print((deleted[i] & (1L << j)) != 0 ? "1" : "0");
77          }
78          System.out.println(" " + i);
79        }
80        System.out.println();
81      }
82  
83      public BitSetNode(final long procId, final boolean partial) {
84        start = alignDown(procId);
85  
86        int count = 1;
87        updated = new long[count];
88        deleted = new long[count];
89        for (int i = 0; i < count; ++i) {
90          updated[i] = 0;
91          deleted[i] = partial ? 0 : WORD_MASK;
92        }
93  
94        this.partial = partial;
95        updateState(procId, false);
96      }
97  
98      protected BitSetNode(final long start, final long[] updated, final long[] deleted) {
99        this.start = start;
100       this.updated = updated;
101       this.deleted = deleted;
102       this.partial = false;
103     }
104 
105     public void update(final long procId) {
106       updateState(procId, false);
107     }
108 
109     public void delete(final long procId) {
110       updateState(procId, true);
111     }
112 
113     public Long getStart() {
114       return start;
115     }
116 
117     public Long getEnd() {
118       return start + (updated.length << ADDRESS_BITS_PER_WORD) - 1;
119     }
120 
121     public boolean contains(final long procId) {
122       return start <= procId && procId <= getEnd();
123     }
124 
125     public DeleteState isDeleted(final long procId) {
126       int bitmapIndex = getBitmapIndex(procId);
127       int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
128       if (wordIndex >= deleted.length) {
129         return DeleteState.MAYBE;
130       }
131       return (deleted[wordIndex] & (1L << bitmapIndex)) != 0 ? DeleteState.YES : DeleteState.NO;
132     }
133 
134     private boolean isUpdated(final long procId) {
135       int bitmapIndex = getBitmapIndex(procId);
136       int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
137       if (wordIndex >= updated.length) {
138         return false;
139       }
140       return (updated[wordIndex] & (1L << bitmapIndex)) != 0;
141     }
142 
143     public boolean isUpdated() {
144       // TODO: cache the value
145       for (int i = 0; i < updated.length; ++i) {
146         if ((updated[i] | deleted[i]) != WORD_MASK) {
147           return false;
148         }
149       }
150       return true;
151     }
152 
153     public boolean isEmpty() {
154       // TODO: cache the value
155       for (int i = 0; i < deleted.length; ++i) {
156         if (deleted[i] != WORD_MASK) {
157           return false;
158         }
159       }
160       return true;
161     }
162 
163     public void resetUpdates() {
164       for (int i = 0; i < updated.length; ++i) {
165         updated[i] = 0;
166       }
167     }
168 
169     public void undeleteAll() {
170       for (int i = 0; i < updated.length; ++i) {
171         deleted[i] = 0;
172       }
173     }
174 
175     public void unsetPartialFlag() {
176       for (int i = 0; i < updated.length; ++i) {
177         for (int j = 0; j < BITS_PER_WORD; ++j) {
178           if ((updated[i] & (1L << j)) == 0) {
179             deleted[i] |= (1L << j);
180           }
181         }
182       }
183     }
184 
185     public ProcedureProtos.ProcedureStoreTracker.TrackerNode convert() {
186       ProcedureProtos.ProcedureStoreTracker.TrackerNode.Builder builder =
187         ProcedureProtos.ProcedureStoreTracker.TrackerNode.newBuilder();
188       builder.setStartId(start);
189       for (int i = 0; i < updated.length; ++i) {
190         builder.addUpdated(updated[i]);
191         builder.addDeleted(deleted[i]);
192       }
193       return builder.build();
194     }
195 
196     public static BitSetNode convert(ProcedureProtos.ProcedureStoreTracker.TrackerNode data) {
197       long start = data.getStartId();
198       int size = data.getUpdatedCount();
199       long[] updated = new long[size];
200       long[] deleted = new long[size];
201       for (int i = 0; i < size; ++i) {
202         updated[i] = data.getUpdated(i);
203         deleted[i] = data.getDeleted(i);
204       }
205       return new BitSetNode(start, updated, deleted);
206     }
207 
208     // ========================================================================
209     //  Grow/Merge Helpers
210     // ========================================================================
211     public boolean canGrow(final long procId) {
212       return Math.abs(procId - start) < MAX_NODE_SIZE;
213     }
214 
215     public boolean canMerge(final BitSetNode rightNode) {
216       assert start < rightNode.getEnd();
217       return (rightNode.getEnd() - start) < MAX_NODE_SIZE;
218     }
219 
220     public void grow(final long procId) {
221       int delta, offset;
222 
223       if (procId < start) {
224         // add to head
225         long newStart = alignDown(procId);
226         delta = (int)(start - newStart) >> ADDRESS_BITS_PER_WORD;
227         offset = delta;
228         start = newStart;
229       } else {
230         // Add to tail
231         long newEnd = alignUp(procId + 1);
232         delta = (int)(newEnd - getEnd()) >> ADDRESS_BITS_PER_WORD;
233         offset = 0;
234       }
235 
236       long[] newBitmap;
237       int oldSize = updated.length;
238 
239       newBitmap = new long[oldSize + delta];
240       for (int i = 0; i < newBitmap.length; ++i) {
241         newBitmap[i] = 0;
242       }
243       System.arraycopy(updated, 0, newBitmap, offset, oldSize);
244       updated = newBitmap;
245 
246       newBitmap = new long[deleted.length + delta];
247       for (int i = 0; i < newBitmap.length; ++i) {
248         newBitmap[i] = partial ? 0 : WORD_MASK;
249       }
250       System.arraycopy(deleted, 0, newBitmap, offset, oldSize);
251       deleted = newBitmap;
252     }
253 
254     public void merge(final BitSetNode rightNode) {
255       int delta = (int)(rightNode.getEnd() - getEnd()) >> ADDRESS_BITS_PER_WORD;
256 
257       long[] newBitmap;
258       int oldSize = updated.length;
259       int newSize = (delta - rightNode.updated.length);
260       int offset = oldSize + newSize;
261 
262       newBitmap = new long[oldSize + delta];
263       System.arraycopy(updated, 0, newBitmap, 0, oldSize);
264       System.arraycopy(rightNode.updated, 0, newBitmap, offset, rightNode.updated.length);
265       updated = newBitmap;
266 
267       newBitmap = new long[oldSize + delta];
268       System.arraycopy(deleted, 0, newBitmap, 0, oldSize);
269       System.arraycopy(rightNode.deleted, 0, newBitmap, offset, rightNode.deleted.length);
270       deleted = newBitmap;
271 
272       for (int i = 0; i < newSize; ++i) {
273         updated[offset + i] = 0;
274         deleted[offset + i] = partial ? 0 : WORD_MASK;
275       }
276     }
277 
278     @Override
279     public String toString() {
280       return "BitSetNode(" + getStart() + "-" + getEnd() + ")";
281     }
282 
283     // ========================================================================
284     //  Min/Max Helpers
285     // ========================================================================
286     public long getMinProcId() {
287       long minProcId = start;
288       for (int i = 0; i < deleted.length; ++i) {
289         if (deleted[i] == 0) {
290           return(minProcId);
291         }
292 
293         if (deleted[i] != WORD_MASK) {
294           for (int j = 0; j < BITS_PER_WORD; ++j) {
295             if ((deleted[i] & (1L << j)) != 0) {
296               return minProcId + j;
297             }
298           }
299         }
300 
301         minProcId += BITS_PER_WORD;
302       }
303       return minProcId;
304     }
305 
306     public long getMaxProcId() {
307       long maxProcId = getEnd();
308       for (int i = deleted.length - 1; i >= 0; --i) {
309         if (deleted[i] == 0) {
310           return maxProcId;
311         }
312 
313         if (deleted[i] != WORD_MASK) {
314           for (int j = BITS_PER_WORD - 1; j >= 0; --j) {
315             if ((deleted[i] & (1L << j)) == 0) {
316               return maxProcId - (BITS_PER_WORD - 1 - j);
317             }
318           }
319         }
320         maxProcId -= BITS_PER_WORD;
321       }
322       return maxProcId;
323     }
324 
325     // ========================================================================
326     //  Bitmap Helpers
327     // ========================================================================
328     private int getBitmapIndex(final long procId) {
329       return (int)(procId - start);
330     }
331 
332     private void updateState(final long procId, final boolean isDeleted) {
333       int bitmapIndex = getBitmapIndex(procId);
334       int wordIndex = bitmapIndex >> ADDRESS_BITS_PER_WORD;
335       long value = (1L << bitmapIndex);
336 
337       if (isDeleted) {
338         updated[wordIndex] |= value;
339         deleted[wordIndex] |= value;
340       } else {
341         updated[wordIndex] |= value;
342         deleted[wordIndex] &= ~value;
343       }
344     }
345 
346     // ========================================================================
347     //  Helpers
348     // ========================================================================
349     private static long alignUp(final long x) {
350       return (x + (BITS_PER_WORD - 1)) & -BITS_PER_WORD;
351     }
352 
353     private static long alignDown(final long x) {
354       return x & -BITS_PER_WORD;
355     }
356   }
357 
358   public void insert(long procId) {
359     BitSetNode node = getOrCreateNode(procId);
360     node.update(procId);
361     trackProcIds(procId);
362   }
363 
364   public void insert(final long procId, final long[] subProcIds) {
365     update(procId);
366     for (int i = 0; i < subProcIds.length; ++i) {
367       insert(subProcIds[i]);
368     }
369   }
370 
371   public void update(long procId) {
372     Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
373     assert entry != null : "expected node to update procId=" + procId;
374 
375     BitSetNode node = entry.getValue();
376     assert node.contains(procId);
377     node.update(procId);
378     trackProcIds(procId);
379   }
380 
381   public void delete(long procId) {
382     Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
383     assert entry != null : "expected node to delete procId=" + procId;
384 
385     BitSetNode node = entry.getValue();
386     assert node.contains(procId) : "expected procId in the node";
387     node.delete(procId);
388 
389     if (!keepDeletes && node.isEmpty()) {
390       // TODO: RESET if (map.size() == 1)
391       map.remove(entry.getKey());
392     }
393 
394     trackProcIds(procId);
395   }
396 
397   private void trackProcIds(long procId) {
398     minUpdatedProcId = Math.min(minUpdatedProcId, procId);
399     maxUpdatedProcId = Math.max(maxUpdatedProcId, procId);
400   }
401 
402   public long getUpdatedMinProcId() {
403     return minUpdatedProcId;
404   }
405 
406   public long getUpdatedMaxProcId() {
407     return maxUpdatedProcId;
408   }
409 
410   @InterfaceAudience.Private
411   public void setDeleted(final long procId, final boolean isDeleted) {
412     BitSetNode node = getOrCreateNode(procId);
413     assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
414     node.updateState(procId, isDeleted);
415   }
416 
417   public void reset() {
418     this.keepDeletes = false;
419     this.partial = false;
420     this.map.clear();
421     resetUpdates();
422   }
423 
424   public DeleteState isDeleted(long procId) {
425     Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
426     if (entry != null && entry.getValue().contains(procId)) {
427       BitSetNode node = entry.getValue();
428       DeleteState state = node.isDeleted(procId);
429       return partial && !node.isUpdated(procId) ? DeleteState.MAYBE : state;
430     }
431     return partial ? DeleteState.MAYBE : DeleteState.YES;
432   }
433 
434   public long getMinProcId() {
435     // TODO: Cache?
436     Map.Entry<Long, BitSetNode> entry = map.firstEntry();
437     return entry == null ? 0 : entry.getValue().getMinProcId();
438   }
439 
440   public void setKeepDeletes(boolean keepDeletes) {
441     this.keepDeletes = keepDeletes;
442     if (!keepDeletes) {
443       Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
444       while (it.hasNext()) {
445         Map.Entry<Long, BitSetNode> entry = it.next();
446         if (entry.getValue().isEmpty()) {
447           it.remove();
448         }
449       }
450     }
451   }
452 
453   public void setPartialFlag(boolean isPartial) {
454     if (this.partial && !isPartial) {
455       for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
456         entry.getValue().unsetPartialFlag();
457       }
458     }
459     this.partial = isPartial;
460   }
461 
462   public boolean isEmpty() {
463     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
464       if (entry.getValue().isEmpty() == false) {
465         return false;
466       }
467     }
468     return true;
469   }
470 
471   public boolean isUpdated() {
472     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
473       if (entry.getValue().isUpdated() == false) {
474         return false;
475       }
476     }
477     return true;
478   }
479 
480   public boolean isTracking(long minId, long maxId) {
481     // TODO: we can make it more precise, instead of looking just at the block
482     return map.floorEntry(minId) != null || map.floorEntry(maxId) != null;
483   }
484 
485   public void resetUpdates() {
486     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
487       entry.getValue().resetUpdates();
488     }
489     minUpdatedProcId = Long.MAX_VALUE;
490     maxUpdatedProcId = Long.MIN_VALUE;
491   }
492 
493   public void undeleteAll() {
494     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
495       entry.getValue().undeleteAll();
496     }
497   }
498 
499   private BitSetNode getOrCreateNode(final long procId) {
500     // can procId fit in the left node?
501     BitSetNode leftNode = null;
502     boolean leftCanGrow = false;
503     Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
504     if (leftEntry != null) {
505       leftNode = leftEntry.getValue();
506       if (leftNode.contains(procId)) {
507         return leftNode;
508       }
509       leftCanGrow = leftNode.canGrow(procId);
510     }
511 
512     BitSetNode rightNode = null;
513     boolean rightCanGrow = false;
514     Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
515     if (rightEntry != null) {
516       rightNode = rightEntry.getValue();
517       rightCanGrow = rightNode.canGrow(procId);
518       if (leftNode != null) {
519         if (leftNode.canMerge(rightNode)) {
520           // merge left and right node
521           return mergeNodes(leftNode, rightNode);
522         }
523 
524         if (leftCanGrow && rightCanGrow) {
525           if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
526             // grow the left node
527             return growNode(leftNode, procId);
528           }
529           // grow the right node
530           return growNode(rightNode, procId);
531         }
532       }
533     }
534 
535     // grow the left node
536     if (leftCanGrow) {
537       return growNode(leftNode, procId);
538     }
539 
540     // grow the right node
541     if (rightCanGrow) {
542       return growNode(rightNode, procId);
543     }
544 
545     // add new node
546     BitSetNode node = new BitSetNode(procId, partial);
547     map.put(node.getStart(), node);
548     return node;
549   }
550 
551   private BitSetNode growNode(BitSetNode node, long procId) {
552     map.remove(node.getStart());
553     node.grow(procId);
554     map.put(node.getStart(), node);
555     return node;
556   }
557 
558   private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
559     assert leftNode.getStart() < rightNode.getStart();
560     leftNode.merge(rightNode);
561     map.remove(rightNode.getStart());
562     return leftNode;
563   }
564 
565   public void dump() {
566     System.out.println("map " + map.size());
567     System.out.println("isUpdated " + isUpdated());
568     System.out.println("isEmpty " + isEmpty());
569     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
570       entry.getValue().dump();
571     }
572   }
573 
574   public void writeTo(final OutputStream stream) throws IOException {
575     ProcedureProtos.ProcedureStoreTracker.Builder builder =
576         ProcedureProtos.ProcedureStoreTracker.newBuilder();
577     for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
578       builder.addNode(entry.getValue().convert());
579     }
580     builder.build().writeDelimitedTo(stream);
581   }
582 
583   public void readFrom(final InputStream stream) throws IOException {
584     reset();
585     final ProcedureProtos.ProcedureStoreTracker data =
586         ProcedureProtos.ProcedureStoreTracker.parseDelimitedFrom(stream);
587     for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: data.getNodeList()) {
588       final BitSetNode node = BitSetNode.convert(protoNode);
589       map.put(node.getStart(), node);
590     }
591   }
592 }