/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2.store;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.stream.LongStream;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Keeps track of live procedures.
 *
 * It can be used by the ProcedureStore to identify which procedures are already
 * deleted/completed to avoid the deserialization step on restart.
 */
@InterfaceAudience.Private
public class ProcedureStoreTracker {

  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);

  // Key is procedure id corresponding to first bit of the bitmap.
  private final TreeMap<Long, BitSetNode> map = new TreeMap<>();

  /**
   * If true, do not remove bits corresponding to deleted procedures. Note that this can result
   * in huge bitmaps over time.
   * Currently, it's set to true only when building tracker state from logs during recovery. During
   * recovery, if we are sure that a procedure has been deleted, reading its old update entries
   * can be skipped.
   */
  private boolean keepDeletes = false;

  /**
   * If true, it means tracker has incomplete information about the active/deleted procedures.
   * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
   * understand its real use.
   */
  boolean partial = false;

  // Smallest/largest procedure id touched since the last reset()/resetModified(). The sentinel
  // values (MAX_VALUE/MIN_VALUE) mean "nothing has been modified yet".
  private long minModifiedProcId = Long.MAX_VALUE;
  private long maxModifiedProcId = Long.MIN_VALUE;

  public enum DeleteState { YES, NO, MAYBE }

  /**
   * Clears the current state (see {@link #reset()}) and rebuilds the node map from the nodes
   * contained in the given protobuf message.
   * @param trackerProtoBuf the serialized tracker to load the nodes from
   */
  public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
    reset();
    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode :
        trackerProtoBuf.getNodeList()) {
      final BitSetNode node = new BitSetNode(protoNode);
      map.put(node.getStart(), node);
    }
  }

  /**
   * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
   */
  public void resetTo(ProcedureStoreTracker tracker) {
    resetTo(tracker, false);
  }

  /**
   * Resets internal state to same as given {@code tracker}, and change the deleted flag according
   * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
   * <p>
   * The {@code resetDelete} will be set to true when building cleanup tracker, please see the
   * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
   * deleted flag if {@code resetDelete} is true.
   */
  public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
    reset();
    // resetDelete will be true if we are building the cleanup tracker, as we will reset deleted
    // flags for all the unmodified bits to 1, the partial flag is useless so set it to false for
    // not confusing the developers when debugging.
    this.partial = !resetDelete && tracker.partial;
    this.minModifiedProcId = tracker.minModifiedProcId;
    this.maxModifiedProcId = tracker.maxModifiedProcId;
    this.keepDeletes = tracker.keepDeletes;
    for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
      map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
    }
  }

  /**
   * Records {@code procId} in the tracker, creating, growing or merging nodes as needed so that
   * some node covers it.
   */
  public void insert(long procId) {
    insert(null, procId);
  }

  /**
   * Records every id of {@code procIds}, one at a time, via {@link #insert(long)}.
   */
  public void insert(long[] procIds) {
    for (long procId : procIds) {
      insert(procId);
    }
  }

  /**
   * Updates the already tracked parent {@code procId} and then inserts all of its
   * {@code subProcIds}. The node of the previous id is passed along as a lookup hint.
   */
  public void insert(long procId, long[] subProcIds) {
    BitSetNode node = update(null, procId);
    for (long subProcId : subProcIds) {
      node = insert(node, subProcId);
    }
  }

  /**
   * Inserts {@code procId}, reusing {@code node} when it already covers the id.
   * @param node cached node to try first, may be null
   * @return the node that now holds {@code procId}
   */
  private BitSetNode insert(BitSetNode node, long procId) {
    if (node == null || !node.contains(procId)) {
      node = getOrCreateNode(procId);
    }
    node.insertOrUpdate(procId);
    trackProcIds(procId);
    return node;
  }

  /**
   * Marks the already tracked {@code procId} as updated.
   * @throws NullPointerException if there is no node at or below {@code procId} (and assertions
   *           are disabled; with assertions enabled an {@link AssertionError} is raised first)
   */
  public void update(long procId) {
    update(null, procId);
  }

  /**
   * Marks {@code procId} as updated in the node found by
   * {@link #lookupClosestNode(BitSetNode, long)}. Unlike insert, this never creates a node.
   * @param node cached node to try first, may be null
   * @return the node that holds {@code procId}
   */
  private BitSetNode update(BitSetNode node, long procId) {
    node = lookupClosestNode(node, procId);
    assert node != null : "expected node to update procId=" + procId;
    assert node.contains(procId) : "expected procId=" + procId + " in the node";
    // Assertions are off unless the JVM runs with -ea, so keep an explicit check that reports the
    // offending pid instead of failing with a message-less NPE on the next line.
    if (node == null) {
      throw new NullPointerException("pid=" + procId);
    }
    node.insertOrUpdate(procId);
    trackProcIds(procId);
    return node;
  }

  /**
   * Marks {@code procId} as deleted. Only logs a warning if no node covers the id.
   */
  public void delete(long procId) {
    delete(null, procId);
  }

  /**
   * Marks all the given procedures as deleted.
   * <p>
   * NOTE: the passed array is sorted in place, so that consecutive ids are likely to hit the
   * node cached from the previous iteration.
   */
  public void delete(final long[] procIds) {
    Arrays.sort(procIds);
    BitSetNode node = null;
    for (long procId : procIds) {
      node = delete(node, procId);
    }
  }

  /**
   * Marks {@code procId} as deleted and, unless {@link #keepDeletes} is set, drops the node from
   * the map once it became empty.
   * @param node cached node to try first, may be null
   * @return the node that was looked up for {@code procId}; it may be null or may not contain
   *         the id when nothing was deleted
   */
  private BitSetNode delete(BitSetNode node, long procId) {
    node = lookupClosestNode(node, procId);
    if (node == null || !node.contains(procId)) {
      LOG.warn("The BitSetNode for procId={} does not exist, maybe a double deletion?", procId);
      return node;
    }
    node.delete(procId);
    if (!keepDeletes && node.isEmpty()) {
      // TODO: RESET if (map.size() == 1)
      map.remove(node.getStart());
    }

    trackProcIds(procId);
    return node;
  }

  /**
   * Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
   */
  public void setMinMaxModifiedProcIds(long min, long max) {
    this.minModifiedProcId = min;
    this.maxModifiedProcId = max;
  }

  /**
   * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The
   * {@link #delete(long)} method above assumes that the {@link BitSetNode} exists, but on restart
   * this is not true, as we will read the wal files in reverse order so a delete may come first.
   */
  public void setDeleted(long procId, boolean isDeleted) {
    BitSetNode node = getOrCreateNode(procId);
    assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
    node.updateState(procId, isDeleted);
    trackProcIds(procId);
  }

  /**
   * Set the given bit for the procId to delete if it was modified before.
   * <p>
   * This method is used to test whether a procedure wal file can be safely deleted, as if all the
   * procedures in the given procedure wal file has been modified in the new procedure wal files,
   * then we can delete it.
   */
  public void setDeletedIfModified(long... procId) {
    BitSetNode node = null;
    for (long id : procId) {
      node = lookupClosestNode(node, id);
      if (node != null && node.isModified(id)) {
        // Goes straight to the node: no node removal and no min/max tracking here.
        node.delete(id);
      }
    }
  }

  /**
   * Walks over every id in the range of each of our nodes and, for the ids that are modified in
   * our node, marks them as deleted in our node when {@code func} returns true.
   * @param tracker the other tracker, used to look up the node passed to {@code func}
   * @param func receives the closest node of {@code tracker} for the id (may be null, and may not
   *          contain the id) together with the id itself
   */
  private void setDeleteIf(ProcedureStoreTracker tracker,
      BiFunction<BitSetNode, Long, Boolean> func) {
    BitSetNode trackerNode = null;
    for (BitSetNode node : map.values()) {
      long minProcId = node.getStart();
      long maxProcId = node.getEnd();
      for (long procId = minProcId; procId <= maxProcId; ++procId) {
        if (!node.isModified(procId)) {
          continue;
        }

        trackerNode = tracker.lookupClosestNode(trackerNode, procId);
        if (func.apply(trackerNode, procId)) {
          node.delete(procId);
        }
      }
    }
  }

  /**
   * For the global tracker, we will use this method to build the holdingCleanupTracker, as the
   * modified flags will be cleared after rolling so we only need to test the deleted flags.
   * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker)
   */
  public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) {
    setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) ||
      node.isDeleted(procId) == DeleteState.YES);
  }

  /**
   * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
   * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
   * then we mark it as deleted.
   * @see #setDeletedIfModified(long...)
   */
  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
    setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId));
  }

  /**
   * lookup the node containing the specified procId.
   * @param node cached node to check before doing a lookup
   * @param procId the procId to lookup
   * @return the node that may contain the procId or null
   */
  private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
    if (node != null && node.contains(procId)) {
      return node;
    }
    // The floor entry is only the closest candidate; callers must still check contains().
    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
    return entry != null ? entry.getValue() : null;
  }

  /**
   * Widens the [min, max] range of modified procedure ids so that it includes {@code procId}.
   */
  private void trackProcIds(long procId) {
    minModifiedProcId = Math.min(minModifiedProcId, procId);
    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
  }

  /**
   * @return the smallest modified procedure id, or {@link Long#MAX_VALUE} if nothing has been
   *         tracked since the last reset
   */
  public long getModifiedMinProcId() {
    return minModifiedProcId;
  }

  /**
   * @return the largest modified procedure id, or {@link Long#MIN_VALUE} if nothing has been
   *         tracked since the last reset
   */
  public long getModifiedMaxProcId() {
    return maxModifiedProcId;
  }

  /**
   * Drops all the nodes and restores the flags and the modified id range to their initial values.
   */
  public void reset() {
    this.keepDeletes = false;
    this.partial = false;
    this.map.clear();
    minModifiedProcId = Long.MAX_VALUE;
    maxModifiedProcId = Long.MIN_VALUE;
  }

  /**
   * @return true if a node covers {@code procId} and reports it as modified, false otherwise
   */
  public boolean isModified(long procId) {
    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
    return entry != null && entry.getValue().contains(procId) &&
      entry.getValue().isModified(procId);
  }

  /**
   * If {@link #partial} is false, returns state from the bitmap. If no state is found for
   * {@code procId}, returns YES.
   * If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE
   * if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise,
   * returns state from the bitmap.
   */
  public DeleteState isDeleted(long procId) {
    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
    if (entry != null && entry.getValue().contains(procId)) {
      BitSetNode node = entry.getValue();
      DeleteState state = node.isDeleted(procId);
      return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
    }
    return partial ? DeleteState.MAYBE : DeleteState.YES;
  }

  /**
   * @return the value reported by {@code getActiveMinProcId()} of the first (lowest keyed) node,
   *         or {@link Procedure#NO_PROC_ID} if there are no nodes at all
   */
  public long getActiveMinProcId() {
    Map.Entry<Long, BitSetNode> entry = map.firstEntry();
    return entry == null ? Procedure.NO_PROC_ID : entry.getValue().getActiveMinProcId();
  }

  /**
   * Sets the {@link #keepDeletes} flag. When turning it off, the nodes that are already empty
   * are removed right away.
   */
  public void setKeepDeletes(boolean keepDeletes) {
    this.keepDeletes = keepDeletes;
    // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted
    // procedures).
    if (!keepDeletes) {
      Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<Long, BitSetNode> entry = it.next();
        if (entry.getValue().isEmpty()) {
          it.remove();
        }
      }
    }
  }

  /**
   * @return the current value of the {@link #partial} flag
   */
  public boolean isPartial() {
    return partial;
  }

  /**
   * Sets the {@link #partial} flag. When the flag goes from true to false, the partial flag of
   * every node is unset as well.
   */
  public void setPartialFlag(boolean isPartial) {
    if (this.partial && !isPartial) {
      for (BitSetNode node : map.values()) {
        node.unsetPartialFlag();
      }
    }
    this.partial = isPartial;
  }

  /**
   * @return true, if no procedure is active, else false.
   */
  public boolean isEmpty() {
    for (BitSetNode node : map.values()) {
      if (!node.isEmpty()) {
        return false;
      }
    }
    return true;
  }

  /**
   * @return true if all procedure was modified or deleted since last call to
   *         {@link #resetModified()}.
   */
  public boolean isAllModified() {
    for (BitSetNode node : map.values()) {
      if (!node.isAllModified()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Will be used when there are too many proc wal files. We will rewrite the states of the active
   * procedures in the oldest proc wal file so that we can delete it.
   * @return all the active procedure ids in this tracker.
   */
  public long[] getAllActiveProcIds() {
    return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0)
      .flatMapToLong(LongStream::of).toArray();
  }

  /**
   * Clears the list of updated procedure ids. This doesn't affect global list of active
   * procedure ids.
   */
  public void resetModified() {
    for (BitSetNode node : map.values()) {
      node.resetModified();
    }
    minModifiedProcId = Long.MAX_VALUE;
    maxModifiedProcId = Long.MIN_VALUE;
  }

  /**
   * Returns a node that contains {@code procId}. In order of preference: the existing left
   * (floor) node if it already contains the id, the merge of the left and right neighbours, one
   * of the neighbours grown to reach the id, or finally a brand new node.
   */
  private BitSetNode getOrCreateNode(long procId) {
    // If procId can fit in left node (directly or by growing it)
    BitSetNode leftNode = null;
    boolean leftCanGrow = false;
    Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
    if (leftEntry != null) {
      leftNode = leftEntry.getValue();
      if (leftNode.contains(procId)) {
        return leftNode;
      }
      leftCanGrow = leftNode.canGrow(procId);
    }

    // If procId can fit in right node (directly or by growing it)
    BitSetNode rightNode = null;
    boolean rightCanGrow = false;
    Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
    if (rightEntry != null) {
      rightNode = rightEntry.getValue();
      rightCanGrow = rightNode.canGrow(procId);
      if (leftNode != null) {
        if (leftNode.canMerge(rightNode)) {
          // merge left and right node
          return mergeNodes(leftNode, rightNode);
        }

        // If left and right nodes can not merge, decide which one to grow.
        if (leftCanGrow && rightCanGrow) {
          // Grow whichever neighbour is nearer to procId; ties go to the left node.
          if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
            return growNode(leftNode, procId);
          }
          return growNode(rightNode, procId);
        }
      }
    }

    // grow the left node
    if (leftCanGrow) {
      return growNode(leftNode, procId);
    }

    // grow the right node
    if (rightCanGrow) {
      return growNode(rightNode, procId);
    }

    // add new node if there are no left/right nodes which can be used.
    BitSetNode node = new BitSetNode(procId, partial);
    map.put(node.getStart(), node);
    return node;
  }

  /**
   * Grows {@code node} to contain {@code procId} and updates the map.
   * @return {@link BitSetNode} instance which contains {@code procId}.
   */
  private BitSetNode growNode(BitSetNode node, long procId) {
    // The node is removed and re-inserted as the map is keyed by the node's start, which is
    // read again after grow().
    map.remove(node.getStart());
    node.grow(procId);
    map.put(node.getStart(), node);
    return node;
  }

  /**
   * Merges {@code leftNode} & {@code rightNode} and updates the map.
   * @return {@code leftNode}, after {@code rightNode} was merged into it
   */
  private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
    assert leftNode.getStart() < rightNode.getStart();
    leftNode.merge(rightNode);
    map.remove(rightNode.getStart());
    return leftNode;
  }

  /**
   * Debugging aid: prints a summary of the tracker followed by the dump of every node to
   * standard output.
   */
  public void dump() {
    System.out.println("map " + map.size());
    System.out.println("isAllModified " + isAllModified());
    System.out.println("isEmpty " + isEmpty());
    for (BitSetNode node : map.values()) {
      node.dump();
    }
  }

  // ========================================================================
  //  Convert to/from Protocol Buffer.
  // ========================================================================

  /**
   * Builds a {@link ProcedureProtos.ProcedureStoreTracker} protocol buffer from current state.
   * The nodes are added in ascending order of their start id.
   */
  public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException {
    ProcedureProtos.ProcedureStoreTracker.Builder builder =
      ProcedureProtos.ProcedureStoreTracker.newBuilder();
    for (BitSetNode node : map.values()) {
      builder.addNode(node.convert());
    }
    return builder.build();
  }
}