001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.procedure2.store; 020 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.Iterator; 024import java.util.Map; 025import java.util.TreeMap; 026import java.util.function.BiFunction; 027import java.util.stream.LongStream; 028import org.apache.hadoop.hbase.procedure2.Procedure; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 034 035/** 036 * Keeps track of live procedures. 037 * 038 * It can be used by the ProcedureStore to identify which procedures are already 039 * deleted/completed to avoid the deserialization step on restart 040 */ 041@InterfaceAudience.Private 042public class ProcedureStoreTracker { 043 044 private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class); 045 046 // Key is procedure id corresponding to first bit of the bitmap. 047 private final TreeMap<Long, BitSetNode> map = new TreeMap<>(); 048 049 /** 050 * If true, do not remove bits corresponding to deleted procedures. Note that this can result 051 * in huge bitmaps overtime. 052 * Currently, it's set to true only when building tracker state from logs during recovery. During 053 * recovery, if we are sure that a procedure has been deleted, reading its old update entries 054 * can be skipped. 055 */ 056 private boolean keepDeletes = false; 057 /** 058 * If true, it means tracker has incomplete information about the active/deleted procedures. 059 * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to 060 * understand it's real use. 061 */ 062 boolean partial = false; 063 064 private long minModifiedProcId = Long.MAX_VALUE; 065 private long maxModifiedProcId = Long.MIN_VALUE; 066 067 public enum DeleteState { YES, NO, MAYBE } 068 069 public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) { 070 reset(); 071 for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode: trackerProtoBuf.getNodeList()) { 072 final BitSetNode node = new BitSetNode(protoNode); 073 map.put(node.getStart(), node); 074 } 075 } 076 077 /** 078 * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap. 079 */ 080 public void resetTo(ProcedureStoreTracker tracker) { 081 resetTo(tracker, false); 082 } 083 084 /** 085 * Resets internal state to same as given {@code tracker}, and change the deleted flag according 086 * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap. 087 * <p/> 088 * The {@code resetDelete} will be set to true when building cleanup tracker, please see the 089 * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the 090 * deleted flag if {@code resetDelete} is true. 091 */ 092 public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) { 093 reset(); 094 // resetDelete will true if we are building the cleanup tracker, as we will reset deleted flags 095 // for all the unmodified bits to 1, the partial flag is useless so set it to false for not 096 // confusing the developers when debugging. 097 this.partial = resetDelete ? false : tracker.partial; 098 this.minModifiedProcId = tracker.minModifiedProcId; 099 this.maxModifiedProcId = tracker.maxModifiedProcId; 100 this.keepDeletes = tracker.keepDeletes; 101 for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) { 102 map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete)); 103 } 104 } 105 106 public void insert(long procId) { 107 insert(null, procId); 108 } 109 110 public void insert(long[] procIds) { 111 for (int i = 0; i < procIds.length; ++i) { 112 insert(procIds[i]); 113 } 114 } 115 116 public void insert(long procId, long[] subProcIds) { 117 BitSetNode node = update(null, procId); 118 for (int i = 0; i < subProcIds.length; ++i) { 119 node = insert(node, subProcIds[i]); 120 } 121 } 122 123 private BitSetNode insert(BitSetNode node, long procId) { 124 if (node == null || !node.contains(procId)) { 125 node = getOrCreateNode(procId); 126 } 127 node.insertOrUpdate(procId); 128 trackProcIds(procId); 129 return node; 130 } 131 132 public void update(long procId) { 133 update(null, procId); 134 } 135 136 private BitSetNode update(BitSetNode node, long procId) { 137 node = lookupClosestNode(node, procId); 138 assert node != null : "expected node to update procId=" + procId; 139 assert node.contains(procId) : "expected procId=" + procId + " in the node"; 140 node.insertOrUpdate(procId); 141 trackProcIds(procId); 142 return node; 143 } 144 145 public void delete(long procId) { 146 delete(null, procId); 147 } 148 149 public void delete(final long[] procIds) { 150 Arrays.sort(procIds); 151 BitSetNode node = null; 152 for (int i = 0; i < procIds.length; ++i) { 153 node = delete(node, procIds[i]); 154 } 155 } 156 157 private BitSetNode delete(BitSetNode node, long procId) { 158 node = lookupClosestNode(node, procId); 159 if (node == null || !node.contains(procId)) { 160 LOG.warn("The BitSetNode for procId={} does not exist, maybe a double deletion?", procId); 161 return node; 162 } 163 node.delete(procId); 164 if (!keepDeletes && node.isEmpty()) { 165 // TODO: RESET if (map.size() == 1) 166 map.remove(node.getStart()); 167 } 168 169 trackProcIds(procId); 170 return node; 171 } 172 173 /** 174 * Will be called when restarting where we need to rebuild the ProcedureStoreTracker. 175 */ 176 public void setMinMaxModifiedProcIds(long min, long max) { 177 this.minModifiedProcId = min; 178 this.maxModifiedProcId = max; 179 } 180 /** 181 * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The 182 * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart 183 * this is not true, as we will read the wal files in reverse order so a delete may come first. 184 */ 185 public void setDeleted(long procId, boolean isDeleted) { 186 BitSetNode node = getOrCreateNode(procId); 187 assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node; 188 node.updateState(procId, isDeleted); 189 trackProcIds(procId); 190 } 191 192 /** 193 * Set the given bit for the procId to delete if it was modified before. 194 * <p/> 195 * This method is used to test whether a procedure wal file can be safely deleted, as if all the 196 * procedures in the given procedure wal file has been modified in the new procedure wal files, 197 * then we can delete it. 198 */ 199 public void setDeletedIfModified(long... procId) { 200 BitSetNode node = null; 201 for (int i = 0; i < procId.length; ++i) { 202 node = lookupClosestNode(node, procId[i]); 203 if (node != null && node.isModified(procId[i])) { 204 node.delete(procId[i]); 205 } 206 } 207 } 208 209 private void setDeleteIf(ProcedureStoreTracker tracker, 210 BiFunction<BitSetNode, Long, Boolean> func) { 211 BitSetNode trackerNode = null; 212 for (BitSetNode node : map.values()) { 213 long minProcId = node.getStart(); 214 long maxProcId = node.getEnd(); 215 for (long procId = minProcId; procId <= maxProcId; ++procId) { 216 if (!node.isModified(procId)) { 217 continue; 218 } 219 220 trackerNode = tracker.lookupClosestNode(trackerNode, procId); 221 if (func.apply(trackerNode, procId)) { 222 node.delete(procId); 223 } 224 } 225 } 226 } 227 228 /** 229 * For the global tracker, we will use this method to build the holdingCleanupTracker, as the 230 * modified flags will be cleared after rolling so we only need to test the deleted flags. 231 * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker) 232 */ 233 public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) { 234 setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) || 235 node.isDeleted(procId) == DeleteState.YES); 236 } 237 238 /** 239 * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by 240 * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, 241 * then we mark it as deleted. 242 * @see #setDeletedIfModified(long...) 243 */ 244 public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) { 245 setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId)); 246 } 247 248 /** 249 * lookup the node containing the specified procId. 250 * @param node cached node to check before doing a lookup 251 * @param procId the procId to lookup 252 * @return the node that may contains the procId or null 253 */ 254 private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) { 255 if (node != null && node.contains(procId)) return node; 256 final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); 257 return entry != null ? entry.getValue() : null; 258 } 259 260 private void trackProcIds(long procId) { 261 minModifiedProcId = Math.min(minModifiedProcId, procId); 262 maxModifiedProcId = Math.max(maxModifiedProcId, procId); 263 } 264 265 public long getModifiedMinProcId() { 266 return minModifiedProcId; 267 } 268 269 public long getModifiedMaxProcId() { 270 return maxModifiedProcId; 271 } 272 273 public void reset() { 274 this.keepDeletes = false; 275 this.partial = false; 276 this.map.clear(); 277 minModifiedProcId = Long.MAX_VALUE; 278 maxModifiedProcId = Long.MIN_VALUE; 279 } 280 281 public boolean isModified(long procId) { 282 final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); 283 return entry != null && entry.getValue().contains(procId) && 284 entry.getValue().isModified(procId); 285 } 286 287 /** 288 * If {@link #partial} is false, returns state from the bitmap. If no state is found for 289 * {@code procId}, returns YES. 290 * If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE 291 * if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise, 292 * returns state from the bitmap. 293 */ 294 public DeleteState isDeleted(long procId) { 295 Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId); 296 if (entry != null && entry.getValue().contains(procId)) { 297 BitSetNode node = entry.getValue(); 298 DeleteState state = node.isDeleted(procId); 299 return partial && !node.isModified(procId) ? DeleteState.MAYBE : state; 300 } 301 return partial ? DeleteState.MAYBE : DeleteState.YES; 302 } 303 304 public long getActiveMinProcId() { 305 Map.Entry<Long, BitSetNode> entry = map.firstEntry(); 306 return entry == null ? Procedure.NO_PROC_ID : entry.getValue().getActiveMinProcId(); 307 } 308 309 public void setKeepDeletes(boolean keepDeletes) { 310 this.keepDeletes = keepDeletes; 311 // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted 312 // procedures). 313 if (!keepDeletes) { 314 Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator(); 315 while (it.hasNext()) { 316 Map.Entry<Long, BitSetNode> entry = it.next(); 317 if (entry.getValue().isEmpty()) { 318 it.remove(); 319 } 320 } 321 } 322 } 323 324 public boolean isPartial() { 325 return partial; 326 } 327 328 public void setPartialFlag(boolean isPartial) { 329 if (this.partial && !isPartial) { 330 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 331 entry.getValue().unsetPartialFlag(); 332 } 333 } 334 this.partial = isPartial; 335 } 336 337 /** 338 * @return true, if no procedure is active, else false. 339 */ 340 public boolean isEmpty() { 341 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 342 if (!entry.getValue().isEmpty()) { 343 return false; 344 } 345 } 346 return true; 347 } 348 349 /** 350 * @return true if all procedure was modified or deleted since last call to 351 * {@link #resetModified()}. 352 */ 353 public boolean isAllModified() { 354 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 355 if (!entry.getValue().isAllModified()) { 356 return false; 357 } 358 } 359 return true; 360 } 361 362 /** 363 * Will be used when there are too many proc wal files. We will rewrite the states of the active 364 * procedures in the oldest proc wal file so that we can delete it. 365 * @return all the active procedure ids in this tracker. 366 */ 367 public long[] getAllActiveProcIds() { 368 return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0) 369 .flatMapToLong(LongStream::of).toArray(); 370 } 371 372 /** 373 * Clears the list of updated procedure ids. This doesn't affect global list of active 374 * procedure ids. 375 */ 376 public void resetModified() { 377 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 378 entry.getValue().resetModified(); 379 } 380 minModifiedProcId = Long.MAX_VALUE; 381 maxModifiedProcId = Long.MIN_VALUE; 382 } 383 384 private BitSetNode getOrCreateNode(long procId) { 385 // If procId can fit in left node (directly or by growing it) 386 BitSetNode leftNode = null; 387 boolean leftCanGrow = false; 388 Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId); 389 if (leftEntry != null) { 390 leftNode = leftEntry.getValue(); 391 if (leftNode.contains(procId)) { 392 return leftNode; 393 } 394 leftCanGrow = leftNode.canGrow(procId); 395 } 396 397 // If procId can fit in right node (directly or by growing it) 398 BitSetNode rightNode = null; 399 boolean rightCanGrow = false; 400 Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId); 401 if (rightEntry != null) { 402 rightNode = rightEntry.getValue(); 403 rightCanGrow = rightNode.canGrow(procId); 404 if (leftNode != null) { 405 if (leftNode.canMerge(rightNode)) { 406 // merge left and right node 407 return mergeNodes(leftNode, rightNode); 408 } 409 410 // If left and right nodes can not merge, decide which one to grow. 411 if (leftCanGrow && rightCanGrow) { 412 if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) { 413 return growNode(leftNode, procId); 414 } 415 return growNode(rightNode, procId); 416 } 417 } 418 } 419 420 // grow the left node 421 if (leftCanGrow) { 422 return growNode(leftNode, procId); 423 } 424 425 // grow the right node 426 if (rightCanGrow) { 427 return growNode(rightNode, procId); 428 } 429 430 // add new node if there are no left/right nodes which can be used. 431 BitSetNode node = new BitSetNode(procId, partial); 432 map.put(node.getStart(), node); 433 return node; 434 } 435 436 /** 437 * Grows {@code node} to contain {@code procId} and updates the map. 438 * @return {@link BitSetNode} instance which contains {@code procId}. 439 */ 440 private BitSetNode growNode(BitSetNode node, long procId) { 441 map.remove(node.getStart()); 442 node.grow(procId); 443 map.put(node.getStart(), node); 444 return node; 445 } 446 447 /** 448 * Merges {@code leftNode} & {@code rightNode} and updates the map. 449 */ 450 private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) { 451 assert leftNode.getStart() < rightNode.getStart(); 452 leftNode.merge(rightNode); 453 map.remove(rightNode.getStart()); 454 return leftNode; 455 } 456 457 public void dump() { 458 System.out.println("map " + map.size()); 459 System.out.println("isAllModified " + isAllModified()); 460 System.out.println("isEmpty " + isEmpty()); 461 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 462 entry.getValue().dump(); 463 } 464 } 465 466 // ======================================================================== 467 // Convert to/from Protocol Buffer. 468 // ======================================================================== 469 470 /** 471 * Builds 472 * org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker 473 * protocol buffer from current state. 474 */ 475 public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException { 476 ProcedureProtos.ProcedureStoreTracker.Builder builder = 477 ProcedureProtos.ProcedureStoreTracker.newBuilder(); 478 for (Map.Entry<Long, BitSetNode> entry : map.entrySet()) { 479 builder.addNode(entry.getValue().convert()); 480 } 481 return builder.build(); 482 } 483}