001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import java.util.Arrays;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.TreeMap;
025import java.util.function.BiFunction;
026import java.util.stream.LongStream;
027import org.apache.hadoop.hbase.procedure2.Procedure;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031
032import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
033
034/**
035 * Keeps track of live procedures. It can be used by the ProcedureStore to identify which procedures
036 * are already deleted/completed to avoid the deserialization step on restart
037 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
038 *             use the new region based procedure store.
039 */
040@Deprecated
041@InterfaceAudience.Private
042class ProcedureStoreTracker {
043  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);
044
045  // Key is procedure id corresponding to first bit of the bitmap.
046  private final TreeMap<Long, BitSetNode> map = new TreeMap<>();
047
048  /**
049   * If true, do not remove bits corresponding to deleted procedures. Note that this can result in
050   * huge bitmaps overtime. Currently, it's set to true only when building tracker state from logs
051   * during recovery. During recovery, if we are sure that a procedure has been deleted, reading its
052   * old update entries can be skipped.
053   */
054  private boolean keepDeletes = false;
055  /**
056   * If true, it means tracker has incomplete information about the active/deleted procedures. It's
057   * set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to understand
058   * it's real use.
059   */
060  boolean partial = false;
061
062  private long minModifiedProcId = Long.MAX_VALUE;
063  private long maxModifiedProcId = Long.MIN_VALUE;
064
065  public enum DeleteState {
066    YES,
067    NO,
068    MAYBE
069  }
070
071  public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
072    reset();
073    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode : trackerProtoBuf
074      .getNodeList()) {
075      final BitSetNode node = new BitSetNode(protoNode);
076      map.put(node.getStart(), node);
077    }
078  }
079
080  /**
081   * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
082   */
083  public void resetTo(ProcedureStoreTracker tracker) {
084    resetTo(tracker, false);
085  }
086
087  /**
088   * Resets internal state to same as given {@code tracker}, and change the deleted flag according
089   * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
090   * <p/>
091   * The {@code resetDelete} will be set to true when building cleanup tracker, please see the
092   * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
093   * deleted flag if {@code resetDelete} is true.
094   */
095  public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
096    reset();
097    // resetDelete will true if we are building the cleanup tracker, as we will reset deleted flags
098    // for all the unmodified bits to 1, the partial flag is useless so set it to false for not
099    // confusing the developers when debugging.
100    this.partial = resetDelete ? false : tracker.partial;
101    this.minModifiedProcId = tracker.minModifiedProcId;
102    this.maxModifiedProcId = tracker.maxModifiedProcId;
103    this.keepDeletes = tracker.keepDeletes;
104    for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
105      map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
106    }
107  }
108
109  public void insert(long procId) {
110    insert(null, procId);
111  }
112
113  public void insert(long[] procIds) {
114    for (int i = 0; i < procIds.length; ++i) {
115      insert(procIds[i]);
116    }
117  }
118
119  public void insert(long procId, long[] subProcIds) {
120    BitSetNode node = update(null, procId);
121    for (int i = 0; i < subProcIds.length; ++i) {
122      node = insert(node, subProcIds[i]);
123    }
124  }
125
126  private BitSetNode insert(BitSetNode node, long procId) {
127    if (node == null || !node.contains(procId)) {
128      node = getOrCreateNode(procId);
129    }
130    node.insertOrUpdate(procId);
131    trackProcIds(procId);
132    return node;
133  }
134
135  public void update(long procId) {
136    update(null, procId);
137  }
138
139  private BitSetNode update(BitSetNode node, long procId) {
140    node = lookupClosestNode(node, procId);
141    assert node != null : "expected node to update procId=" + procId;
142    assert node.contains(procId) : "expected procId=" + procId + " in the node";
143    if (node == null) {
144      throw new NullPointerException("pid=" + procId);
145    }
146    node.insertOrUpdate(procId);
147    trackProcIds(procId);
148    return node;
149  }
150
151  public void delete(long procId) {
152    delete(null, procId);
153  }
154
155  public void delete(final long[] procIds) {
156    Arrays.sort(procIds);
157    BitSetNode node = null;
158    for (int i = 0; i < procIds.length; ++i) {
159      node = delete(node, procIds[i]);
160    }
161  }
162
163  private BitSetNode delete(BitSetNode node, long procId) {
164    node = lookupClosestNode(node, procId);
165    if (node == null || !node.contains(procId)) {
166      LOG.warn("The BitSetNode for procId={} does not exist, maybe a double deletion?", procId);
167      return node;
168    }
169    node.delete(procId);
170    if (!keepDeletes && node.isEmpty()) {
171      // TODO: RESET if (map.size() == 1)
172      map.remove(node.getStart());
173    }
174
175    trackProcIds(procId);
176    return node;
177  }
178
179  /**
180   * Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
181   */
182  public void setMinMaxModifiedProcIds(long min, long max) {
183    this.minModifiedProcId = min;
184    this.maxModifiedProcId = max;
185  }
186
187  /**
188   * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The
189   * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart
190   * this is not true, as we will read the wal files in reverse order so a delete may come first.
191   */
192  public void setDeleted(long procId, boolean isDeleted) {
193    BitSetNode node = getOrCreateNode(procId);
194    assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
195    node.updateState(procId, isDeleted);
196    trackProcIds(procId);
197  }
198
199  /**
200   * Set the given bit for the procId to delete if it was modified before.
201   * <p/>
202   * This method is used to test whether a procedure wal file can be safely deleted, as if all the
203   * procedures in the given procedure wal file has been modified in the new procedure wal files,
204   * then we can delete it.
205   */
206  public void setDeletedIfModified(long... procId) {
207    BitSetNode node = null;
208    for (int i = 0; i < procId.length; ++i) {
209      node = lookupClosestNode(node, procId[i]);
210      if (node != null && node.isModified(procId[i])) {
211        node.delete(procId[i]);
212      }
213    }
214  }
215
216  private void setDeleteIf(ProcedureStoreTracker tracker,
217    BiFunction<BitSetNode, Long, Boolean> func) {
218    BitSetNode trackerNode = null;
219    for (BitSetNode node : map.values()) {
220      long minProcId = node.getStart();
221      long maxProcId = node.getEnd();
222      for (long procId = minProcId; procId <= maxProcId; ++procId) {
223        if (!node.isModified(procId)) {
224          continue;
225        }
226
227        trackerNode = tracker.lookupClosestNode(trackerNode, procId);
228        if (func.apply(trackerNode, procId)) {
229          node.delete(procId);
230        }
231      }
232    }
233  }
234
235  /**
236   * For the global tracker, we will use this method to build the holdingCleanupTracker, as the
237   * modified flags will be cleared after rolling so we only need to test the deleted flags.
238   * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker)
239   */
240  public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) {
241    setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId)
242      || node.isDeleted(procId) == DeleteState.YES);
243  }
244
245  /**
246   * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
247   * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
248   * then we mark it as deleted.
249   * @see #setDeletedIfModified(long...)
250   */
251  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
252    setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId));
253  }
254
255  /**
256   * lookup the node containing the specified procId.
257   * @param node   cached node to check before doing a lookup
258   * @param procId the procId to lookup
259   * @return the node that may contains the procId or null
260   */
261  private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
262    if (node != null && node.contains(procId)) {
263      return node;
264    }
265
266    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
267    return entry != null ? entry.getValue() : null;
268  }
269
270  private void trackProcIds(long procId) {
271    minModifiedProcId = Math.min(minModifiedProcId, procId);
272    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
273  }
274
275  public long getModifiedMinProcId() {
276    return minModifiedProcId;
277  }
278
279  public long getModifiedMaxProcId() {
280    return maxModifiedProcId;
281  }
282
283  public void reset() {
284    this.keepDeletes = false;
285    this.partial = false;
286    this.map.clear();
287    minModifiedProcId = Long.MAX_VALUE;
288    maxModifiedProcId = Long.MIN_VALUE;
289  }
290
291  public boolean isModified(long procId) {
292    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
293    return entry != null && entry.getValue().contains(procId)
294      && entry.getValue().isModified(procId);
295  }
296
297  /**
298   * If {@link #partial} is false, returns state from the bitmap. If no state is found for
299   * {@code procId}, returns YES. If partial is true, tracker doesn't have complete view of system
300   * state, so it returns MAYBE if there is no update for the procedure or if it doesn't have a
301   * state in bitmap. Otherwise, returns state from the bitmap.
302   */
303  public DeleteState isDeleted(long procId) {
304    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
305    if (entry != null && entry.getValue().contains(procId)) {
306      BitSetNode node = entry.getValue();
307      DeleteState state = node.isDeleted(procId);
308      return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
309    }
310    return partial ? DeleteState.MAYBE : DeleteState.YES;
311  }
312
313  public long getActiveMinProcId() {
314    Map.Entry<Long, BitSetNode> entry = map.firstEntry();
315    return entry == null ? Procedure.NO_PROC_ID : entry.getValue().getActiveMinProcId();
316  }
317
318  public void setKeepDeletes(boolean keepDeletes) {
319    this.keepDeletes = keepDeletes;
320    // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted
321    // procedures).
322    if (!keepDeletes) {
323      Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
324      while (it.hasNext()) {
325        Map.Entry<Long, BitSetNode> entry = it.next();
326        if (entry.getValue().isEmpty()) {
327          it.remove();
328        }
329      }
330    }
331  }
332
333  public boolean isPartial() {
334    return partial;
335  }
336
337  public void setPartialFlag(boolean isPartial) {
338    if (this.partial && !isPartial) {
339      for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
340        entry.getValue().unsetPartialFlag();
341      }
342    }
343    this.partial = isPartial;
344  }
345
346  /** Returns true, if no procedure is active, else false. */
347  public boolean isEmpty() {
348    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
349      if (!entry.getValue().isEmpty()) {
350        return false;
351      }
352    }
353    return true;
354  }
355
356  /**
357   * @return true if all procedure was modified or deleted since last call to
358   *         {@link #resetModified()}.
359   */
360  public boolean isAllModified() {
361    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
362      if (!entry.getValue().isAllModified()) {
363        return false;
364      }
365    }
366    return true;
367  }
368
369  /**
370   * Will be used when there are too many proc wal files. We will rewrite the states of the active
371   * procedures in the oldest proc wal file so that we can delete it.
372   * @return all the active procedure ids in this tracker.
373   */
374  public long[] getAllActiveProcIds() {
375    return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0)
376      .flatMapToLong(LongStream::of).toArray();
377  }
378
379  /**
380   * Clears the list of updated procedure ids. This doesn't affect global list of active procedure
381   * ids.
382   */
383  public void resetModified() {
384    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
385      entry.getValue().resetModified();
386    }
387    minModifiedProcId = Long.MAX_VALUE;
388    maxModifiedProcId = Long.MIN_VALUE;
389  }
390
391  private BitSetNode getOrCreateNode(long procId) {
392    // If procId can fit in left node (directly or by growing it)
393    BitSetNode leftNode = null;
394    boolean leftCanGrow = false;
395    Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
396    if (leftEntry != null) {
397      leftNode = leftEntry.getValue();
398      if (leftNode.contains(procId)) {
399        return leftNode;
400      }
401      leftCanGrow = leftNode.canGrow(procId);
402    }
403
404    // If procId can fit in right node (directly or by growing it)
405    BitSetNode rightNode = null;
406    boolean rightCanGrow = false;
407    Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
408    if (rightEntry != null) {
409      rightNode = rightEntry.getValue();
410      rightCanGrow = rightNode.canGrow(procId);
411      if (leftNode != null) {
412        if (leftNode.canMerge(rightNode)) {
413          // merge left and right node
414          return mergeNodes(leftNode, rightNode);
415        }
416
417        // If left and right nodes can not merge, decide which one to grow.
418        if (leftCanGrow && rightCanGrow) {
419          if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
420            return growNode(leftNode, procId);
421          }
422          return growNode(rightNode, procId);
423        }
424      }
425    }
426
427    // grow the left node
428    if (leftCanGrow) {
429      return growNode(leftNode, procId);
430    }
431
432    // grow the right node
433    if (rightCanGrow) {
434      return growNode(rightNode, procId);
435    }
436
437    // add new node if there are no left/right nodes which can be used.
438    BitSetNode node = new BitSetNode(procId, partial);
439    map.put(node.getStart(), node);
440    return node;
441  }
442
443  /**
444   * Grows {@code node} to contain {@code procId} and updates the map.
445   * @return {@link BitSetNode} instance which contains {@code procId}.
446   */
447  private BitSetNode growNode(BitSetNode node, long procId) {
448    map.remove(node.getStart());
449    node.grow(procId);
450    map.put(node.getStart(), node);
451    return node;
452  }
453
454  /**
455   * Merges {@code leftNode} & {@code rightNode} and updates the map.
456   */
457  private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
458    assert leftNode.getStart() < rightNode.getStart();
459    leftNode.merge(rightNode);
460    map.remove(rightNode.getStart());
461    return leftNode;
462  }
463
464  public void dump() {
465    System.out.println("map " + map.size());
466    System.out.println("isAllModified " + isAllModified());
467    System.out.println("isEmpty " + isEmpty());
468    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
469      entry.getValue().dump();
470    }
471  }
472
473  // ========================================================================
474  // Convert to/from Protocol Buffer.
475  // ========================================================================
476
477  /**
478   * Builds org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker
479   * protocol buffer from current state.
480   */
481  public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException {
482    ProcedureProtos.ProcedureStoreTracker.Builder builder =
483      ProcedureProtos.ProcedureStoreTracker.newBuilder();
484    for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) {
485      builder.addNode(entry.getValue().convert());
486    }
487    return builder.build();
488  }
489}