001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2.store;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.TreeMap;
026import java.util.function.BiFunction;
027import java.util.stream.LongStream;
028import org.apache.hadoop.hbase.procedure2.Procedure;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
034
035/**
036 * Keeps track of live procedures.
037 *
038 * It can be used by the ProcedureStore to identify which procedures are already
039 * deleted/completed to avoid the deserialization step on restart
040 */
041@InterfaceAudience.Private
042public class ProcedureStoreTracker {
043
044  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);
045
046  // Key is procedure id corresponding to first bit of the bitmap.
047  private final TreeMap<Long, BitSetNode> map = new TreeMap<>();
048
049  /**
050   * If true, do not remove bits corresponding to deleted procedures. Note that this can result
051   * in huge bitmaps overtime.
052   * Currently, it's set to true only when building tracker state from logs during recovery. During
053   * recovery, if we are sure that a procedure has been deleted, reading its old update entries
054   * can be skipped.
055   */
056  private boolean keepDeletes = false;
057  /**
058   * If true, it means tracker has incomplete information about the active/deleted procedures.
059   * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
060   * understand it's real use.
061   */
062  boolean partial = false;
063
064  private long minModifiedProcId = Long.MAX_VALUE;
065  private long maxModifiedProcId = Long.MIN_VALUE;
066
067  public enum DeleteState { YES, NO, MAYBE }
068
069  public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
070    reset();
071    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) {
072      final BitSetNode node = new BitSetNode(protoNode);
073      map.put(node.getStart(), node);
074    }
075  }
076
077  /**
078   * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
079   */
080  public void resetTo(ProcedureStoreTracker tracker) {
081    resetTo(tracker, false);
082  }
083
084  /**
085   * Resets internal state to same as given {@code tracker}, and change the deleted flag according
086   * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
087   * <p/>
088   * The {@code resetDelete} will be set to true when building cleanup tracker, please see the
089   * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
090   * deleted flag if {@code resetDelete} is true.
091   */
092  public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
093    reset();
094    // resetDelete will true if we are building the cleanup tracker, as we will reset deleted flags
095    // for all the unmodified bits to 1, the partial flag is useless so set it to false for not
096    // confusing the developers when debugging.
097    this.partial = resetDelete ? false : tracker.partial;
098    this.minModifiedProcId = tracker.minModifiedProcId;
099    this.maxModifiedProcId = tracker.maxModifiedProcId;
100    this.keepDeletes = tracker.keepDeletes;
101    for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
102      map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
103    }
104  }
105
106  public void insert(long procId) {
107    insert(null, procId);
108  }
109
110  public void insert(long[] procIds) {
111    for (int i = 0; i < procIds.length; ++i) {
112      insert(procIds[i]);
113    }
114  }
115
116  public void insert(long procId, long[] subProcIds) {
117    BitSetNode node = update(null, procId);
118    for (int i = 0; i < subProcIds.length; ++i) {
119      node = insert(node, subProcIds[i]);
120    }
121  }
122
123  private BitSetNode insert(BitSetNode node, long procId) {
124    if (node == null || !node.contains(procId)) {
125      node = getOrCreateNode(procId);
126    }
127    node.insertOrUpdate(procId);
128    trackProcIds(procId);
129    return node;
130  }
131
132  public void update(long procId) {
133    update(null, procId);
134  }
135
136  private BitSetNode update(BitSetNode node, long procId) {
137    node = lookupClosestNode(node, procId);
138    assert node != null : "expected node to update procId=" + procId;
139    assert node.contains(procId) : "expected procId=" + procId + " in the node";
140    if (node == null) {
141      throw new NullPointerException("pid=" + procId);
142    }
143    node.insertOrUpdate(procId);
144    trackProcIds(procId);
145    return node;
146  }
147
148  public void delete(long procId) {
149    delete(null, procId);
150  }
151
152  public void delete(final long[] procIds) {
153    Arrays.sort(procIds);
154    BitSetNode node = null;
155    for (int i = 0; i < procIds.length; ++i) {
156      node = delete(node, procIds[i]);
157    }
158  }
159
160  private BitSetNode delete(BitSetNode node, long procId) {
161    node = lookupClosestNode(node, procId);
162    if (node == null || !node.contains(procId)) {
163      LOG.warn("The BitSetNode for procId={} does not exist, maybe a double deletion?", procId);
164      return node;
165    }
166    node.delete(procId);
167    if (!keepDeletes && node.isEmpty()) {
168      // TODO: RESET if (map.size() == 1)
169      map.remove(node.getStart());
170    }
171
172    trackProcIds(procId);
173    return node;
174  }
175
176  /**
177   * Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
178   */
179  public void setMinMaxModifiedProcIds(long min, long max) {
180    this.minModifiedProcId = min;
181    this.maxModifiedProcId = max;
182  }
183  /**
184   * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The
185   * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart
186   * this is not true, as we will read the wal files in reverse order so a delete may come first.
187   */
188  public void setDeleted(long procId, boolean isDeleted) {
189    BitSetNode node = getOrCreateNode(procId);
190    assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
191    node.updateState(procId, isDeleted);
192    trackProcIds(procId);
193  }
194
195  /**
196   * Set the given bit for the procId to delete if it was modified before.
197   * <p/>
198   * This method is used to test whether a procedure wal file can be safely deleted, as if all the
199   * procedures in the given procedure wal file has been modified in the new procedure wal files,
200   * then we can delete it.
201   */
202  public void setDeletedIfModified(long... procId) {
203    BitSetNode node = null;
204    for (int i = 0; i < procId.length; ++i) {
205      node = lookupClosestNode(node, procId[i]);
206      if (node != null && node.isModified(procId[i])) {
207        node.delete(procId[i]);
208      }
209    }
210  }
211
212  private void setDeleteIf(ProcedureStoreTracker tracker,
213      BiFunction<BitSetNode, Long, Boolean> func) {
214    BitSetNode trackerNode = null;
215    for (BitSetNode node : map.values()) {
216      long minProcId = node.getStart();
217      long maxProcId = node.getEnd();
218      for (long procId = minProcId; procId <= maxProcId; ++procId) {
219        if (!node.isModified(procId)) {
220          continue;
221        }
222
223        trackerNode = tracker.lookupClosestNode(trackerNode, procId);
224        if (func.apply(trackerNode, procId)) {
225          node.delete(procId);
226        }
227      }
228    }
229  }
230
231  /**
232   * For the global tracker, we will use this method to build the holdingCleanupTracker, as the
233   * modified flags will be cleared after rolling so we only need to test the deleted flags.
234   * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker)
235   */
236  public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) {
237    setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) ||
238      node.isDeleted(procId) == DeleteState.YES);
239  }
240
241  /**
242   * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
243   * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
244   * then we mark it as deleted.
245   * @see #setDeletedIfModified(long...)
246   */
247  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
248    setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId));
249  }
250
251  /**
252   * lookup the node containing the specified procId.
253   * @param node cached node to check before doing a lookup
254   * @param procId the procId to lookup
255   * @return the node that may contains the procId or null
256   */
257  private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
258    if (node != null && node.contains(procId)) return node;
259    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
260    return entry != null ? entry.getValue() : null;
261  }
262
263  private void trackProcIds(long procId) {
264    minModifiedProcId = Math.min(minModifiedProcId, procId);
265    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
266  }
267
268  public long getModifiedMinProcId() {
269    return minModifiedProcId;
270  }
271
272  public long getModifiedMaxProcId() {
273    return maxModifiedProcId;
274  }
275
276  public void reset() {
277    this.keepDeletes = false;
278    this.partial = false;
279    this.map.clear();
280    minModifiedProcId = Long.MAX_VALUE;
281    maxModifiedProcId = Long.MIN_VALUE;
282  }
283
284  public boolean isModified(long procId) {
285    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
286    return entry != null && entry.getValue().contains(procId) &&
287      entry.getValue().isModified(procId);
288  }
289
290  /**
291   * If {@link #partial} is false, returns state from the bitmap. If no state is found for
292   * {@code procId}, returns YES.
293   * If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE
294   * if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise,
295   * returns state from the bitmap.
296   */
297  public DeleteState isDeleted(long procId) {
298    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
299    if (entry != null && entry.getValue().contains(procId)) {
300      BitSetNode node = entry.getValue();
301      DeleteState state = node.isDeleted(procId);
302      return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
303    }
304    return partial ? DeleteState.MAYBE : DeleteState.YES;
305  }
306
307  public long getActiveMinProcId() {
308    Map.Entry<Long, BitSetNode> entry = map.firstEntry();
309    return entry == null ? Procedure.NO_PROC_ID : entry.getValue().getActiveMinProcId();
310  }
311
312  public void setKeepDeletes(boolean keepDeletes) {
313    this.keepDeletes = keepDeletes;
314    // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted
315    // procedures).
316    if (!keepDeletes) {
317      Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
318      while (it.hasNext()) {
319        Map.Entry<Long, BitSetNode> entry = it.next();
320        if (entry.getValue().isEmpty()) {
321          it.remove();
322        }
323      }
324    }
325  }
326
327  public boolean isPartial() {
328    return partial;
329  }
330
331  public void setPartialFlag(boolean isPartial) {
332    if (this.partial && !isPartial) {
333      for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
334        entry.getValue().unsetPartialFlag();
335      }
336    }
337    this.partial = isPartial;
338  }
339
340  /**
341   * @return true, if no procedure is active, else false.
342   */
343  public boolean isEmpty() {
344    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
345      if (!entry.getValue().isEmpty()) {
346        return false;
347      }
348    }
349    return true;
350  }
351
352  /**
353   * @return true if all procedure was modified or deleted since last call to
354   *         {@link #resetModified()}.
355   */
356  public boolean isAllModified() {
357    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
358      if (!entry.getValue().isAllModified()) {
359        return false;
360      }
361    }
362    return true;
363  }
364
365  /**
366   * Will be used when there are too many proc wal files. We will rewrite the states of the active
367   * procedures in the oldest proc wal file so that we can delete it.
368   * @return all the active procedure ids in this tracker.
369   */
370  public long[] getAllActiveProcIds() {
371    return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0)
372      .flatMapToLong(LongStream::of).toArray();
373  }
374
375  /**
376   * Clears the list of updated procedure ids. This doesn't affect global list of active
377   * procedure ids.
378   */
379  public void resetModified() {
380    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
381      entry.getValue().resetModified();
382    }
383    minModifiedProcId = Long.MAX_VALUE;
384    maxModifiedProcId = Long.MIN_VALUE;
385  }
386
387  private BitSetNode getOrCreateNode(long procId) {
388    // If procId can fit in left node (directly or by growing it)
389    BitSetNode leftNode = null;
390    boolean leftCanGrow = false;
391    Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
392    if (leftEntry != null) {
393      leftNode = leftEntry.getValue();
394      if (leftNode.contains(procId)) {
395        return leftNode;
396      }
397      leftCanGrow = leftNode.canGrow(procId);
398    }
399
400    // If procId can fit in right node (directly or by growing it)
401    BitSetNode rightNode = null;
402    boolean rightCanGrow = false;
403    Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
404    if (rightEntry != null) {
405      rightNode = rightEntry.getValue();
406      rightCanGrow = rightNode.canGrow(procId);
407      if (leftNode != null) {
408        if (leftNode.canMerge(rightNode)) {
409          // merge left and right node
410          return mergeNodes(leftNode, rightNode);
411        }
412
413        // If left and right nodes can not merge, decide which one to grow.
414        if (leftCanGrow && rightCanGrow) {
415          if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
416            return growNode(leftNode, procId);
417          }
418          return growNode(rightNode, procId);
419        }
420      }
421    }
422
423    // grow the left node
424    if (leftCanGrow) {
425      return growNode(leftNode, procId);
426    }
427
428    // grow the right node
429    if (rightCanGrow) {
430      return growNode(rightNode, procId);
431    }
432
433    // add new node if there are no left/right nodes which can be used.
434    BitSetNode node = new BitSetNode(procId, partial);
435    map.put(node.getStart(), node);
436    return node;
437  }
438
439  /**
440   * Grows {@code node} to contain {@code procId} and updates the map.
441   * @return {@link BitSetNode} instance which contains {@code procId}.
442   */
443  private BitSetNode growNode(BitSetNode node, long procId) {
444    map.remove(node.getStart());
445    node.grow(procId);
446    map.put(node.getStart(), node);
447    return node;
448  }
449
450  /**
451   * Merges {@code leftNode} & {@code rightNode} and updates the map.
452   */
453  private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
454    assert leftNode.getStart() < rightNode.getStart();
455    leftNode.merge(rightNode);
456    map.remove(rightNode.getStart());
457    return leftNode;
458  }
459
460  public void dump() {
461    System.out.println("map " + map.size());
462    System.out.println("isAllModified " + isAllModified());
463    System.out.println("isEmpty " + isEmpty());
464    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
465      entry.getValue().dump();
466    }
467  }
468
469  // ========================================================================
470  //  Convert to/from Protocol Buffer.
471  // ========================================================================
472
473  /**
474   * Builds
475   * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker
476   * protocol buffer from current state.
477   */
478  public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException {
479    ProcedureProtos.ProcedureStoreTracker.Builder builder =
480        ProcedureProtos.ProcedureStoreTracker.newBuilder();
481    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
482      builder.addNode(entry.getValue().convert());
483    }
484    return builder.build();
485  }
486}