/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ImmutableByteArray;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accounting of sequence ids per region and then by column family. So we can keep our accounting
 * current, call startCacheFlush and then completeCacheFlush or abortCacheFlush so this instance can
 * keep abreast of the state of sequence id persistence. Also call update per append.
 * <p>
 * For the implementation, we assume that all the {@code encodedRegionName} passed in are gotten by
 * {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
 * it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
 * hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
 * HBASE-16278 for more details.
 * </p>
 */
@InterfaceAudience.Private
class SequenceIdAccounting {
  private static final Logger LOG = LoggerFactory.getLogger(SequenceIdAccounting.class);

  /**
   * This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and
   * {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the lowest
   * outstanding sequence ids EXCEPT when flushing. When we flush, the current lowest set for the
   * region/column family are moved (atomically because of this lock) to
   * {@link #flushingSequenceIds}.
   * <p>
   * The two Maps are tied by this locking object EXCEPT when we go to update the lowest entry; see
   * {@link #lowestUnflushedSequenceIds}. In here is a putIfAbsent call on
   * {@link #lowestUnflushedSequenceIds}. In this latter case, we will add this lowest sequence id
   * if we find that there is no entry for the current column family. There will be no entry only if
   * we just came up OR we have moved aside current set of lowest sequence ids because the current
   * set are being flushed (by putting them into {@link #flushingSequenceIds}). This is how we pick
   * up the next 'lowest' sequence id per region per column family to be used figuring what is in
   * the next flush.
   */
  private final Object tieLock = new Object();

  /**
   * Map of encoded region names and family names to their OLDEST -- i.e. their first, the
   * longest-lived, their 'earliest', the 'lowest' -- sequence id.
   * <p>
   * When we flush, the current lowest sequence ids get cleared and added to
   * {@link #flushingSequenceIds}. The next append that comes in, is then added here to
   * {@link #lowestUnflushedSequenceIds} as the next lowest sequenceid.
   * <p>
   * If flush fails, currently server is aborted so no need to restore previous sequence ids.
   * <p>
   * Needs to be concurrent Maps because we use putIfAbsent updating oldest.
   */
  private final ConcurrentMap<byte[],
    ConcurrentMap<ImmutableByteArray, Long>> lowestUnflushedSequenceIds = new ConcurrentHashMap<>();

  /**
   * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id
   * currently being flushed out to hfiles. Entries are moved here from
   * {@link #lowestUnflushedSequenceIds} while the lock {@link #tieLock} is held (so movement
   * between the Maps is atomic).
   */
  private final Map<byte[], Map<ImmutableByteArray, Long>> flushingSequenceIds = new HashMap<>();

  /**
   * <p>
   * Map of region encoded names to the latest/highest region sequence id. Updated on each call to
   * append.
   * </p>
   * <p>
   * This map uses byte[] as the key, and uses reference equality. It works in our use case as we
   * use {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()} as keys. For a
   * given region, it always returns the same array.
   * </p>
   */
  private Map<byte[], Long> highestSequenceIds = new HashMap<>();

  /**
   * Returns the lowest unflushed sequence id for the region.
   * <p>
   * The result is the minimum of the lowest id found in {@link #flushingSequenceIds} (or
   * {@link Long#MAX_VALUE} when the region has no flushing entry) and the lowest id found in
   * {@link #lowestUnflushedSequenceIds} (or {@link HConstants#NO_SEQNUM} when the region has no
   * unflushed entry).
   * @param encodedRegionName encoded name of the region to look up
   * @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code>. Will return
   *         {@link HConstants#NO_SEQNUM} when none.
   */
  long getLowestSequenceId(final byte[] encodedRegionName) {
    synchronized (this.tieLock) {
      Map<?, Long> m = this.flushingSequenceIds.get(encodedRegionName);
      long flushingLowest = m != null ? getLowestSequenceId(m) : Long.MAX_VALUE;
      m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
      // NOTE(review): when the region has no unflushed entry this default wins the min() below
      // whenever NO_SEQNUM is smaller than the flushing value; kept as-is -- confirm with callers.
      long unflushedLowest = m != null ? getLowestSequenceId(m) : HConstants.NO_SEQNUM;
      return Math.min(flushingLowest, unflushedLowest);
    }
  }

  /**
   * Looks up the lowest sequence id recorded for one family of a region, consulting
   * {@link #flushingSequenceIds} first and {@link #lowestUnflushedSequenceIds} second.
   * @param encodedRegionName encoded name of the region to look up
   * @param familyName        column family to look up
   * @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code> and
   *         <code>familyName</code>. Returned sequenceid may be for an edit currently being
   *         flushed. {@link HConstants#NO_SEQNUM} when neither map has an entry.
   */
  long getLowestSequenceId(final byte[] encodedRegionName, final byte[] familyName) {
    ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName);
    synchronized (this.tieLock) {
      Map<ImmutableByteArray, Long> m = this.flushingSequenceIds.get(encodedRegionName);
      if (m != null) {
        Long lowest = m.get(familyNameWrapper);
        if (lowest != null) {
          return lowest;
        }
      }
      m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
      if (m != null) {
        Long lowest = m.get(familyNameWrapper);
        if (lowest != null) {
          return lowest;
        }
      }
    }
    return HConstants.NO_SEQNUM;
  }

  /**
   * Reset the accounting of highest sequenceid by regionname.
   * @return Return the previous accounting Map of regions to the last sequence id written into
   *         each.
   */
  Map<byte[], Long> resetHighest() {
    Map<byte[], Long> old = this.highestSequenceIds;
    this.highestSequenceIds = new HashMap<>();
    return old;
  }

  /**
   * We've been passed a new sequenceid for the region. Set it as highest seen for this region and
   * if we are to record oldest, or lowest sequenceids, save it as oldest seen if nothing currently
   * older.
   * @param encodedRegionName encoded name of the region the edit belongs to
   * @param families          families touched by the edit
   * @param sequenceid        sequence id of the edit
   * @param lowest            Whether to keep running account of oldest sequence id.
   */
  void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,
    final boolean lowest) {
    Long l = Long.valueOf(sequenceid);
    this.highestSequenceIds.put(encodedRegionName, l);
    if (lowest) {
      ConcurrentMap<ImmutableByteArray, Long> m = getOrCreateLowestSequenceIds(encodedRegionName);
      for (byte[] familyName : families) {
        // putIfAbsent: an already-recorded (older) id for the family is left untouched.
        m.putIfAbsent(ImmutableByteArray.wrap(familyName), l);
      }
    }
  }

  /**
   * Clear all the records of the given region as it is going to be closed.
   * <p>
   * We will call this once we get the region close marker. We need this because that, if we use
   * Durability.ASYNC_WAL, after calling startCacheFlush, we may still get some ongoing wal entries
   * that has not been processed yet, this will lead to orphan records in the
   * lowestUnflushedSequenceIds and then cause too many WAL files.
   * <p>
   * See HBASE-23157 for more details.
   * @param encodedRegionName encoded name of the region being closed
   */
  void onRegionClose(byte[] encodedRegionName) {
    synchronized (tieLock) {
      this.lowestUnflushedSequenceIds.remove(encodedRegionName);
      Map<ImmutableByteArray, Long> flushing = this.flushingSequenceIds.remove(encodedRegionName);
      if (flushing != null) {
        LOG.warn("Still have flushing records when closing {}, {}",
          Bytes.toString(encodedRegionName),
          flushing.entrySet().stream().map(e -> e.getKey().toString() + "->" + e.getValue())
            .collect(Collectors.joining(",", "{", "}")));
      }
    }
    this.highestSequenceIds.remove(encodedRegionName);
  }

  /**
   * Update the store sequence id, e.g., upon executing in-memory compaction.
   * <p>
   * Also raises the region's entry in {@link #highestSequenceIds} when <code>sequenceId</code> is
   * larger than what is recorded there. Does nothing when <code>sequenceId</code> is null.
   * @param encodedRegionName encoded name of the region owning the store
   * @param familyName        family whose lowest unflushed sequence id is updated
   * @param sequenceId        the new sequence id; ignored when null
   * @param onlyIfGreater     when true, an existing entry is replaced only if
   *                          <code>sequenceId</code> is greater than it; when false, the entry is
   *                          overwritten unconditionally
   */
  void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceId,
    boolean onlyIfGreater) {
    if (sequenceId == null) {
      return;
    }
    Long highest = this.highestSequenceIds.get(encodedRegionName);
    if (highest == null || sequenceId > highest) {
      this.highestSequenceIds.put(encodedRegionName, sequenceId);
    }
    ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(familyName);
    synchronized (this.tieLock) {
      ConcurrentMap<ImmutableByteArray, Long> m = getOrCreateLowestSequenceIds(encodedRegionName);
      boolean replaced = false;
      // Retry loop: the conditional replace below fails if the entry changed between get and
      // replace (update() writes to this map without holding tieLock), in which case we re-read.
      while (!replaced) {
        Long oldSeqId = m.get(familyNameWrapper);
        if (oldSeqId == null) {
          m.put(familyNameWrapper, sequenceId);
          replaced = true;
        } else if (onlyIfGreater) {
          if (sequenceId > oldSeqId) {
            replaced = m.replace(familyNameWrapper, oldSeqId, sequenceId);
          } else {
            return;
          }
        } else { // replace even if sequence id is not greater than oldSeqId
          m.put(familyNameWrapper, sequenceId);
          return;
        }
      }
    }
  }

  /**
   * Returns the per-family map of lowest unflushed sequence ids for the region, creating and
   * registering an empty one in {@link #lowestUnflushedSequenceIds} if the region has none yet.
   * @param encodedRegionName encoded name of the region
   * @return the (possibly newly created) family-to-sequence-id map for the region
   */
  ConcurrentMap<ImmutableByteArray, Long> getOrCreateLowestSequenceIds(byte[] encodedRegionName) {
    // Intentionally, this method does not acquire tieLock itself: update() calls it without the
    // lock on every append, while the other callers in this class already hold the lock.
    return computeIfAbsent(this.lowestUnflushedSequenceIds, encodedRegionName,
      ConcurrentHashMap::new);
  }

  /**
   * Finds the smallest value in the passed map, skipping any entry whose key's
   * <code>toString()</code> is <code>"METAFAMILY"</code>.
   * @param sequenceids Map to search for lowest value.
   * @return Lowest value found in <code>sequenceids</code>, or {@link HConstants#NO_SEQNUM} when
   *         no entry qualifies.
   */
  private static long getLowestSequenceId(Map<?, Long> sequenceids) {
    long lowest = HConstants.NO_SEQNUM;
    for (Map.Entry<?, Long> entry : sequenceids.entrySet()) {
      if (entry.getKey().toString().equals("METAFAMILY")) {
        continue;
      }
      Long sid = entry.getValue();
      if (lowest == HConstants.NO_SEQNUM || sid.longValue() < lowest) {
        lowest = sid.longValue();
      }
    }
    return lowest;
  }

  /**
   * Collapses a region-to-families map into a region-to-lowest-sequence-id map. Regions whose
   * lowest id comes back as {@link HConstants#NO_SEQNUM} are left out of the result.
   * @param src map of encoded region name to per-family sequence ids; may be null
   * @return New Map that has same keys as <code>src</code> but instead of a Map for a value, it
   *         instead has found the smallest sequence id and it returns that as the value instead.
   *         Returns null when <code>src</code> is null or empty.
   */
  private <T extends Map<?, Long>> Map<byte[], Long> flattenToLowestSequenceId(Map<byte[], T> src) {
    if (src == null || src.isEmpty()) {
      return null;
    }
    Map<byte[], Long> tgt = new HashMap<>();
    for (Map.Entry<byte[], T> entry : src.entrySet()) {
      long lowestSeqId = getLowestSequenceId(entry.getValue());
      if (lowestSeqId != HConstants.NO_SEQNUM) {
        tgt.put(entry.getKey(), lowestSeqId);
      }
    }
    return tgt;
  }

  /**
   * Starts a flush of the given families; every family is passed on with
   * {@link HConstants#NO_SEQNUM}, i.e. its unflushed entry is removed rather than replaced.
   * @param encodedRegionName Region to flush.
   * @param families          Families to flush. May be a subset of all families in the region.
   * @return Returns {@link HConstants#NO_SEQNUM} if we are flushing the whole region OR if we are
   *         flushing a subset of all families but there are no edits in those families not being
   *         flushed; in other words, this is effectively same as a flush of all of the region
   *         though we were passed a subset of families. Otherwise, it returns the sequence id of
   *         the oldest/lowest outstanding edit.
   */
  Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families) {
    Map<byte[], Long> familytoSeq = new HashMap<>();
    for (byte[] familyName : families) {
      familytoSeq.put(familyName, HConstants.NO_SEQNUM);
    }
    return startCacheFlush(encodedRegionName, familytoSeq);
  }

  /**
   * Starts a flush of the given families, moving their current lowest unflushed sequence ids into
   * {@link #flushingSequenceIds} under {@link #tieLock}.
   * <p>
   * For each family: when the mapped value is {@link HConstants#NO_SEQNUM} the family's unflushed
   * entry is removed; otherwise the entry (if one exists) is replaced by the mapped value. In both
   * cases the previous value, when there was one, is what gets recorded as flushing.
   * @param encodedRegionName Region to flush.
   * @param familyToSeq       Families to flush, each mapped to the sequence id that should become
   *                          its new lowest unflushed id, or to {@link HConstants#NO_SEQNUM}.
   * @return {@link HConstants#NO_SEQNUM} if the region has no unflushed entries left after the
   *         move (or had none to begin with); otherwise the smallest sequence id remaining in the
   *         region's unflushed map.
   */
  Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> familyToSeq) {
    Map<ImmutableByteArray, Long> oldSequenceIds = null;
    Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
    synchronized (tieLock) {
      Map<ImmutableByteArray, Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
      if (m != null) {
        // NOTE: Removal from this.lowestUnflushedSequenceIds must be done in controlled
        // circumstance because another concurrent thread now may add sequenceids for this family
        // (see above in getOrCreateLowestSequenceIds). Make sure you are ok with this. Usually it
        // is fine because updates are blocked when this method is called. Make sure!!!
        for (Map.Entry<byte[], Long> entry : familyToSeq.entrySet()) {
          ImmutableByteArray familyNameWrapper = ImmutableByteArray.wrap(entry.getKey());
          Long seqId = null;
          if (entry.getValue() == HConstants.NO_SEQNUM) {
            seqId = m.remove(familyNameWrapper);
          } else {
            seqId = m.replace(familyNameWrapper, entry.getValue());
          }
          if (seqId != null) {
            if (oldSequenceIds == null) {
              oldSequenceIds = new HashMap<>();
            }
            oldSequenceIds.put(familyNameWrapper, seqId);
          }
        }
        if (oldSequenceIds != null && !oldSequenceIds.isEmpty()) {
          if (this.flushingSequenceIds.put(encodedRegionName, oldSequenceIds) != null) {
            LOG.warn("Flushing Map not cleaned up for " + Bytes.toString(encodedRegionName)
              + ", sequenceid=" + oldSequenceIds);
          }
        }
        if (m.isEmpty()) {
          // Remove it otherwise it will be in lowestUnflushedSequenceIds for ever
          // even if the region is already moved to other server.
          // Do not worry about data racing, we held write lock of region when calling
          // startCacheFlush, so no one can add value to the map we removed.
          this.lowestUnflushedSequenceIds.remove(encodedRegionName);
        } else {
          // Flushing a subset of the region families. Return the sequence id of the oldest entry.
          lowestUnflushedInRegion = Collections.min(m.values());
        }
      }
    }
    // Do this check outside lock.
    // NOTE(review): oldSequenceIds is only allocated immediately before its first put above, so
    // it is never both non-null and empty and this branch does not fire as written. Preserved
    // unchanged; see the TODO below for the intent.
    if (oldSequenceIds != null && oldSequenceIds.isEmpty()) {
      // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
      // the region is already flushing (which would make this call invalid), or there
      // were no appends after last flush, so why are we starting flush? Maybe we should
      // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop.
      // For now preserve old logic.
      LOG.warn("Couldn't find oldest sequenceid for " + Bytes.toString(encodedRegionName));
    }
    return lowestUnflushedInRegion;
  }

  /**
   * Marks the flush of a region as complete: drops the region's entry from
   * {@link #flushingSequenceIds} and bumps any unflushed sequence id that is not greater than
   * <code>maxFlushedSeqId</code> up to <code>maxFlushedSeqId + 1</code>.
   * @param encodedRegionName encoded name of the region whose flush finished
   * @param maxFlushedSeqId   highest sequence id covered by the completed flush
   */
  void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
    // This is a simple hack to avoid maxFlushedSeqId go backwards.
    // The system works fine normally, but if we make use of Durability.ASYNC_WAL and we are going
    // to flush all the stores, the maxFlushedSeqId will be next seq id of the region, but we may
    // still have some unsynced WAL entries in the ringbuffer after we call startCacheFlush, and
    // then it will be recorded as the lowestUnflushedSeqId by the above update method, which is
    // less than the current maxFlushedSeqId. And if next time we only flush the family with this
    // unusual lowestUnflushedSeqId, the maxFlushedSeqId will go backwards.
    // This is an unexpected behavior so we should fix it, otherwise it may cause unexpected
    // behavior in other area.
    // The solution here is a bit hack but fine. Just replace the lowestUnflushedSeqId with
    // maxFlushedSeqId + 1 if it is lesser. The meaning of maxFlushedSeqId is that, all edits less
    // than or equal to it have been flushed, i.e, persistent to HFile, so set
    // lowestUnflushedSequenceId to maxFlushedSeqId + 1 will not cause data loss.
    // And technically, using +1 is fine here. If the maxFlushesSeqId is just the flushOpSeqId, it
    // means we have flushed all the stores so the seq id for actual data should be at least plus 1.
    // And if we do not flush all the stores, then the maxFlushedSeqId is calculated by
    // lowestUnflushedSeqId - 1, so here let's plus the 1 back.
    Long wrappedSeqId = Long.valueOf(maxFlushedSeqId + 1);
    synchronized (tieLock) {
      this.flushingSequenceIds.remove(encodedRegionName);
      Map<ImmutableByteArray, Long> unflushed = lowestUnflushedSequenceIds.get(encodedRegionName);
      if (unflushed == null) {
        return;
      }
      for (Map.Entry<ImmutableByteArray, Long> e : unflushed.entrySet()) {
        if (e.getValue().longValue() <= maxFlushedSeqId) {
          e.setValue(wrappedSeqId);
        }
      }
    }
  }

  /**
   * Undoes {@link #startCacheFlush(byte[], Map)}: moves the region's sequence ids from
   * {@link #flushingSequenceIds} back into {@link #lowestUnflushedSequenceIds}, overwriting
   * whatever was recorded there in the meantime.
   * <p>
   * If an id recorded in the meantime is lower than the id being restored for the same family,
   * the error is logged and the JVM is halted via {@link Runtime#halt(int)}.
   * @param encodedRegionName encoded name of the region whose flush is being aborted
   */
  void abortCacheFlush(final byte[] encodedRegionName) {
    // Method is called when we are crashing down because failed write flush AND it is called
    // if we fail prepare. The below is for the fail prepare case; we restore the old sequence ids.
    Map<ImmutableByteArray, Long> flushing = null;
    Map<ImmutableByteArray, Long> tmpMap = new HashMap<>();
    // Here we are moving sequenceids from flushing back to unflushed; doing opposite of what
    // happened in startCacheFlush. During prepare phase, we have update lock on the region so
    // no edits should be coming in via append.
    synchronized (tieLock) {
      flushing = this.flushingSequenceIds.remove(encodedRegionName);
      if (flushing != null) {
        Map<ImmutableByteArray, Long> unflushed = getOrCreateLowestSequenceIds(encodedRegionName);
        for (Map.Entry<ImmutableByteArray, Long> e : flushing.entrySet()) {
          // Set into unflushed the 'old' oldest sequenceid and if any value in flushed with this
          // value, it will now be in tmpMap.
          tmpMap.put(e.getKey(), unflushed.put(e.getKey(), e.getValue()));
        }
      }
    }

    // Here we are doing some 'test' to see if edits are going in out of order. What is it for?
    // Carried over from old code.
    if (flushing != null) {
      for (Map.Entry<ImmutableByteArray, Long> e : flushing.entrySet()) {
        Long currentId = tmpMap.get(e.getKey());
        if (currentId != null && currentId.longValue() < e.getValue().longValue()) {
          String errorStr = Bytes.toString(encodedRegionName) + " family " + e.getKey().toString()
            + " acquired edits out of order current memstore seq=" + currentId
            + ", previous oldest unflushed id=" + e.getValue();
          LOG.error(errorStr);
          Runtime.getRuntime().halt(1);
        }
      }
    }
  }

  /**
   * See if passed <code>sequenceids</code> are lower -- i.e. earlier -- than any outstanding
   * sequenceids, sequenceids we are holding on to in this accounting instance.
   * @param sequenceids  Keyed by encoded region name. Cannot be null (doesn't make sense for it to
   *                     be null).
   * @param keysBlocking An optional collection that is used to return the specific keys that are
   *                     causing this method to return false. When null, the method returns on the
   *                     first blocking key.
   * @return true if all sequenceids are lower, older than, the old sequenceids in this instance.
   */
  boolean areAllLower(Map<byte[], Long> sequenceids, Collection<byte[]> keysBlocking) {
    Map<byte[], Long> flushing = null;
    Map<byte[], Long> unflushed = null;
    synchronized (this.tieLock) {
      // Get a flattened -- only the oldest sequenceid -- copy of current flushing and unflushed
      // data structures to use in tests below.
      flushing = flattenToLowestSequenceId(this.flushingSequenceIds);
      unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds);
    }
    boolean result = true;
    for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
      long oldestFlushing = Long.MAX_VALUE;
      long oldestUnflushed = Long.MAX_VALUE;
      if (flushing != null && flushing.containsKey(e.getKey())) {
        oldestFlushing = flushing.get(e.getKey());
      }
      if (unflushed != null && unflushed.containsKey(e.getKey())) {
        oldestUnflushed = unflushed.get(e.getKey());
      }
      long min = Math.min(oldestFlushing, oldestUnflushed);
      if (min <= e.getValue()) {
        if (keysBlocking == null) {
          return false;
        }
        result = false;
        keysBlocking.add(e.getKey());
        // Continue examining the map so we could log all regions blocking this WAL.
      }
    }
    return result;
  }

  /**
   * Iterates over the given Map and compares sequence ids with corresponding entries in
   * {@link #lowestUnflushedSequenceIds}. If a family of a region in
   * {@link #lowestUnflushedSequenceIds} has a sequence id less than or equal to that passed in
   * <code>sequenceids</code> then return it.
   * @param sequenceids Sequenceids keyed by encoded region name.
   * @return stores of regions found in this instance with sequence ids not greater than those
   *         passed in, keyed by encoded region name; null when there are none.
   */
  Map<byte[], List<byte[]>> findLower(Map<byte[], Long> sequenceids) {
    Map<byte[], List<byte[]>> toFlush = null;
    // Keeping the old behavior of iterating the unflushed sequence ids under tieLock.
    synchronized (tieLock) {
      for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
        Map<ImmutableByteArray, Long> m = this.lowestUnflushedSequenceIds.get(e.getKey());
        if (m == null) {
          continue;
        }
        for (Map.Entry<ImmutableByteArray, Long> me : m.entrySet()) {
          if (me.getValue() <= e.getValue()) {
            if (toFlush == null) {
              // Allocated lazily so that "nothing to flush" is reported as null.
              toFlush = new TreeMap<>(Bytes.BYTES_COMPARATOR);
            }
            toFlush.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
              .add(Bytes.toBytes(me.getKey().toString()));
          }
        }
      }
    }
    return toFlush;
  }
}