001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.procedure2.store;
020
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.Iterator;
024import java.util.Map;
025import java.util.TreeMap;
026import java.util.function.BiFunction;
027import java.util.stream.LongStream;
028import org.apache.hadoop.hbase.procedure2.Procedure;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
034
035/**
036 * Keeps track of live procedures.
037 *
038 * It can be used by the ProcedureStore to identify which procedures are already
039 * deleted/completed to avoid the deserialization step on restart
040 */
041@InterfaceAudience.Private
042public class ProcedureStoreTracker {
043
044  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);
045
046  // Key is procedure id corresponding to first bit of the bitmap.
047  private final TreeMap<Long, BitSetNode> map = new TreeMap<>();
048
049  /**
050   * If true, do not remove bits corresponding to deleted procedures. Note that this can result
051   * in huge bitmaps overtime.
052   * Currently, it's set to true only when building tracker state from logs during recovery. During
053   * recovery, if we are sure that a procedure has been deleted, reading its old update entries
054   * can be skipped.
055   */
056  private boolean keepDeletes = false;
057  /**
058   * If true, it means tracker has incomplete information about the active/deleted procedures.
059   * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
060   * understand it's real use.
061   */
062  boolean partial = false;
063
064  private long minModifiedProcId = Long.MAX_VALUE;
065  private long maxModifiedProcId = Long.MIN_VALUE;
066
067  public enum DeleteState { YES, NO, MAYBE }
068
069  public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
070    reset();
071    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) {
072      final BitSetNode node = new BitSetNode(protoNode);
073      map.put(node.getStart(), node);
074    }
075  }
076
077  /**
078   * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
079   */
080  public void resetTo(ProcedureStoreTracker tracker) {
081    resetTo(tracker, false);
082  }
083
084  /**
085   * Resets internal state to same as given {@code tracker}, and change the deleted flag according
086   * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
087   * <p/>
088   * The {@code resetDelete} will be set to true when building cleanup tracker, please see the
089   * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
090   * deleted flag if {@code resetDelete} is true.
091   */
092  public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
093    reset();
094    // resetDelete will true if we are building the cleanup tracker, as we will reset deleted flags
095    // for all the unmodified bits to 1, the partial flag is useless so set it to false for not
096    // confusing the developers when debugging.
097    this.partial = resetDelete ? false : tracker.partial;
098    this.minModifiedProcId = tracker.minModifiedProcId;
099    this.maxModifiedProcId = tracker.maxModifiedProcId;
100    this.keepDeletes = tracker.keepDeletes;
101    for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
102      map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
103    }
104  }
105
106  public void insert(long procId) {
107    insert(null, procId);
108  }
109
110  public void insert(long[] procIds) {
111    for (int i = 0; i < procIds.length; ++i) {
112      insert(procIds[i]);
113    }
114  }
115
116  public void insert(long procId, long[] subProcIds) {
117    BitSetNode node = update(null, procId);
118    for (int i = 0; i < subProcIds.length; ++i) {
119      node = insert(node, subProcIds[i]);
120    }
121  }
122
123  private BitSetNode insert(BitSetNode node, long procId) {
124    if (node == null || !node.contains(procId)) {
125      node = getOrCreateNode(procId);
126    }
127    node.insertOrUpdate(procId);
128    trackProcIds(procId);
129    return node;
130  }
131
132  public void update(long procId) {
133    update(null, procId);
134  }
135
136  private BitSetNode update(BitSetNode node, long procId) {
137    node = lookupClosestNode(node, procId);
138    assert node != null : "expected node to update procId=" + procId;
139    assert node.contains(procId) : "expected procId=" + procId + " in the node";
140    node.insertOrUpdate(procId);
141    trackProcIds(procId);
142    return node;
143  }
144
145  public void delete(long procId) {
146    delete(null, procId);
147  }
148
149  public void delete(final long[] procIds) {
150    Arrays.sort(procIds);
151    BitSetNode node = null;
152    for (int i = 0; i < procIds.length; ++i) {
153      node = delete(node, procIds[i]);
154    }
155  }
156
157  private BitSetNode delete(BitSetNode node, long procId) {
158    node = lookupClosestNode(node, procId);
159    if (node == null || !node.contains(procId)) {
160      LOG.warn("The BitSetNode for procId={} does not exist, maybe a double deletion?", procId);
161      return node;
162    }
163    node.delete(procId);
164    if (!keepDeletes && node.isEmpty()) {
165      // TODO: RESET if (map.size() == 1)
166      map.remove(node.getStart());
167    }
168
169    trackProcIds(procId);
170    return node;
171  }
172
173  /**
174   * Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
175   */
176  public void setMinMaxModifiedProcIds(long min, long max) {
177    this.minModifiedProcId = min;
178    this.maxModifiedProcId = max;
179  }
180  /**
181   * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The
182   * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart
183   * this is not true, as we will read the wal files in reverse order so a delete may come first.
184   */
185  public void setDeleted(long procId, boolean isDeleted) {
186    BitSetNode node = getOrCreateNode(procId);
187    assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
188    node.updateState(procId, isDeleted);
189    trackProcIds(procId);
190  }
191
192  /**
193   * Set the given bit for the procId to delete if it was modified before.
194   * <p/>
195   * This method is used to test whether a procedure wal file can be safely deleted, as if all the
196   * procedures in the given procedure wal file has been modified in the new procedure wal files,
197   * then we can delete it.
198   */
199  public void setDeletedIfModified(long... procId) {
200    BitSetNode node = null;
201    for (int i = 0; i < procId.length; ++i) {
202      node = lookupClosestNode(node, procId[i]);
203      if (node != null && node.isModified(procId[i])) {
204        node.delete(procId[i]);
205      }
206    }
207  }
208
209  private void setDeleteIf(ProcedureStoreTracker tracker,
210      BiFunction<BitSetNode, Long, Boolean> func) {
211    BitSetNode trackerNode = null;
212    for (BitSetNode node : map.values()) {
213      long minProcId = node.getStart();
214      long maxProcId = node.getEnd();
215      for (long procId = minProcId; procId <= maxProcId; ++procId) {
216        if (!node.isModified(procId)) {
217          continue;
218        }
219
220        trackerNode = tracker.lookupClosestNode(trackerNode, procId);
221        if (func.apply(trackerNode, procId)) {
222          node.delete(procId);
223        }
224      }
225    }
226  }
227
228  /**
229   * For the global tracker, we will use this method to build the holdingCleanupTracker, as the
230   * modified flags will be cleared after rolling so we only need to test the deleted flags.
231   * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker)
232   */
233  public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) {
234    setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) ||
235      node.isDeleted(procId) == DeleteState.YES);
236  }
237
238  /**
239   * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
240   * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
241   * then we mark it as deleted.
242   * @see #setDeletedIfModified(long...)
243   */
244  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
245    setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId));
246  }
247
248  /**
249   * lookup the node containing the specified procId.
250   * @param node cached node to check before doing a lookup
251   * @param procId the procId to lookup
252   * @return the node that may contains the procId or null
253   */
254  private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
255    if (node != null && node.contains(procId)) return node;
256    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
257    return entry != null ? entry.getValue() : null;
258  }
259
260  private void trackProcIds(long procId) {
261    minModifiedProcId = Math.min(minModifiedProcId, procId);
262    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
263  }
264
265  public long getModifiedMinProcId() {
266    return minModifiedProcId;
267  }
268
269  public long getModifiedMaxProcId() {
270    return maxModifiedProcId;
271  }
272
273  public void reset() {
274    this.keepDeletes = false;
275    this.partial = false;
276    this.map.clear();
277    minModifiedProcId = Long.MAX_VALUE;
278    maxModifiedProcId = Long.MIN_VALUE;
279  }
280
281  public boolean isModified(long procId) {
282    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
283    return entry != null && entry.getValue().contains(procId) &&
284      entry.getValue().isModified(procId);
285  }
286
287  /**
288   * If {@link #partial} is false, returns state from the bitmap. If no state is found for
289   * {@code procId}, returns YES.
290   * If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE
291   * if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise,
292   * returns state from the bitmap.
293   */
294  public DeleteState isDeleted(long procId) {
295    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
296    if (entry != null && entry.getValue().contains(procId)) {
297      BitSetNode node = entry.getValue();
298      DeleteState state = node.isDeleted(procId);
299      return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
300    }
301    return partial ? DeleteState.MAYBE : DeleteState.YES;
302  }
303
304  public long getActiveMinProcId() {
305    Map.Entry<Long, BitSetNode> entry = map.firstEntry();
306    return entry == null ? Procedure.NO_PROC_ID : entry.getValue().getActiveMinProcId();
307  }
308
309  public void setKeepDeletes(boolean keepDeletes) {
310    this.keepDeletes = keepDeletes;
311    // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted
312    // procedures).
313    if (!keepDeletes) {
314      Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
315      while (it.hasNext()) {
316        Map.Entry<Long, BitSetNode> entry = it.next();
317        if (entry.getValue().isEmpty()) {
318          it.remove();
319        }
320      }
321    }
322  }
323
324  public boolean isPartial() {
325    return partial;
326  }
327
328  public void setPartialFlag(boolean isPartial) {
329    if (this.partial && !isPartial) {
330      for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
331        entry.getValue().unsetPartialFlag();
332      }
333    }
334    this.partial = isPartial;
335  }
336
337  /**
338   * @return true, if no procedure is active, else false.
339   */
340  public boolean isEmpty() {
341    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
342      if (!entry.getValue().isEmpty()) {
343        return false;
344      }
345    }
346    return true;
347  }
348
349  /**
350   * @return true if all procedure was modified or deleted since last call to
351   *         {@link #resetModified()}.
352   */
353  public boolean isAllModified() {
354    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
355      if (!entry.getValue().isAllModified()) {
356        return false;
357      }
358    }
359    return true;
360  }
361
362  /**
363   * Will be used when there are too many proc wal files. We will rewrite the states of the active
364   * procedures in the oldest proc wal file so that we can delete it.
365   * @return all the active procedure ids in this tracker.
366   */
367  public long[] getAllActiveProcIds() {
368    return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0)
369      .flatMapToLong(LongStream::of).toArray();
370  }
371
372  /**
373   * Clears the list of updated procedure ids. This doesn't affect global list of active
374   * procedure ids.
375   */
376  public void resetModified() {
377    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
378      entry.getValue().resetModified();
379    }
380    minModifiedProcId = Long.MAX_VALUE;
381    maxModifiedProcId = Long.MIN_VALUE;
382  }
383
384  private BitSetNode getOrCreateNode(long procId) {
385    // If procId can fit in left node (directly or by growing it)
386    BitSetNode leftNode = null;
387    boolean leftCanGrow = false;
388    Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
389    if (leftEntry != null) {
390      leftNode = leftEntry.getValue();
391      if (leftNode.contains(procId)) {
392        return leftNode;
393      }
394      leftCanGrow = leftNode.canGrow(procId);
395    }
396
397    // If procId can fit in right node (directly or by growing it)
398    BitSetNode rightNode = null;
399    boolean rightCanGrow = false;
400    Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
401    if (rightEntry != null) {
402      rightNode = rightEntry.getValue();
403      rightCanGrow = rightNode.canGrow(procId);
404      if (leftNode != null) {
405        if (leftNode.canMerge(rightNode)) {
406          // merge left and right node
407          return mergeNodes(leftNode, rightNode);
408        }
409
410        // If left and right nodes can not merge, decide which one to grow.
411        if (leftCanGrow && rightCanGrow) {
412          if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
413            return growNode(leftNode, procId);
414          }
415          return growNode(rightNode, procId);
416        }
417      }
418    }
419
420    // grow the left node
421    if (leftCanGrow) {
422      return growNode(leftNode, procId);
423    }
424
425    // grow the right node
426    if (rightCanGrow) {
427      return growNode(rightNode, procId);
428    }
429
430    // add new node if there are no left/right nodes which can be used.
431    BitSetNode node = new BitSetNode(procId, partial);
432    map.put(node.getStart(), node);
433    return node;
434  }
435
436  /**
437   * Grows {@code node} to contain {@code procId} and updates the map.
438   * @return {@link BitSetNode} instance which contains {@code procId}.
439   */
440  private BitSetNode growNode(BitSetNode node, long procId) {
441    map.remove(node.getStart());
442    node.grow(procId);
443    map.put(node.getStart(), node);
444    return node;
445  }
446
447  /**
448   * Merges {@code leftNode} & {@code rightNode} and updates the map.
449   */
450  private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
451    assert leftNode.getStart() < rightNode.getStart();
452    leftNode.merge(rightNode);
453    map.remove(rightNode.getStart());
454    return leftNode;
455  }
456
457  public void dump() {
458    System.out.println("map " + map.size());
459    System.out.println("isAllModified " + isAllModified());
460    System.out.println("isEmpty " + isEmpty());
461    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
462      entry.getValue().dump();
463    }
464  }
465
466  // ========================================================================
467  //  Convert to/from Protocol Buffer.
468  // ========================================================================
469
470  /**
471   * Builds
472   * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker
473   * protocol buffer from current state.
474   */
475  public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException {
476    ProcedureProtos.ProcedureStoreTracker.Builder builder =
477        ProcedureProtos.ProcedureStoreTracker.newBuilder();
478    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
479      builder.addNode(entry.getValue().convert());
480    }
481    return builder.build();
482  }
483}