001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.TreeMap;
025import java.util.function.BiFunction;
026import java.util.stream.LongStream;
027import org.apache.hadoop.hbase.procedure2.Procedure;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
033
034/**
035 * Keeps track of live procedures.
036 *
037 * It can be used by the ProcedureStore to identify which procedures are already
038 * deleted/completed to avoid the deserialization step on restart
039 */
040@InterfaceAudience.Private
041public class ProcedureStoreTracker {
042  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);
043
044  // Key is procedure id corresponding to first bit of the bitmap.
045  private final TreeMap<Long, BitSetNode> map = new TreeMap<>();
046
047  /**
048   * If true, do not remove bits corresponding to deleted procedures. Note that this can result
049   * in huge bitmaps overtime.
050   * Currently, it's set to true only when building tracker state from logs during recovery. During
051   * recovery, if we are sure that a procedure has been deleted, reading its old update entries
052   * can be skipped.
053   */
054  private boolean keepDeletes = false;
055  /**
056   * If true, it means tracker has incomplete information about the active/deleted procedures.
057   * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
058   * understand it's real use.
059   */
060  boolean partial = false;
061
062  private long minModifiedProcId = Long.MAX_VALUE;
063  private long maxModifiedProcId = Long.MIN_VALUE;
064
065  public enum DeleteState { YES, NO, MAYBE }
066
067  public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
068    reset();
069    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode :
070            trackerProtoBuf.getNodeList()) {
071      final BitSetNode node = new BitSetNode(protoNode);
072      map.put(node.getStart(), node);
073    }
074  }
075
076  /**
077   * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
078   */
079  public void resetTo(ProcedureStoreTracker tracker) {
080    resetTo(tracker, false);
081  }
082
083  /**
084   * Resets internal state to same as given {@code tracker}, and change the deleted flag according
085   * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
086   * <p/>
087   * The {@code resetDelete} will be set to true when building cleanup tracker, please see the
088   * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
089   * deleted flag if {@code resetDelete} is true.
090   */
091  public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
092    reset();
093    // resetDelete will true if we are building the cleanup tracker, as we will reset deleted flags
094    // for all the unmodified bits to 1, the partial flag is useless so set it to false for not
095    // confusing the developers when debugging.
096    this.partial = resetDelete ? false : tracker.partial;
097    this.minModifiedProcId = tracker.minModifiedProcId;
098    this.maxModifiedProcId = tracker.maxModifiedProcId;
099    this.keepDeletes = tracker.keepDeletes;
100    for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
101      map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
102    }
103  }
104
105  public void insert(long procId) {
106    insert(null, procId);
107  }
108
109  public void insert(long[] procIds) {
110    for (int i = 0; i < procIds.length; ++i) {
111      insert(procIds[i]);
112    }
113  }
114
115  public void insert(long procId, long[] subProcIds) {
116    BitSetNode node = update(null, procId);
117    for (int i = 0; i < subProcIds.length; ++i) {
118      node = insert(node, subProcIds[i]);
119    }
120  }
121
122  private BitSetNode insert(BitSetNode node, long procId) {
123    if (node == null || !node.contains(procId)) {
124      node = getOrCreateNode(procId);
125    }
126    node.insertOrUpdate(procId);
127    trackProcIds(procId);
128    return node;
129  }
130
131  public void update(long procId) {
132    update(null, procId);
133  }
134
135  private BitSetNode update(BitSetNode node, long procId) {
136    node = lookupClosestNode(node, procId);
137    assert node != null : "expected node to update procId=" + procId;
138    assert node.contains(procId) : "expected procId=" + procId + " in the node";
139    if (node == null) {
140      throw new NullPointerException("pid=" + procId);
141    }
142    node.insertOrUpdate(procId);
143    trackProcIds(procId);
144    return node;
145  }
146
147  public void delete(long procId) {
148    delete(null, procId);
149  }
150
151  public void delete(final long[] procIds) {
152    Arrays.sort(procIds);
153    BitSetNode node = null;
154    for (int i = 0; i < procIds.length; ++i) {
155      node = delete(node, procIds[i]);
156    }
157  }
158
159  private BitSetNode delete(BitSetNode node, long procId) {
160    node = lookupClosestNode(node, procId);
161    if (node == null || !node.contains(procId)) {
162      LOG.warn("The BitSetNode for procId={} does not exist, maybe a double deletion?", procId);
163      return node;
164    }
165    node.delete(procId);
166    if (!keepDeletes && node.isEmpty()) {
167      // TODO: RESET if (map.size() == 1)
168      map.remove(node.getStart());
169    }
170
171    trackProcIds(procId);
172    return node;
173  }
174
175  /**
176   * Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
177   */
178  public void setMinMaxModifiedProcIds(long min, long max) {
179    this.minModifiedProcId = min;
180    this.maxModifiedProcId = max;
181  }
182  /**
183   * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The
184   * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart
185   * this is not true, as we will read the wal files in reverse order so a delete may come first.
186   */
187  public void setDeleted(long procId, boolean isDeleted) {
188    BitSetNode node = getOrCreateNode(procId);
189    assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
190    node.updateState(procId, isDeleted);
191    trackProcIds(procId);
192  }
193
194  /**
195   * Set the given bit for the procId to delete if it was modified before.
196   * <p/>
197   * This method is used to test whether a procedure wal file can be safely deleted, as if all the
198   * procedures in the given procedure wal file has been modified in the new procedure wal files,
199   * then we can delete it.
200   */
201  public void setDeletedIfModified(long... procId) {
202    BitSetNode node = null;
203    for (int i = 0; i < procId.length; ++i) {
204      node = lookupClosestNode(node, procId[i]);
205      if (node != null && node.isModified(procId[i])) {
206        node.delete(procId[i]);
207      }
208    }
209  }
210
211  private void setDeleteIf(ProcedureStoreTracker tracker,
212      BiFunction<BitSetNode, Long, Boolean> func) {
213    BitSetNode trackerNode = null;
214    for (BitSetNode node : map.values()) {
215      long minProcId = node.getStart();
216      long maxProcId = node.getEnd();
217      for (long procId = minProcId; procId <= maxProcId; ++procId) {
218        if (!node.isModified(procId)) {
219          continue;
220        }
221
222        trackerNode = tracker.lookupClosestNode(trackerNode, procId);
223        if (func.apply(trackerNode, procId)) {
224          node.delete(procId);
225        }
226      }
227    }
228  }
229
230  /**
231   * For the global tracker, we will use this method to build the holdingCleanupTracker, as the
232   * modified flags will be cleared after rolling so we only need to test the deleted flags.
233   * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker)
234   */
235  public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) {
236    setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) ||
237      node.isDeleted(procId) == DeleteState.YES);
238  }
239
240  /**
241   * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
242   * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
243   * then we mark it as deleted.
244   * @see #setDeletedIfModified(long...)
245   */
246  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
247    setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId));
248  }
249
250  /**
251   * lookup the node containing the specified procId.
252   * @param node cached node to check before doing a lookup
253   * @param procId the procId to lookup
254   * @return the node that may contains the procId or null
255   */
256  private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
257    if (node != null && node.contains(procId)) {
258      return node;
259    }
260
261    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
262    return entry != null ? entry.getValue() : null;
263  }
264
265  private void trackProcIds(long procId) {
266    minModifiedProcId = Math.min(minModifiedProcId, procId);
267    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
268  }
269
270  public long getModifiedMinProcId() {
271    return minModifiedProcId;
272  }
273
274  public long getModifiedMaxProcId() {
275    return maxModifiedProcId;
276  }
277
278  public void reset() {
279    this.keepDeletes = false;
280    this.partial = false;
281    this.map.clear();
282    minModifiedProcId = Long.MAX_VALUE;
283    maxModifiedProcId = Long.MIN_VALUE;
284  }
285
286  public boolean isModified(long procId) {
287    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
288    return entry != null && entry.getValue().contains(procId) &&
289      entry.getValue().isModified(procId);
290  }
291
292  /**
293   * If {@link #partial} is false, returns state from the bitmap. If no state is found for
294   * {@code procId}, returns YES.
295   * If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE
296   * if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise,
297   * returns state from the bitmap.
298   */
299  public DeleteState isDeleted(long procId) {
300    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
301    if (entry != null && entry.getValue().contains(procId)) {
302      BitSetNode node = entry.getValue();
303      DeleteState state = node.isDeleted(procId);
304      return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
305    }
306    return partial ? DeleteState.MAYBE : DeleteState.YES;
307  }
308
309  public long getActiveMinProcId() {
310    Map.Entry<Long, BitSetNode> entry = map.firstEntry();
311    return entry == null ? Procedure.NO_PROC_ID : entry.getValue().getActiveMinProcId();
312  }
313
314  public void setKeepDeletes(boolean keepDeletes) {
315    this.keepDeletes = keepDeletes;
316    // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted
317    // procedures).
318    if (!keepDeletes) {
319      Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
320      while (it.hasNext()) {
321        Map.Entry<Long, BitSetNode> entry = it.next();
322        if (entry.getValue().isEmpty()) {
323          it.remove();
324        }
325      }
326    }
327  }
328
329  public boolean isPartial() {
330    return partial;
331  }
332
333  public void setPartialFlag(boolean isPartial) {
334    if (this.partial && !isPartial) {
335      for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
336        entry.getValue().unsetPartialFlag();
337      }
338    }
339    this.partial = isPartial;
340  }
341
342  /**
343   * @return true, if no procedure is active, else false.
344   */
345  public boolean isEmpty() {
346    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
347      if (!entry.getValue().isEmpty()) {
348        return false;
349      }
350    }
351    return true;
352  }
353
354  /**
355   * @return true if all procedure was modified or deleted since last call to
356   *         {@link #resetModified()}.
357   */
358  public boolean isAllModified() {
359    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
360      if (!entry.getValue().isAllModified()) {
361        return false;
362      }
363    }
364    return true;
365  }
366
367  /**
368   * Will be used when there are too many proc wal files. We will rewrite the states of the active
369   * procedures in the oldest proc wal file so that we can delete it.
370   * @return all the active procedure ids in this tracker.
371   */
372  public long[] getAllActiveProcIds() {
373    return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0)
374      .flatMapToLong(LongStream::of).toArray();
375  }
376
377  /**
378   * Clears the list of updated procedure ids. This doesn't affect global list of active
379   * procedure ids.
380   */
381  public void resetModified() {
382    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
383      entry.getValue().resetModified();
384    }
385    minModifiedProcId = Long.MAX_VALUE;
386    maxModifiedProcId = Long.MIN_VALUE;
387  }
388
389  private BitSetNode getOrCreateNode(long procId) {
390    // If procId can fit in left node (directly or by growing it)
391    BitSetNode leftNode = null;
392    boolean leftCanGrow = false;
393    Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
394    if (leftEntry != null) {
395      leftNode = leftEntry.getValue();
396      if (leftNode.contains(procId)) {
397        return leftNode;
398      }
399      leftCanGrow = leftNode.canGrow(procId);
400    }
401
402    // If procId can fit in right node (directly or by growing it)
403    BitSetNode rightNode = null;
404    boolean rightCanGrow = false;
405    Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
406    if (rightEntry != null) {
407      rightNode = rightEntry.getValue();
408      rightCanGrow = rightNode.canGrow(procId);
409      if (leftNode != null) {
410        if (leftNode.canMerge(rightNode)) {
411          // merge left and right node
412          return mergeNodes(leftNode, rightNode);
413        }
414
415        // If left and right nodes can not merge, decide which one to grow.
416        if (leftCanGrow && rightCanGrow) {
417          if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
418            return growNode(leftNode, procId);
419          }
420          return growNode(rightNode, procId);
421        }
422      }
423    }
424
425    // grow the left node
426    if (leftCanGrow) {
427      return growNode(leftNode, procId);
428    }
429
430    // grow the right node
431    if (rightCanGrow) {
432      return growNode(rightNode, procId);
433    }
434
435    // add new node if there are no left/right nodes which can be used.
436    BitSetNode node = new BitSetNode(procId, partial);
437    map.put(node.getStart(), node);
438    return node;
439  }
440
441  /**
442   * Grows {@code node} to contain {@code procId} and updates the map.
443   * @return {@link BitSetNode} instance which contains {@code procId}.
444   */
445  private BitSetNode growNode(BitSetNode node, long procId) {
446    map.remove(node.getStart());
447    node.grow(procId);
448    map.put(node.getStart(), node);
449    return node;
450  }
451
452  /**
453   * Merges {@code leftNode} & {@code rightNode} and updates the map.
454   */
455  private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
456    assert leftNode.getStart() < rightNode.getStart();
457    leftNode.merge(rightNode);
458    map.remove(rightNode.getStart());
459    return leftNode;
460  }
461
462  public void dump() {
463    System.out.println("map " + map.size());
464    System.out.println("isAllModified " + isAllModified());
465    System.out.println("isEmpty " + isEmpty());
466    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
467      entry.getValue().dump();
468    }
469  }
470
471  // ========================================================================
472  //  Convert to/from Protocol Buffer.
473  // ========================================================================
474
475  /**
476   * Builds
477   * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker
478   * protocol buffer from current state.
479   */
480  public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException {
481    ProcedureProtos.ProcedureStoreTracker.Builder builder =
482        ProcedureProtos.ProcedureStoreTracker.newBuilder();
483    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
484      builder.addNode(entry.getValue().convert());
485    }
486    return builder.build();
487  }
488}