/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.stream.LongStream;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Keeps track of live procedures. It can be used by the ProcedureStore to identify which procedures
 * are already deleted/completed to avoid the deserialization step on restart.
 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
 *             use the new region based procedure store.
 */
@Deprecated
@InterfaceAudience.Private
class ProcedureStoreTracker {
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);

  // Key is procedure id corresponding to first bit of the bitmap.
  private final TreeMap<Long, BitSetNode> map = new TreeMap<>();

  /**
   * If true, do not remove bits corresponding to deleted procedures. Note that this can result in
   * huge bitmaps over time. Currently, it's set to true only when building tracker state from logs
   * during recovery. During recovery, if we are sure that a procedure has been deleted, reading its
   * old update entries can be skipped.
   */
  private boolean keepDeletes = false;
  /**
   * If true, it means tracker has incomplete information about the active/deleted procedures. It's
   * set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to understand
   * its real use.
   */
  boolean partial = false;

  private long minModifiedProcId = Long.MAX_VALUE;
  private long maxModifiedProcId = Long.MIN_VALUE;

  public enum DeleteState {
    YES,
    NO,
    MAYBE
  }

  /**
   * Resets this tracker and then rebuilds its bitmap nodes from the given protobuf message. Only the
   * nodes are restored: the min/max modified proc ids, {@link #partial} and {@code keepDeletes} are
   * left at the values {@link #reset()} assigns.
   */
  public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
    reset();
    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode : trackerProtoBuf
      .getNodeList()) {
      final BitSetNode node = new BitSetNode(protoNode);
      map.put(node.getStart(), node);
    }
  }

  /**
   * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
   */
  public void resetTo(ProcedureStoreTracker tracker) {
    resetTo(tracker, false);
  }

  /**
   * Resets internal state to same as given {@code tracker}, and change the deleted flag according
   * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
   * <p/>
   * The {@code resetDelete} will be set to true when building cleanup tracker, please see the
   * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
   * deleted flag if {@code resetDelete} is true.
   */
  public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
    reset();
    // resetDelete will be true if we are building the cleanup tracker. As we will reset the deleted
    // flags for all the unmodified bits to 1, the partial flag is useless, so set it to false to
    // avoid confusing developers when debugging.
    this.partial = !resetDelete && tracker.partial;
    this.minModifiedProcId = tracker.minModifiedProcId;
    this.maxModifiedProcId = tracker.maxModifiedProcId;
    this.keepDeletes = tracker.keepDeletes;
    for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
      map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
    }
  }

  /** Marks {@code procId} as inserted/updated, creating or growing a node if needed. */
  public void insert(long procId) {
    insert(null, procId);
  }

  /** Marks every id in {@code procIds} as inserted/updated. */
  public void insert(long[] procIds) {
    for (long procId : procIds) {
      insert(procId);
    }
  }

  /**
   * Updates the already tracked parent {@code procId} and inserts all of its {@code subProcIds}.
   * The node found for each id is reused as a lookup cache for the next one.
   */
  public void insert(long procId, long[] subProcIds) {
    BitSetNode node = update(null, procId);
    for (long subProcId : subProcIds) {
      node = insert(node, subProcId);
    }
  }

  private BitSetNode insert(BitSetNode node, long procId) {
    if (node == null || !node.contains(procId)) {
      node = getOrCreateNode(procId);
    }
    node.insertOrUpdate(procId);
    trackProcIds(procId);
    return node;
  }

  /** Marks the already tracked {@code procId} as updated. */
  public void update(long procId) {
    update(null, procId);
  }

  private BitSetNode update(BitSetNode node, long procId) {
    node = lookupClosestNode(node, procId);
    assert node != null : "expected node to update procId=" + procId;
    assert node.contains(procId) : "expected procId=" + procId + " in the node";
    // Assertions may be disabled in production; fail loudly rather than with an obscure NPE later.
    if (node == null) {
      throw new NullPointerException("pid=" + procId);
    }
    node.insertOrUpdate(procId);
    trackProcIds(procId);
    return node;
  }

  /** Marks {@code procId} as deleted. Logs a warning if it is not tracked. */
  public void delete(long procId) {
    delete(null, procId);
  }

  /**
   * Marks every id in {@code procIds} as deleted.
   * <p/>
   * Note: {@code procIds} is sorted in place, so that consecutive ids can reuse the same cached
   * {@link BitSetNode} lookup.
   */
  public void delete(final long[] procIds) {
    Arrays.sort(procIds);
    BitSetNode node = null;
    for (long procId : procIds) {
      node = delete(node, procId);
    }
  }

  /**
   * Deletes {@code procId} and returns a node usable as lookup cache for the next call, or
   * {@code null} if there is none.
   */
  private BitSetNode delete(BitSetNode node, long procId) {
    node = lookupClosestNode(node, procId);
    if (node == null || !node.contains(procId)) {
      LOG.warn("The BitSetNode for procId={} does not exist, maybe a double deletion?", procId);
      return node;
    }
    node.delete(procId);
    trackProcIds(procId);
    if (!keepDeletes && node.isEmpty()) {
      // TODO: RESET if (map.size() == 1)
      map.remove(node.getStart());
      // The node is detached from the map now. Do not hand it back as a lookup cache, otherwise a
      // later id in its range would be "deleted" on the detached node without any warning.
      return null;
    }
    return node;
  }

  /**
   * Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
   */
  public void setMinMaxModifiedProcIds(long min, long max) {
    this.minModifiedProcId = min;
    this.maxModifiedProcId = max;
  }

  /**
   * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The
   * {@link #delete(long)} method above assumes that the {@link BitSetNode} exists, but when
   * restarting this is not true, as we read the wal files in reverse order so a delete may come
   * first.
   */
  public void setDeleted(long procId, boolean isDeleted) {
    BitSetNode node = getOrCreateNode(procId);
    assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
    node.updateState(procId, isDeleted);
    trackProcIds(procId);
  }

  /**
   * Set the given bit for the procId to delete if it was modified before.
   * <p/>
   * This method is used to test whether a procedure wal file can be safely deleted: if all the
   * procedures in the given procedure wal file have been modified in the new procedure wal files,
   * then we can delete it.
   */
  public void setDeletedIfModified(long... procId) {
    BitSetNode node = null;
    for (int i = 0; i < procId.length; ++i) {
      node = lookupClosestNode(node, procId[i]);
      if (node != null && node.isModified(procId[i])) {
        node.delete(procId[i]);
      }
    }
  }

  /**
   * For every procedure modified in this tracker, looks up the closest node of {@code tracker}
   * (which may be {@code null} or not contain the id) and marks the procedure deleted here when
   * {@code func} returns true. Emptied nodes are not removed from the map.
   */
  private void setDeleteIf(ProcedureStoreTracker tracker,
    BiFunction<BitSetNode, Long, Boolean> func) {
    BitSetNode trackerNode = null;
    for (BitSetNode node : map.values()) {
      long minProcId = node.getStart();
      long maxProcId = node.getEnd();
      for (long procId = minProcId; procId <= maxProcId; ++procId) {
        if (!node.isModified(procId)) {
          continue;
        }

        trackerNode = tracker.lookupClosestNode(trackerNode, procId);
        if (func.apply(trackerNode, procId)) {
          node.delete(procId);
        }
      }
    }
  }

  /**
   * For the global tracker, we will use this method to build the holdingCleanupTracker, as the
   * modified flags will be cleared after rolling so we only need to test the deleted flags.
   * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker)
   */
  public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) {
    setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId)
      || node.isDeleted(procId) == DeleteState.YES);
  }

  /**
   * Similar to {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by the
   * {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker}, then
   * we mark it as deleted.
   * @see #setDeletedIfModified(long...)
   */
  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
    setDeleteIf(tracker, (node, procId) -> node != null && node.isModified(procId));
  }

  /**
   * Looks up the node that may contain the specified procId.
   * @param node   cached node to check before doing a lookup
   * @param procId the procId to lookup
   * @return the cached node if it contains {@code procId}, otherwise the node with the greatest
   *         start that is {@code <= procId} (which may not contain it), or {@code null}
   */
  private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
    if (node != null && node.contains(procId)) {
      return node;
    }

    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
    return entry != null ? entry.getValue() : null;
  }

  /** Widens the [min, max] range of modified proc ids to include {@code procId}. */
  private void trackProcIds(long procId) {
    minModifiedProcId = Math.min(minModifiedProcId, procId);
    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
  }

  public long getModifiedMinProcId() {
    return minModifiedProcId;
  }

  public long getModifiedMaxProcId() {
    return maxModifiedProcId;
  }

  /** Clears all nodes and flags and resets the modified proc id range. */
  public void reset() {
    this.keepDeletes = false;
    this.partial = false;
    this.map.clear();
    minModifiedProcId = Long.MAX_VALUE;
    maxModifiedProcId = Long.MIN_VALUE;
  }

  /** Returns true if {@code procId} is tracked and its modified bit is set. */
  public boolean isModified(long procId) {
    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
    return entry != null && entry.getValue().contains(procId)
      && entry.getValue().isModified(procId);
  }

  /**
   * If {@link #partial} is false, returns state from the bitmap. If no state is found for
   * {@code procId}, returns YES. If partial is true, tracker doesn't have complete view of system
   * state, so it returns MAYBE if there is no update for the procedure or if it doesn't have a
   * state in bitmap. Otherwise, returns state from the bitmap.
   */
  public DeleteState isDeleted(long procId) {
    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
    if (entry != null && entry.getValue().contains(procId)) {
      BitSetNode node = entry.getValue();
      DeleteState state = node.isDeleted(procId);
      return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
    }
    return partial ? DeleteState.MAYBE : DeleteState.YES;
  }

  /**
   * Returns the active min proc id of the first node, or {@link Procedure#NO_PROC_ID} if nothing
   * is tracked.
   */
  public long getActiveMinProcId() {
    Map.Entry<Long, BitSetNode> entry = map.firstEntry();
    return entry == null ? Procedure.NO_PROC_ID : entry.getValue().getActiveMinProcId();
  }

  public void setKeepDeletes(boolean keepDeletes) {
    this.keepDeletes = keepDeletes;
    // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted
    // procedures).
    if (!keepDeletes) {
      Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<Long, BitSetNode> entry = it.next();
        if (entry.getValue().isEmpty()) {
          it.remove();
        }
      }
    }
  }

  public boolean isPartial() {
    return partial;
  }

  /**
   * Sets the partial flag. When switching from partial to non-partial, the partial flag of every
   * node is unset as well.
   */
  public void setPartialFlag(boolean isPartial) {
    if (this.partial && !isPartial) {
      for (BitSetNode node : map.values()) {
        node.unsetPartialFlag();
      }
    }
    this.partial = isPartial;
  }

  /** Returns true, if no procedure is active, else false. */
  public boolean isEmpty() {
    for (BitSetNode node : map.values()) {
      if (!node.isEmpty()) {
        return false;
      }
    }
    return true;
  }

  /**
   * @return true if all procedures were modified or deleted since last call to
   *         {@link #resetModified()}.
   */
  public boolean isAllModified() {
    for (BitSetNode node : map.values()) {
      if (!node.isAllModified()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Will be used when there are too many proc wal files. We will rewrite the states of the active
   * procedures in the oldest proc wal file so that we can delete it.
   * @return all the active procedure ids in this tracker.
   */
  public long[] getAllActiveProcIds() {
    return map.values().stream().map(BitSetNode::getActiveProcIds).flatMapToLong(LongStream::of)
      .toArray();
  }

  /**
   * Clears the list of updated procedure ids. This doesn't affect global list of active procedure
   * ids.
   */
  public void resetModified() {
    for (BitSetNode node : map.values()) {
      node.resetModified();
    }
    minModifiedProcId = Long.MAX_VALUE;
    maxModifiedProcId = Long.MIN_VALUE;
  }

  /**
   * Returns a node containing {@code procId}. Tries, in order:
   * <ol>
   * <li>the left node (greatest start {@code <= procId}) if it already contains the id;</li>
   * <li>merging the left and right neighbours;</li>
   * <li>growing the closer of the two when both can grow;</li>
   * <li>growing whichever one can;</li>
   * <li>creating a new node.</li>
   * </ol>
   */
  private BitSetNode getOrCreateNode(long procId) {
    // If procId can fit in left node (directly or by growing it)
    BitSetNode leftNode = null;
    boolean leftCanGrow = false;
    Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
    if (leftEntry != null) {
      leftNode = leftEntry.getValue();
      if (leftNode.contains(procId)) {
        return leftNode;
      }
      leftCanGrow = leftNode.canGrow(procId);
    }

    // If procId can fit in right node (directly or by growing it)
    BitSetNode rightNode = null;
    boolean rightCanGrow = false;
    Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
    if (rightEntry != null) {
      rightNode = rightEntry.getValue();
      rightCanGrow = rightNode.canGrow(procId);
      if (leftNode != null) {
        if (leftNode.canMerge(rightNode)) {
          // merge left and right node; procId lies between their starts, so the result covers it
          return mergeNodes(leftNode, rightNode);
        }

        // If left and right nodes can not merge, decide which one to grow: the closer one wins,
        // ties go to the left node.
        if (leftCanGrow && rightCanGrow) {
          if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
            return growNode(leftNode, procId);
          }
          return growNode(rightNode, procId);
        }
      }
    }

    // grow the left node
    if (leftCanGrow) {
      return growNode(leftNode, procId);
    }

    // grow the right node
    if (rightCanGrow) {
      return growNode(rightNode, procId);
    }

    // add new node if there are no left/right nodes which can be used.
    BitSetNode node = new BitSetNode(procId, partial);
    map.put(node.getStart(), node);
    return node;
  }

  /**
   * Grows {@code node} to contain {@code procId} and updates the map.
   * @return {@link BitSetNode} instance which contains {@code procId}.
   */
  private BitSetNode growNode(BitSetNode node, long procId) {
    // Growing may move the node's start, so re-key it in the map.
    map.remove(node.getStart());
    node.grow(procId);
    map.put(node.getStart(), node);
    return node;
  }

  /**
   * Merges {@code leftNode} & {@code rightNode} and updates the map.
   */
  private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
    assert leftNode.getStart() < rightNode.getStart();
    leftNode.merge(rightNode);
    map.remove(rightNode.getStart());
    return leftNode;
  }

  /** Debug helper: prints a summary of this tracker and every node to stdout. */
  public void dump() {
    System.out.println("map " + map.size());
    System.out.println("isAllModified " + isAllModified());
    System.out.println("isEmpty " + isEmpty());
    for (BitSetNode node : map.values()) {
      node.dump();
    }
  }

  // ========================================================================
  // Convert to/from Protocol Buffer.
  // ========================================================================

  /**
   * Builds org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker
   * protocol buffer from current state.
   */
  public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException {
    ProcedureProtos.ProcedureStoreTracker.Builder builder =
      ProcedureProtos.ProcedureStoreTracker.newBuilder();
    for (BitSetNode node : map.values()) {
      builder.addNode(node.convert());
    }
    return builder.build();
  }
}