/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.stream.LongStream;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * Keeps track of live procedures.
 *
 * It can be used by the ProcedureStore to identify which procedures are already
 * deleted/completed to avoid the deserialization step on restart
 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
 *             use the new region based procedure store.
 */
@Deprecated
@InterfaceAudience.Private
class ProcedureStoreTracker {
  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStoreTracker.class);

  // Key is procedure id corresponding to first bit of the bitmap.
  private final TreeMap<Long, BitSetNode> map = new TreeMap<>();

  /**
   * If true, do not remove bits corresponding to deleted procedures. Note that this can result
   * in huge bitmaps over time.
   * Currently, it's set to true only when building tracker state from logs during recovery. During
   * recovery, if we are sure that a procedure has been deleted, reading its old update entries
   * can be skipped.
   */
  private boolean keepDeletes = false;
  /**
   * If true, it means tracker has incomplete information about the active/deleted procedures.
   * It's set to true only when recovering from old logs. See {@link #isDeleted(long)} docs to
   * understand its real use.
   */
  boolean partial = false;

  // Smallest/largest procedure id touched since the last reset()/resetModified(). The sentinel
  // values (MAX_VALUE/MIN_VALUE) mean "nothing has been tracked yet".
  private long minModifiedProcId = Long.MAX_VALUE;
  private long maxModifiedProcId = Long.MIN_VALUE;

  public enum DeleteState { YES, NO, MAYBE }

  /**
   * Resets this tracker and rebuilds its nodes from the given protobuf message. Only the nodes are
   * restored; the flags and the min/max modified ids keep the defaults set by {@link #reset()}.
   * @param trackerProtoBuf the serialized tracker whose nodes are loaded
   */
  public void resetToProto(ProcedureProtos.ProcedureStoreTracker trackerProtoBuf) {
    reset();
    for (ProcedureProtos.ProcedureStoreTracker.TrackerNode protoNode :
        trackerProtoBuf.getNodeList()) {
      final BitSetNode node = new BitSetNode(protoNode);
      map.put(node.getStart(), node);
    }
  }

  /**
   * Resets internal state to same as given {@code tracker}. Does deep copy of the bitmap.
   */
  public void resetTo(ProcedureStoreTracker tracker) {
    resetTo(tracker, false);
  }

  /**
   * Resets internal state to same as given {@code tracker}, and change the deleted flag according
   * to the modified flag if {@code resetDelete} is true. Does deep copy of the bitmap.
   * <p>
   * The {@code resetDelete} will be set to true when building cleanup tracker, please see the
   * comments in {@link BitSetNode#BitSetNode(BitSetNode, boolean)} to learn how we change the
   * deleted flag if {@code resetDelete} is true.
   */
  public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
    reset();
    // resetDelete will be true if we are building the cleanup tracker, as we will reset deleted
    // flags for all the unmodified bits to 1, the partial flag is useless so set it to false for
    // not confusing the developers when debugging.
    this.partial = resetDelete ? false : tracker.partial;
    this.minModifiedProcId = tracker.minModifiedProcId;
    this.maxModifiedProcId = tracker.maxModifiedProcId;
    this.keepDeletes = tracker.keepDeletes;
    for (Map.Entry<Long, BitSetNode> entry : tracker.map.entrySet()) {
      map.put(entry.getKey(), new BitSetNode(entry.getValue(), resetDelete));
    }
  }

  /**
   * Records {@code procId} in the tracker, creating, growing or merging nodes as needed.
   */
  public void insert(long procId) {
    insert(null, procId);
  }

  /**
   * Records every id of {@code procIds} in the tracker, in array order.
   */
  public void insert(long[] procIds) {
    for (long procId : procIds) {
      insert(procId);
    }
  }

  /**
   * Marks the already tracked parent {@code procId} as updated and records all
   * {@code subProcIds}.
   * @param procId the parent procedure id; a node for it is expected to exist already
   * @param subProcIds the child procedure ids to insert
   */
  public void insert(long procId, long[] subProcIds) {
    // Start from the parent's node so that children landing in the same node skip the map lookup.
    BitSetNode node = update(null, procId);
    for (long subProcId : subProcIds) {
      node = insert(node, subProcId);
    }
  }

  /**
   * Inserts {@code procId}, reusing {@code node} when it already covers the id.
   * @param node cached node from a previous call, may be null
   * @return the node that now holds {@code procId}
   */
  private BitSetNode insert(BitSetNode node, long procId) {
    if (node == null || !node.contains(procId)) {
      node = getOrCreateNode(procId);
    }
    node.insertOrUpdate(procId);
    trackProcIds(procId);
    return node;
  }

  /**
   * Marks the already tracked {@code procId} as updated.
   * @throws NullPointerException if no node can be found for {@code procId}
   */
  public void update(long procId) {
    update(null, procId);
  }

  /**
   * Marks {@code procId} as updated in the node that covers it.
   * @param node cached node from a previous call, may be null
   * @return the node that holds {@code procId}
   * @throws NullPointerException if no node can be found for {@code procId}
   */
  private BitSetNode update(BitSetNode node, long procId) {
    node = lookupClosestNode(node, procId);
    assert node != null : "expected node to update procId=" + procId;
    assert node.contains(procId) : "expected procId=" + procId + " in the node";
    // The asserts above are only active with -ea, so fail explicitly in production too.
    if (node == null) {
      throw new NullPointerException("pid=" + procId);
    }
    node.insertOrUpdate(procId);
    trackProcIds(procId);
    return node;
  }

  /**
   * Marks {@code procId} as deleted. Only logs a warning if the id is not tracked.
   */
  public void delete(long procId) {
    delete(null, procId);
  }

  /**
   * Marks all the given procedure ids as deleted. The caller's array is left untouched.
   * @param procIds the procedure ids to delete, in any order
   */
  public void delete(final long[] procIds) {
    // Process the ids in ascending order so that consecutive ids can reuse the cached node, but
    // sort a private copy: reordering the caller's array would be a surprising side effect.
    final long[] sortedProcIds = Arrays.copyOf(procIds, procIds.length);
    Arrays.sort(sortedProcIds);
    BitSetNode node = null;
    for (long procId : sortedProcIds) {
      node = delete(node, procId);
    }
  }

  /**
   * Marks {@code procId} as deleted in the node that covers it. If no node covers the id a
   * warning is logged and nothing else happens.
   * @param node cached node from a previous call, may be null
   * @return the node that was looked up, which may be null or may not contain {@code procId}
   */
  private BitSetNode delete(BitSetNode node, long procId) {
    node = lookupClosestNode(node, procId);
    if (node == null || !node.contains(procId)) {
      LOG.warn("The BitSetNode for procId={} does not exist, maybe a double deletion?", procId);
      return node;
    }
    node.delete(procId);
    if (!keepDeletes && node.isEmpty()) {
      // TODO: RESET if (map.size() == 1)
      map.remove(node.getStart());
    }

    trackProcIds(procId);
    return node;
  }

  /**
   * Will be called when restarting where we need to rebuild the ProcedureStoreTracker.
   */
  public void setMinMaxModifiedProcIds(long min, long max) {
    this.minModifiedProcId = min;
    this.maxModifiedProcId = max;
  }

  /**
   * This method is used when restarting where we need to rebuild the ProcedureStoreTracker. The
   * {@link #delete(long)} method above assume that the {@link BitSetNode} exists, but when restart
   * this is not true, as we will read the wal files in reverse order so a delete may come first.
   */
  public void setDeleted(long procId, boolean isDeleted) {
    BitSetNode node = getOrCreateNode(procId);
    assert node.contains(procId) : "expected procId=" + procId + " in the node=" + node;
    node.updateState(procId, isDeleted);
    trackProcIds(procId);
  }

  /**
   * Set the given bit for the procId to delete if it was modified before.
   * <p>
   * This method is used to test whether a procedure wal file can be safely deleted, as if all the
   * procedures in the given procedure wal file has been modified in the new procedure wal files,
   * then we can delete it.
   */
  public void setDeletedIfModified(long... procId) {
    BitSetNode node = null;
    for (long id : procId) {
      node = lookupClosestNode(node, id);
      // lookupClosestNode only returns the floor node, which does not necessarily cover the id,
      // so check contains() before querying the bitmap, like isModified(long) does.
      if (node != null && node.contains(id) && node.isModified(id)) {
        node.delete(id);
      }
    }
  }

  /**
   * Walks every procedure id that is modified in this tracker and marks it as deleted here when
   * {@code func} returns true for it.
   * @param tracker the other tracker that is consulted
   * @param func predicate that receives the closest node of {@code tracker} for the procedure id
   *          (may be null, and may not contain the id) together with the id itself
   */
  private void setDeleteIf(ProcedureStoreTracker tracker,
      BiFunction<BitSetNode, Long, Boolean> func) {
    BitSetNode trackerNode = null;
    for (BitSetNode node : map.values()) {
      long minProcId = node.getStart();
      long maxProcId = node.getEnd();
      for (long procId = minProcId; procId <= maxProcId; ++procId) {
        if (!node.isModified(procId)) {
          continue;
        }

        trackerNode = tracker.lookupClosestNode(trackerNode, procId);
        if (func.apply(trackerNode, procId)) {
          node.delete(procId);
        }
      }
    }
  }

  /**
   * For the global tracker, we will use this method to build the holdingCleanupTracker, as the
   * modified flags will be cleared after rolling so we only need to test the deleted flags.
   * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker)
   */
  public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) {
    setDeleteIf(tracker, (node, procId) -> node == null || !node.contains(procId) ||
      node.isDeleted(procId) == DeleteState.YES);
  }

  /**
   * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code procId} are given by
   * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
   * then we mark it as deleted.
   * @see #setDeletedIfModified(long...)
   */
  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
    // The node handed to the predicate is only the closest (floor) node of the other tracker, so
    // make sure it really covers the id before trusting its modified bit.
    setDeleteIf(tracker,
      (node, procId) -> node != null && node.contains(procId) && node.isModified(procId));
  }

  /**
   * lookup the node containing the specified procId.
   * @param node cached node to check before doing a lookup
   * @param procId the procId to lookup
   * @return the node that may contain the procId or null
   */
  private BitSetNode lookupClosestNode(final BitSetNode node, final long procId) {
    if (node != null && node.contains(procId)) {
      return node;
    }

    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
    return entry != null ? entry.getValue() : null;
  }

  /**
   * Widens the [min, max] range of modified procedure ids so that it includes {@code procId}.
   */
  private void trackProcIds(long procId) {
    minModifiedProcId = Math.min(minModifiedProcId, procId);
    maxModifiedProcId = Math.max(maxModifiedProcId, procId);
  }

  /**
   * @return the smallest tracked modified procedure id, or {@link Long#MAX_VALUE} if none
   */
  public long getModifiedMinProcId() {
    return minModifiedProcId;
  }

  /**
   * @return the largest tracked modified procedure id, or {@link Long#MIN_VALUE} if none
   */
  public long getModifiedMaxProcId() {
    return maxModifiedProcId;
  }

  /**
   * Drops all nodes and restores the flags and the min/max modified ids to their defaults.
   */
  public void reset() {
    this.keepDeletes = false;
    this.partial = false;
    this.map.clear();
    minModifiedProcId = Long.MAX_VALUE;
    maxModifiedProcId = Long.MIN_VALUE;
  }

  /**
   * @return true if a node covers {@code procId} and reports it as modified, false otherwise
   */
  public boolean isModified(long procId) {
    final Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
    return entry != null && entry.getValue().contains(procId) &&
      entry.getValue().isModified(procId);
  }

  /**
   * If {@link #partial} is false, returns state from the bitmap. If no state is found for
   * {@code procId}, returns YES.
   * If partial is true, tracker doesn't have complete view of system state, so it returns MAYBE
   * if there is no update for the procedure or if it doesn't have a state in bitmap. Otherwise,
   * returns state from the bitmap.
   */
  public DeleteState isDeleted(long procId) {
    Map.Entry<Long, BitSetNode> entry = map.floorEntry(procId);
    if (entry != null && entry.getValue().contains(procId)) {
      BitSetNode node = entry.getValue();
      DeleteState state = node.isDeleted(procId);
      return partial && !node.isModified(procId) ? DeleteState.MAYBE : state;
    }
    return partial ? DeleteState.MAYBE : DeleteState.YES;
  }

  /**
   * @return {@link Procedure#NO_PROC_ID} if the tracker has no nodes, otherwise whatever the
   *         first (lowest start id) node reports as its active min procedure id
   */
  public long getActiveMinProcId() {
    Map.Entry<Long, BitSetNode> entry = map.firstEntry();
    return entry == null ? Procedure.NO_PROC_ID : entry.getValue().getActiveMinProcId();
  }

  /**
   * Sets the {@code keepDeletes} flag. When switched to false, nodes that are already empty are
   * removed right away.
   */
  public void setKeepDeletes(boolean keepDeletes) {
    this.keepDeletes = keepDeletes;
    // If not to keep deletes, remove the BitSetNodes which are empty (i.e. contains ids of deleted
    // procedures).
    if (!keepDeletes) {
      Iterator<Map.Entry<Long, BitSetNode>> it = map.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<Long, BitSetNode> entry = it.next();
        if (entry.getValue().isEmpty()) {
          it.remove();
        }
      }
    }
  }

  /**
   * @return the current value of the {@link #partial} flag
   */
  public boolean isPartial() {
    return partial;
  }

  /**
   * Sets the {@link #partial} flag. On a transition from partial to non-partial the flag is also
   * unset on every node.
   */
  public void setPartialFlag(boolean isPartial) {
    if (this.partial && !isPartial) {
      for (BitSetNode node : map.values()) {
        node.unsetPartialFlag();
      }
    }
    this.partial = isPartial;
  }

  /**
   * @return true, if no procedure is active, else false.
   */
  public boolean isEmpty() {
    for (BitSetNode node : map.values()) {
      if (!node.isEmpty()) {
        return false;
      }
    }
    return true;
  }

  /**
   * @return true if all procedure was modified or deleted since last call to
   *         {@link #resetModified()}.
   */
  public boolean isAllModified() {
    for (BitSetNode node : map.values()) {
      if (!node.isAllModified()) {
        return false;
      }
    }
    return true;
  }

  /**
   * Will be used when there are too many proc wal files. We will rewrite the states of the active
   * procedures in the oldest proc wal file so that we can delete it.
   * @return all the active procedure ids in this tracker.
   */
  public long[] getAllActiveProcIds() {
    return map.values().stream().map(BitSetNode::getActiveProcIds).filter(p -> p.length > 0)
      .flatMapToLong(LongStream::of).toArray();
  }

  /**
   * Clears the list of updated procedure ids. This doesn't affect global list of active
   * procedure ids.
   */
  public void resetModified() {
    for (BitSetNode node : map.values()) {
      node.resetModified();
    }
    minModifiedProcId = Long.MAX_VALUE;
    maxModifiedProcId = Long.MIN_VALUE;
  }

  /**
   * Finds a node that can hold {@code procId}. An existing node that already contains the id is
   * preferred; otherwise the neighbouring nodes are merged or grown if possible, and only as a
   * last resort a new node is created.
   * @return the node which contains {@code procId}
   */
  private BitSetNode getOrCreateNode(long procId) {
    // If procId can fit in left node (directly or by growing it)
    BitSetNode leftNode = null;
    boolean leftCanGrow = false;
    Map.Entry<Long, BitSetNode> leftEntry = map.floorEntry(procId);
    if (leftEntry != null) {
      leftNode = leftEntry.getValue();
      if (leftNode.contains(procId)) {
        return leftNode;
      }
      leftCanGrow = leftNode.canGrow(procId);
    }

    // If procId can fit in right node (directly or by growing it)
    BitSetNode rightNode = null;
    boolean rightCanGrow = false;
    Map.Entry<Long, BitSetNode> rightEntry = map.ceilingEntry(procId);
    if (rightEntry != null) {
      rightNode = rightEntry.getValue();
      rightCanGrow = rightNode.canGrow(procId);
      if (leftNode != null) {
        if (leftNode.canMerge(rightNode)) {
          // merge left and right node
          return mergeNodes(leftNode, rightNode);
        }

        // If left and right nodes can not merge, decide which one to grow.
        if (leftCanGrow && rightCanGrow) {
          // Grow whichever node is closer to procId; ties go to the left node.
          if ((procId - leftNode.getEnd()) <= (rightNode.getStart() - procId)) {
            return growNode(leftNode, procId);
          }
          return growNode(rightNode, procId);
        }
      }
    }

    // grow the left node
    if (leftCanGrow) {
      return growNode(leftNode, procId);
    }

    // grow the right node
    if (rightCanGrow) {
      return growNode(rightNode, procId);
    }

    // add new node if there are no left/right nodes which can be used.
    BitSetNode node = new BitSetNode(procId, partial);
    map.put(node.getStart(), node);
    return node;
  }

  /**
   * Grows {@code node} to contain {@code procId} and updates the map.
   * @return {@link BitSetNode} instance which contains {@code procId}.
   */
  private BitSetNode growNode(BitSetNode node, long procId) {
    // The node is keyed by its start id; it is removed and re-inserted around grow() so that the
    // key is re-read from getStart() afterwards.
    map.remove(node.getStart());
    node.grow(procId);
    map.put(node.getStart(), node);
    return node;
  }

  /**
   * Merges {@code leftNode} & {@code rightNode} and updates the map.
   * @return {@code leftNode}, after {@code rightNode} has been merged into it
   */
  private BitSetNode mergeNodes(BitSetNode leftNode, BitSetNode rightNode) {
    assert leftNode.getStart() < rightNode.getStart();
    leftNode.merge(rightNode);
    map.remove(rightNode.getStart());
    return leftNode;
  }

  /**
   * Prints a summary of the tracker and a dump of every node to standard output. Debugging aid.
   */
  public void dump() {
    System.out.println("map " + map.size());
    System.out.println("isAllModified " + isAllModified());
    System.out.println("isEmpty " + isEmpty());
    for (BitSetNode node : map.values()) {
      node.dump();
    }
  }

  // ========================================================================
  //  Convert to/from Protocol Buffer.
  // ========================================================================

  /**
   * Builds a {@link ProcedureProtos.ProcedureStoreTracker} protocol buffer from current state.
   * Nodes are added in ascending order of their start id.
   */
  public ProcedureProtos.ProcedureStoreTracker toProto() throws IOException {
    ProcedureProtos.ProcedureStoreTracker.Builder builder =
      ProcedureProtos.ProcedureStoreTracker.newBuilder();
    for (BitSetNode node : map.values()) {
      builder.addNode(node.convert());
    }
    return builder.build();
  }
}