001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.TreeMap;
025import java.util.function.BiFunction;
026import java.util.stream.LongStream;
027import org.apache.hadoop.hbase.procedure2.Procedure;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
033
034/**
035 * Keeps track of live procedures.
036 *
037 * It can be used by the ProcedureStore to identify which procedures are already
038 * deleted/completed to avoid the deserialization step on restart
039 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
040 *             use the new region based procedure store.
041 */
042@Deprecated
043@InterfaceAudience.Private
044class ProcedureStoreTracker {
045  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);
046
047  // Key is procedure id corresponding to first bit of the bitmap.
048  private final TreeMap<Long, BitSetNode> map = new TreeMap<>();
049
050  /**
051   * If true, do not remove bits corresponding to deleted procedures. Note that this can result
052   * in huge bitmaps overtime.
053   * Currently, it's set to true only when building tracker state from logs during recovery. During
054   * recovery, if we are sure that a procedure has been deleted, reading its old update entries
055   * can be skipped.
056   */
057  private boolean keepDeletes = false;
058  /**
059   * If true, it means tracker has incomplete information about the active/deleted procedures.
060   * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
061   * understand it's real use.
062   */
063  boolean partial = false;
064
065  private long minModifiedProcId = Long.MAX_VALUE;
066  private long maxModifiedProcId = Long.MIN_VALUE;
067
068  public enum DeleteState { YES, NO, MAYBE }
069
070  public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
071    reset();
072    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode :
073            trackerProtoBuf.getNodeList()) {
074      final BitSetNode node = new BitSetNode(protoNode);
075      map.put(node.getStart(), node);
076    }
077  }
078
079  /**
080   * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
081   */
082  public void resetTo(ProcedureStoreTracker tracker) {
083    resetTo(tracker, false);
084  }
085
086  /**
087   * Resets internal state to same as given {@code tracker}, and change the deleted flag according
088   * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
089   * <p/>
090   * The {@code resetDelete} will be set to true when building cleanup tracker, please see the
091   * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
092   * deleted flag if {@code resetDelete} is true.
093   */
094  public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
095    reset();
096    // resetDelete will true if we are building the cleanup tracker, as we will reset deleted flags
097    // for all the unmodified bits to 1, the partial flag is useless so set it to false for not
098    // confusing the developers when debugging.
099    this.partial = resetDelete ? false : tracker.partial;
100    this.minModifiedProcId = tracker.minModifiedProcId;
101    this.maxModifiedProcId = tracker.maxModifiedProcId;
102    this.keepDeletes = tracker.keepDeletes;
103    for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
104      map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
105    }
106  }
107
108  public void insert(long procId) {
109    insert(null, procId);
110  }
111
112  public void insert(long[] procIds) {
113    for (int i = 0; i < procIds.length; ++i) {
114      insert(procIds[i]);
115    }
116  }
117
118  public void insert(long procId, long[] subProcIds) {
119    BitSetNode node = update(null, procId);
120    for (int i = 0; i < subProcIds.length; ++i) {
121      node = insert(node, subProcIds[i]);
122    }
123  }
124
125  private BitSetNode insert(BitSetNode node, long procId) {
126    if (node == null || !node.contains(procId)) {
127      node = getOrCreateNode(procId);
128    }
129    node.insertOrUpdate(procId);
130    trackProcIds(procId);
131    return node;
132  }
133
134  public void update(long procId) {
135    update(null, procId);
136  }
137
138  private BitSetNode update(BitSetNode node, long procId) {
139    node = lookupClosestNode(node, procId);
140    assert node != null : "expected node to update procId=" + procId;
141    assert node.contains(procId) : "expected procId=" + procId + " in the node";
142    if (node == null) {
143      throw new NullPointerException("pid=" + procId);
144    }
145    node.insertOrUpdate(procId);
146    trackProcIds(procId);
147    return node;
148  }
149
150  public void delete(long procId) {
151    delete(null, procId);
152  }
153
154  public void delete(final long[] procIds) {
155    Arrays.sort(procIds);
156    BitSetNode node = null;
157    for (int i = 0; i < procIds.length; ++i) {
158      node = delete(node, procIds[i]);
159    }
160  }
161
162  private BitSetNode delete(BitSetNode node, long procId) {
163    node = lookupClosestNode(node, procId);
164    if (node == null || !node.contains(procId)) {
165      LOG.warn("The BitSetNode for procId={} does not exist, maybe a double deletion?", procId);
166      return node;
167    }
168    node.delete(procId);
169    if (!keepDeletes && node.isEmpty()) {
170      // TODO: RESET if (map.size() == 1)
171      map.remove(node.getStart());
172    }
173
174    trackProcIds(procId);
175    return node;
176  }
177
178  /**
179   * Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
180   */
181  public void setMinMaxModifiedProcIds(long min, long max) {
182    this.minModifiedProcId = min;
183    this.maxModifiedProcId = max;
184  }
185  /**
186   * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The
187   * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart
188   * this is not true, as we will read the wal files in reverse order so a delete may come first.
189   */
190  public void setDeleted(long procId, boolean isDeleted) {
191    BitSetNode node = getOrCreateNode(procId);
192    assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
193    node.updateState(procId, isDeleted);
194    trackProcIds(procId);
195  }
196
197  /**
198   * Set the given bit for the procId to delete if it was modified before.
199   * <p/>
200   * This method is used to test whether a procedure wal file can be safely deleted, as if all the
201   * procedures in the given procedure wal file has been modified in the new procedure wal files,
202   * then we can delete it.
203   */
204  public void setDeletedIfModified(long... procId) {
205    BitSetNode node = null;
206    for (int i = 0; i < procId.length; ++i) {
207      node = lookupClosestNode(node, procId[i]);
208      if (node != null && node.isModified(procId[i])) {
209        node.delete(procId[i]);
210      }
211    }
212  }
213
214  private void setDeleteIf(ProcedureStoreTracker tracker,
215      BiFunction<BitSetNode, Long, Boolean> func) {
216    BitSetNode trackerNode = null;
217    for (BitSetNode node : map.values()) {
218      long minProcId = node.getStart();
219      long maxProcId = node.getEnd();
220      for (long procId = minProcId; procId <= maxProcId; ++procId) {
221        if (!node.isModified(procId)) {
222          continue;
223        }
224
225        trackerNode = tracker.lookupClosestNode(trackerNode, procId);
226        if (func.apply(trackerNode, procId)) {
227          node.delete(procId);
228        }
229      }
230    }
231  }
232
233  /**
234   * For the global tracker, we will use this method to build the holdingCleanupTracker, as the
235   * modified flags will be cleared after rolling so we only need to test the deleted flags.
236   * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker)
237   */
238  public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) {
239    setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) ||
240      node.isDeleted(procId) == DeleteState.YES);
241  }
242
243  /**
244   * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
245   * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
246   * then we mark it as deleted.
247   * @see #setDeletedIfModified(long...)
248   */
249  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
250    setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId));
251  }
252
253  /**
254   * lookup the node containing the specified procId.
255   * @param node cached node to check before doing a lookup
256   * @param procId the procId to lookup
257   * @return the node that may contains the procId or null
258   */
259  private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
260    if (node != null && node.contains(procId)) {
261      return node;
262    }
263
264    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
265    return entry != null ? entry.getValue() : null;
266  }
267
268  private void trackProcIds(long procId) {
269    minModifiedProcId = Math.min(minModifiedProcId, procId);
270    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
271  }
272
273  public long getModifiedMinProcId() {
274    return minModifiedProcId;
275  }
276
277  public long getModifiedMaxProcId() {
278    return maxModifiedProcId;
279  }
280
281  public void reset() {
282    this.keepDeletes = false;
283    this.partial = false;
284    this.map.clear();
285    minModifiedProcId = Long.MAX_VALUE;
286    maxModifiedProcId = Long.MIN_VALUE;
287  }
288
289  public boolean isModified(long procId) {
290    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
291    return entry != null && entry.getValue().contains(procId) &&
292      entry.getValue().isModified(procId);
293  }
294
295  /**
296   * If {@link #partial} is false, returns state from the bitmap. If no state is found for
297   * {@code procId}, returns YES.
298   * If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE
299   * if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise,
300   * returns state from the bitmap.
301   */
302  public DeleteState isDeleted(long procId) {
303    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
304    if (entry != null && entry.getValue().contains(procId)) {
305      BitSetNode node = entry.getValue();
306      DeleteState state = node.isDeleted(procId);
307      return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
308    }
309    return partial ? DeleteState.MAYBE : DeleteState.YES;
310  }
311
312  public long getActiveMinProcId() {
313    Map.Entry<Long, BitSetNode> entry = map.firstEntry();
314    return entry == null ? Procedure.NO_PROC_ID : entry.getValue().getActiveMinProcId();
315  }
316
317  public void setKeepDeletes(boolean keepDeletes) {
318    this.keepDeletes = keepDeletes;
319    // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted
320    // procedures).
321    if (!keepDeletes) {
322      Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
323      while (it.hasNext()) {
324        Map.Entry<Long, BitSetNode> entry = it.next();
325        if (entry.getValue().isEmpty()) {
326          it.remove();
327        }
328      }
329    }
330  }
331
332  public boolean isPartial() {
333    return partial;
334  }
335
336  public void setPartialFlag(boolean isPartial) {
337    if (this.partial && !isPartial) {
338      for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
339        entry.getValue().unsetPartialFlag();
340      }
341    }
342    this.partial = isPartial;
343  }
344
345  /**
346   * @return true, if no procedure is active, else false.
347   */
348  public boolean isEmpty() {
349    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
350      if (!entry.getValue().isEmpty()) {
351        return false;
352      }
353    }
354    return true;
355  }
356
357  /**
358   * @return true if all procedure was modified or deleted since last call to
359   *         {@link #resetModified()}.
360   */
361  public boolean isAllModified() {
362    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
363      if (!entry.getValue().isAllModified()) {
364        return false;
365      }
366    }
367    return true;
368  }
369
370  /**
371   * Will be used when there are too many proc wal files. We will rewrite the states of the active
372   * procedures in the oldest proc wal file so that we can delete it.
373   * @return all the active procedure ids in this tracker.
374   */
375  public long[] getAllActiveProcIds() {
376    return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0)
377      .flatMapToLong(LongStream::of).toArray();
378  }
379
380  /**
381   * Clears the list of updated procedure ids. This doesn't affect global list of active
382   * procedure ids.
383   */
384  public void resetModified() {
385    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
386      entry.getValue().resetModified();
387    }
388    minModifiedProcId = Long.MAX_VALUE;
389    maxModifiedProcId = Long.MIN_VALUE;
390  }
391
392  private BitSetNode getOrCreateNode(long procId) {
393    // If procId can fit in left node (directly or by growing it)
394    BitSetNode leftNode = null;
395    boolean leftCanGrow = false;
396    Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
397    if (leftEntry != null) {
398      leftNode = leftEntry.getValue();
399      if (leftNode.contains(procId)) {
400        return leftNode;
401      }
402      leftCanGrow = leftNode.canGrow(procId);
403    }
404
405    // If procId can fit in right node (directly or by growing it)
406    BitSetNode rightNode = null;
407    boolean rightCanGrow = false;
408    Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
409    if (rightEntry != null) {
410      rightNode = rightEntry.getValue();
411      rightCanGrow = rightNode.canGrow(procId);
412      if (leftNode != null) {
413        if (leftNode.canMerge(rightNode)) {
414          // merge left and right node
415          return mergeNodes(leftNode, rightNode);
416        }
417
418        // If left and right nodes can not merge, decide which one to grow.
419        if (leftCanGrow && rightCanGrow) {
420          if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
421            return growNode(leftNode, procId);
422          }
423          return growNode(rightNode, procId);
424        }
425      }
426    }
427
428    // grow the left node
429    if (leftCanGrow) {
430      return growNode(leftNode, procId);
431    }
432
433    // grow the right node
434    if (rightCanGrow) {
435      return growNode(rightNode, procId);
436    }
437
438    // add new node if there are no left/right nodes which can be used.
439    BitSetNode node = new BitSetNode(procId, partial);
440    map.put(node.getStart(), node);
441    return node;
442  }
443
444  /**
445   * Grows {@code node} to contain {@code procId} and updates the map.
446   * @return {@link BitSetNode} instance which contains {@code procId}.
447   */
448  private BitSetNode growNode(BitSetNode node, long procId) {
449    map.remove(node.getStart());
450    node.grow(procId);
451    map.put(node.getStart(), node);
452    return node;
453  }
454
455  /**
456   * Merges {@code leftNode} & {@code rightNode} and updates the map.
457   */
458  private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
459    assert leftNode.getStart() < rightNode.getStart();
460    leftNode.merge(rightNode);
461    map.remove(rightNode.getStart());
462    return leftNode;
463  }
464
465  public void dump() {
466    System.out.println("map " + map.size());
467    System.out.println("isAllModified " + isAllModified());
468    System.out.println("isEmpty " + isEmpty());
469    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
470      entry.getValue().dump();
471    }
472  }
473
474  // ========================================================================
475  //  Convert to/from Protocol Buffer.
476  // ========================================================================
477
478  /**
479   * Builds
480   * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker
481   * protocol buffer from current state.
482   */
483  public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException {
484    ProcedureProtos.ProcedureStoreTracker.Builder builder =
485        ProcedureProtos.ProcedureStoreTracker.newBuilder();
486    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
487      builder.addNode(entry.getValue().convert());
488    }
489    return builder.build();
490  }
491}