001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store; 019 020import java.io.IOException; 021import java.util.Arrays; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.TreeMap; 025import java.util.function.BiFunction; 026import java.util.stream.LongStream; 027import org.apache.hadoop.hbase.procedure2.Procedure; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.slf4j.Logger; 030import org.slf4j.LoggerFactory; 031 032import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 033 034/** 035 * Keeps track of live procedures. 036 * 037 * It can be used by the ProcedureStore to identify which procedures are already 038 * deleted/completed to avoid the deserialization step on restart 039 */ 040@InterfaceAudience.Private 041public class ProcedureStoreTracker { 042 private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class); 043 044 // Key is procedure id corresponding to first bit of the bitmap. 045 private final TreeMap<Long, BitSetNode> map = new TreeMap<>(); 046 047 /** 048 * If true, do not remove bits corresponding to deleted procedures. Note that this can result 049 * in huge bitmaps overtime. 050 * Currently, it's set to true only when building tracker state from logs during recovery. During 051 * recovery, if we are sure that a procedure has been deleted, reading its old update entries 052 * can be skipped. 053 */ 054 private boolean keepDeletes = false; 055 /** 056 * If true, it means tracker has incomplete information about the active/deleted procedures. 057 * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to 058 * understand it's real use. 059 */ 060 boolean partial = false; 061 062 private long minModifiedProcId = Long.MAX_VALUE; 063 private long maxModifiedProcId = Long.MIN_VALUE; 064 065 public enum DeleteState { YES, NO, MAYBE } 066 067 public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { 068 reset(); 069 for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode : 070 trackerProtoBuf.getNodeList()) { 071 final BitSetNode node = new BitSetNode(protoNode); 072 map.put(node.getStart(), node); 073 } 074 } 075 076 /** 077 * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap. 078 */ 079 public void resetTo(ProcedureStoreTracker tracker) { 080 resetTo(tracker, false); 081 } 082 083 /** 084 * Resets internal state to same as given {@code tracker}, and change the deleted flag according 085 * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap. 086 * <p/> 087 * The {@code resetDelete} will be set to true when building cleanup tracker, please see the 088 * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the 089 * deleted flag if {@code resetDelete} is true. 090 */ 091 public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) { 092 reset(); 093 // resetDelete will true if we are building the cleanup tracker, as we will reset deleted flags 094 // for all the unmodified bits to 1, the partial flag is useless so set it to false for not 095 // confusing the developers when debugging. 096 this.partial = resetDelete ? false : tracker.partial; 097 this.minModifiedProcId = tracker.minModifiedProcId; 098 this.maxModifiedProcId = tracker.maxModifiedProcId; 099 this.keepDeletes = tracker.keepDeletes; 100 for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) { 101 map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete)); 102 } 103 } 104 105 public void insert(long procId) { 106 insert(null, procId); 107 } 108 109 public void insert(long[] procIds) { 110 for (int i = 0; i < procIds.length; ++i) { 111 insert(procIds[i]); 112 } 113 } 114 115 public void insert(long procId, long[] subProcIds) { 116 BitSetNode node = update(null, procId); 117 for (int i = 0; i < subProcIds.length; ++i) { 118 node = insert(node, subProcIds[i]); 119 } 120 } 121 122 private BitSetNode insert(BitSetNode node, long procId) { 123 if (node == null || !node.contains(procId)) { 124 node = getOrCreateNode(procId); 125 } 126 node.insertOrUpdate(procId); 127 trackProcIds(procId); 128 return node; 129 } 130 131 public void update(long procId) { 132 update(null, procId); 133 } 134 135 private BitSetNode update(BitSetNode node, long procId) { 136 node = lookupClosestNode(node, procId); 137 assert node != null : "expected node to update procId=" + procId; 138 assert node.contains(procId) : "expected procId=" + procId + " in the node"; 139 if (node == null) { 140 throw new NullPointerException("pid=" + procId); 141 } 142 node.insertOrUpdate(procId); 143 trackProcIds(procId); 144 return node; 145 } 146 147 public void delete(long procId) { 148 delete(null, procId); 149 } 150 151 public void delete(final long[] procIds) { 152 Arrays.sort(procIds); 153 BitSetNode node = null; 154 for (int i = 0; i < procIds.length; ++i) { 155 node = delete(node, procIds[i]); 156 } 157 } 158 159 private BitSetNode delete(BitSetNode node, long procId) { 160 node = lookupClosestNode(node, procId); 161 if (node == null || !node.contains(procId)) { 162 LOG.warn("The BitSetNode for procId={} does not exist, maybe a double deletion?", procId); 163 return node; 164 } 165 node.delete(procId); 166 if (!keepDeletes && node.isEmpty()) { 167 // TODO: RESET if (map.size() == 1) 168 map.remove(node.getStart()); 169 } 170 171 trackProcIds(procId); 172 return node; 173 } 174 175 /** 176 * Will be called when restarting where we need to rebuild the ProcedureStoreTracker. 177 */ 178 public void setMinMaxModifiedProcIds(long min, long max) { 179 this.minModifiedProcId = min; 180 this.maxModifiedProcId = max; 181 } 182 /** 183 * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The 184 * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart 185 * this is not true, as we will read the wal files in reverse order so a delete may come first. 186 */ 187 public void setDeleted(long procId, boolean isDeleted) { 188 BitSetNode node = getOrCreateNode(procId); 189 assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; 190 node.updateState(procId, isDeleted); 191 trackProcIds(procId); 192 } 193 194 /** 195 * Set the given bit for the procId to delete if it was modified before. 196 * <p/> 197 * This method is used to test whether a procedure wal file can be safely deleted, as if all the 198 * procedures in the given procedure wal file has been modified in the new procedure wal files, 199 * then we can delete it. 200 */ 201 public void setDeletedIfModified(long... procId) { 202 BitSetNode node = null; 203 for (int i = 0; i < procId.length; ++i) { 204 node = lookupClosestNode(node, procId[i]); 205 if (node != null && node.isModified(procId[i])) { 206 node.delete(procId[i]); 207 } 208 } 209 } 210 211 private void setDeleteIf(ProcedureStoreTracker tracker, 212 BiFunction<BitSetNode, Long, Boolean> func) { 213 BitSetNode trackerNode = null; 214 for (BitSetNode node : map.values()) { 215 long minProcId = node.getStart(); 216 long maxProcId = node.getEnd(); 217 for (long procId = minProcId; procId <= maxProcId; ++procId) { 218 if (!node.isModified(procId)) { 219 continue; 220 } 221 222 trackerNode = tracker.lookupClosestNode(trackerNode, procId); 223 if (func.apply(trackerNode, procId)) { 224 node.delete(procId); 225 } 226 } 227 } 228 } 229 230 /** 231 * For the global tracker, we will use this method to build the holdingCleanupTracker, as the 232 * modified flags will be cleared after rolling so we only need to test the deleted flags. 233 * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker) 234 */ 235 public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) { 236 setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) || 237 node.isDeleted(procId) == DeleteState.YES); 238 } 239 240 /** 241 * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by 242 * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, 243 * then we mark it as deleted. 244 * @see #setDeletedIfModified(long...) 245 */ 246 public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) { 247 setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId)); 248 } 249 250 /** 251 * lookup the node containing the specified procId. 252 * @param node cached node to check before doing a lookup 253 * @param procId the procId to lookup 254 * @return the node that may contains the procId or null 255 */ 256 private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) { 257 if (node != null && node.contains(procId)) { 258 return node; 259 } 260 261 final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); 262 return entry != null ? entry.getValue() : null; 263 } 264 265 private void trackProcIds(long procId) { 266 minModifiedProcId = Math.min(minModifiedProcId, procId); 267 maxModifiedProcId = Math.max(maxModifiedProcId, procId); 268 } 269 270 public long getModifiedMinProcId() { 271 return minModifiedProcId; 272 } 273 274 public long getModifiedMaxProcId() { 275 return maxModifiedProcId; 276 } 277 278 public void reset() { 279 this.keepDeletes = false; 280 this.partial = false; 281 this.map.clear(); 282 minModifiedProcId = Long.MAX_VALUE; 283 maxModifiedProcId = Long.MIN_VALUE; 284 } 285 286 public boolean isModified(long procId) { 287 final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); 288 return entry != null && entry.getValue().contains(procId) && 289 entry.getValue().isModified(procId); 290 } 291 292 /** 293 * If {@link #partial} is false, returns state from the bitmap. If no state is found for 294 * {@code procId}, returns YES. 295 * If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE 296 * if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise, 297 * returns state from the bitmap. 298 */ 299 public DeleteState isDeleted(long procId) { 300 Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); 301 if (entry != null && entry.getValue().contains(procId)) { 302 BitSetNode node = entry.getValue(); 303 DeleteState state = node.isDeleted(procId); 304 return partial && !node.isModified(procId) ? DeleteState.MAYBE : state; 305 } 306 return partial ? DeleteState.MAYBE : DeleteState.YES; 307 } 308 309 public long getActiveMinProcId() { 310 Map.Entry<Long, BitSetNode> entry = map.firstEntry(); 311 return entry == null ? Procedure.NO_PROC_ID : entry.getValue().getActiveMinProcId(); 312 } 313 314 public void setKeepDeletes(boolean keepDeletes) { 315 this.keepDeletes = keepDeletes; 316 // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted 317 // procedures). 318 if (!keepDeletes) { 319 Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator(); 320 while (it.hasNext()) { 321 Map.Entry<Long, BitSetNode> entry = it.next(); 322 if (entry.getValue().isEmpty()) { 323 it.remove(); 324 } 325 } 326 } 327 } 328 329 public boolean isPartial() { 330 return partial; 331 } 332 333 public void setPartialFlag(boolean isPartial) { 334 if (this.partial && !isPartial) { 335 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 336 entry.getValue().unsetPartialFlag(); 337 } 338 } 339 this.partial = isPartial; 340 } 341 342 /** 343 * @return true, if no procedure is active, else false. 344 */ 345 public boolean isEmpty() { 346 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 347 if (!entry.getValue().isEmpty()) { 348 return false; 349 } 350 } 351 return true; 352 } 353 354 /** 355 * @return true if all procedure was modified or deleted since last call to 356 * {@link #resetModified()}. 357 */ 358 public boolean isAllModified() { 359 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 360 if (!entry.getValue().isAllModified()) { 361 return false; 362 } 363 } 364 return true; 365 } 366 367 /** 368 * Will be used when there are too many proc wal files. We will rewrite the states of the active 369 * procedures in the oldest proc wal file so that we can delete it. 370 * @return all the active procedure ids in this tracker. 371 */ 372 public long[] getAllActiveProcIds() { 373 return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0) 374 .flatMapToLong(LongStream::of).toArray(); 375 } 376 377 /** 378 * Clears the list of updated procedure ids. This doesn't affect global list of active 379 * procedure ids. 380 */ 381 public void resetModified() { 382 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 383 entry.getValue().resetModified(); 384 } 385 minModifiedProcId = Long.MAX_VALUE; 386 maxModifiedProcId = Long.MIN_VALUE; 387 } 388 389 private BitSetNode getOrCreateNode(long procId) { 390 // If procId can fit in left node (directly or by growing it) 391 BitSetNode leftNode = null; 392 boolean leftCanGrow = false; 393 Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId); 394 if (leftEntry != null) { 395 leftNode = leftEntry.getValue(); 396 if (leftNode.contains(procId)) { 397 return leftNode; 398 } 399 leftCanGrow = leftNode.canGrow(procId); 400 } 401 402 // If procId can fit in right node (directly or by growing it) 403 BitSetNode rightNode = null; 404 boolean rightCanGrow = false; 405 Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId); 406 if (rightEntry != null) { 407 rightNode = rightEntry.getValue(); 408 rightCanGrow = rightNode.canGrow(procId); 409 if (leftNode != null) { 410 if (leftNode.canMerge(rightNode)) { 411 // merge left and right node 412 return mergeNodes(leftNode, rightNode); 413 } 414 415 // If left and right nodes can not merge, decide which one to grow. 416 if (leftCanGrow && rightCanGrow) { 417 if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) { 418 return growNode(leftNode, procId); 419 } 420 return growNode(rightNode, procId); 421 } 422 } 423 } 424 425 // grow the left node 426 if (leftCanGrow) { 427 return growNode(leftNode, procId); 428 } 429 430 // grow the right node 431 if (rightCanGrow) { 432 return growNode(rightNode, procId); 433 } 434 435 // add new node if there are no left/right nodes which can be used. 436 BitSetNode node = new BitSetNode(procId, partial); 437 map.put(node.getStart(), node); 438 return node; 439 } 440 441 /** 442 * Grows {@code node} to contain {@code procId} and updates the map. 443 * @return {@link BitSetNode} instance which contains {@code procId}. 444 */ 445 private BitSetNode growNode(BitSetNode node, long procId) { 446 map.remove(node.getStart()); 447 node.grow(procId); 448 map.put(node.getStart(), node); 449 return node; 450 } 451 452 /** 453 * Merges {@code leftNode} & {@code rightNode} and updates the map. 454 */ 455 private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) { 456 assert leftNode.getStart() < rightNode.getStart(); 457 leftNode.merge(rightNode); 458 map.remove(rightNode.getStart()); 459 return leftNode; 460 } 461 462 public void dump() { 463 System.out.println("map " + map.size()); 464 System.out.println("isAllModified " + isAllModified()); 465 System.out.println("isEmpty " + isEmpty()); 466 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 467 entry.getValue().dump(); 468 } 469 } 470 471 // ======================================================================== 472 // Convert to/from Protocol Buffer. 473 // ======================================================================== 474 475 /** 476 * Builds 477 * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker 478 * protocol buffer from current state. 479 */ 480 public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException { 481 ProcedureProtos.ProcedureStoreTracker.Builder builder = 482 ProcedureProtos.ProcedureStoreTracker.newBuilder(); 483 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 484 builder.addNode(entry.getValue().convert()); 485 } 486 return builder.build(); 487 } 488}