001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.Map; 024import java.util.NavigableMap; 025import java.util.TreeMap; 026import java.util.UUID; 027import org.apache.hadoop.hbase.HBaseInterfaceAudience; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 031import org.apache.hadoop.hbase.regionserver.SequenceId; 032import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; 033import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.yetus.audience.InterfaceAudience; 037 038import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 039import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 040import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 041 042import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType; 046 047/** 048 * Default implementation of Key for an Entry in the WAL. 049 * For internal use only though Replication needs to have access. 050 * 051 * The log intermingles edits to many tables and rows, so each log entry 052 * identifies the appropriate table and row. Within a table and row, they're 053 * also sorted. 054 * 055 * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an associated row. 056 * 057 */ 058// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical 059// purposes. They need to be merged into WALEntry. 060@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION}) 061public class WALKeyImpl implements WALKey { 062 public static final WALKeyImpl EMPTY_WALKEYIMPL = new WALKeyImpl(); 063 064 public MultiVersionConcurrencyControl getMvcc() { 065 return mvcc; 066 } 067 068 /** 069 * Use it to complete mvcc transaction. This WALKeyImpl was part of 070 * (the transaction is started when you call append; see the comment on FSHLog#append). To 071 * complete call 072 * {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)} 073 * or {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)} 074 * @return A WriteEntry gotten from local WAL subsystem. 075 * @see #setWriteEntry(MultiVersionConcurrencyControl.WriteEntry) 076 */ 077 public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() { 078 return this.writeEntry; 079 } 080 081 public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) { 082 assert this.writeEntry == null; 083 this.writeEntry = writeEntry; 084 // Set our sequenceid now using WriteEntry. 085 this.sequenceId = writeEntry.getWriteNumber(); 086 } 087 088 private byte [] encodedRegionName; 089 090 private TableName tablename; 091 092 /** 093 * SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is 094 * NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized. 095 */ 096 private long sequenceId; 097 098 /** 099 * Used during WAL replay; the sequenceId of the edit when it came into the system. 100 */ 101 private long origLogSeqNum = 0; 102 103 /** Time at which this edit was written. */ 104 private long writeTime; 105 106 /** The first element in the list is the cluster id on which the change has originated */ 107 private List<UUID> clusterIds; 108 109 private NavigableMap<byte[], Integer> replicationScope; 110 111 private long nonceGroup = HConstants.NO_NONCE; 112 private long nonce = HConstants.NO_NONCE; 113 private MultiVersionConcurrencyControl mvcc; 114 115 /** 116 * Set in a way visible to multiple threads; e.g. synchronized getter/setters. 117 */ 118 private MultiVersionConcurrencyControl.WriteEntry writeEntry; 119 120 private CompressionContext compressionContext; 121 122 public WALKeyImpl() { 123 init(null, null, 0L, HConstants.LATEST_TIMESTAMP, 124 new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); 125 } 126 127 public WALKeyImpl(final NavigableMap<byte[], Integer> replicationScope) { 128 init(null, null, 0L, HConstants.LATEST_TIMESTAMP, 129 new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope); 130 } 131 132 @VisibleForTesting 133 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, 134 long logSeqNum, 135 final long now, UUID clusterId) { 136 List<UUID> clusterIds = new ArrayList<>(1); 137 clusterIds.add(clusterId); 138 init(encodedRegionName, tablename, logSeqNum, now, clusterIds, 139 HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); 140 } 141 142 // TODO: Fix being able to pass in sequenceid. 143 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now) { 144 init(encodedRegionName, 145 tablename, 146 NO_SEQUENCE_ID, 147 now, 148 EMPTY_UUIDS, 149 HConstants.NO_NONCE, 150 HConstants.NO_NONCE, 151 null, null); 152 } 153 154 // TODO: Fix being able to pass in sequenceid. 155 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, 156 final NavigableMap<byte[], Integer> replicationScope) { 157 init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, 158 HConstants.NO_NONCE, null, replicationScope); 159 } 160 161 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, 162 MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) { 163 init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, 164 HConstants.NO_NONCE, mvcc, replicationScope); 165 } 166 167 public WALKeyImpl(final byte[] encodedRegionName, 168 final TableName tablename, 169 final long now, 170 MultiVersionConcurrencyControl mvcc) { 171 init(encodedRegionName, 172 tablename, 173 NO_SEQUENCE_ID, 174 now, 175 EMPTY_UUIDS, 176 HConstants.NO_NONCE, 177 HConstants.NO_NONCE, 178 mvcc, null); 179 } 180 181 /** 182 * Create the log key for writing to somewhere. 183 * We maintain the tablename mainly for debugging purposes. 184 * A regionName is always a sub-table object. 185 * <p>Used by log splitting and snapshots. 186 * 187 * @param encodedRegionName Encoded name of the region as returned by 188 * <code>HRegionInfo#getEncodedNameAsBytes()</code>. 189 * @param tablename - name of table 190 * @param logSeqNum - log sequence number 191 * @param now Time at which this edit was written. 192 * @param clusterIds the clusters that have consumed the change(used in Replication) 193 * @param nonceGroup the nonceGroup 194 * @param nonce the nonce 195 * @param mvcc the mvcc associate the WALKeyImpl 196 * @param replicationScope the non-default replication scope 197 * associated with the region's column families 198 */ 199 // TODO: Fix being able to pass in sequenceid. 200 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum, 201 final long now, List<UUID> clusterIds, long nonceGroup, long nonce, 202 MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) { 203 init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, 204 replicationScope); 205 } 206 207 /** 208 * Create the log key for writing to somewhere. 209 * We maintain the tablename mainly for debugging purposes. 210 * A regionName is always a sub-table object. 211 * <p>Used by log splitting and snapshots. 212 * 213 * @param encodedRegionName Encoded name of the region as returned by 214 * <code>HRegionInfo#getEncodedNameAsBytes()</code>. 215 * @param tablename - name of table 216 * @param logSeqNum - log sequence number 217 * @param now Time at which this edit was written. 218 * @param clusterIds the clusters that have consumed the change(used in Replication) 219 */ 220 // TODO: Fix being able to pass in sequenceid. 221 public WALKeyImpl(final byte[] encodedRegionName, 222 final TableName tablename, 223 long logSeqNum, 224 final long now, 225 List<UUID> clusterIds, 226 long nonceGroup, 227 long nonce, 228 MultiVersionConcurrencyControl mvcc) { 229 init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null); 230 } 231 232 /** 233 * Create the log key for writing to somewhere. 234 * We maintain the tablename mainly for debugging purposes. 235 * A regionName is always a sub-table object. 236 * 237 * @param encodedRegionName Encoded name of the region as returned by 238 * <code>HRegionInfo#getEncodedNameAsBytes()</code>. 239 * @param tablename the tablename 240 * @param now Time at which this edit was written. 241 * @param clusterIds the clusters that have consumed the change(used in Replication) 242 * @param nonceGroup 243 * @param nonce 244 * @param mvcc mvcc control used to generate sequence numbers and control read/write points 245 */ 246 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, 247 final long now, List<UUID> clusterIds, long nonceGroup, 248 final long nonce, final MultiVersionConcurrencyControl mvcc) { 249 init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, 250 null); 251 } 252 253 /** 254 * Create the log key for writing to somewhere. 255 * We maintain the tablename mainly for debugging purposes. 256 * A regionName is always a sub-table object. 257 * 258 * @param encodedRegionName Encoded name of the region as returned by 259 * <code>HRegionInfo#getEncodedNameAsBytes()</code>. 260 * @param tablename 261 * @param now Time at which this edit was written. 262 * @param clusterIds the clusters that have consumed the change(used in Replication) 263 * @param nonceGroup the nonceGroup 264 * @param nonce the nonce 265 * @param mvcc mvcc control used to generate sequence numbers and control read/write points 266 * @param replicationScope the non-default replication scope of the column families 267 */ 268 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, 269 final long now, List<UUID> clusterIds, long nonceGroup, 270 final long nonce, final MultiVersionConcurrencyControl mvcc, 271 NavigableMap<byte[], Integer> replicationScope) { 272 init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, 273 replicationScope); 274 } 275 276 /** 277 * Create the log key for writing to somewhere. 278 * We maintain the tablename mainly for debugging purposes. 279 * A regionName is always a sub-table object. 280 * 281 * @param encodedRegionName Encoded name of the region as returned by 282 * <code>HRegionInfo#getEncodedNameAsBytes()</code>. 283 * @param tablename 284 * @param logSeqNum 285 * @param nonceGroup 286 * @param nonce 287 */ 288 // TODO: Fix being able to pass in sequenceid. 289 public WALKeyImpl(final byte[] encodedRegionName, 290 final TableName tablename, 291 long logSeqNum, 292 long nonceGroup, 293 long nonce, 294 final MultiVersionConcurrencyControl mvcc) { 295 init(encodedRegionName, 296 tablename, 297 logSeqNum, 298 EnvironmentEdgeManager.currentTime(), 299 EMPTY_UUIDS, 300 nonceGroup, 301 nonce, 302 mvcc, null); 303 } 304 305 @InterfaceAudience.Private 306 protected void init(final byte[] encodedRegionName, 307 final TableName tablename, 308 long logSeqNum, 309 final long now, 310 List<UUID> clusterIds, 311 long nonceGroup, 312 long nonce, 313 MultiVersionConcurrencyControl mvcc, 314 NavigableMap<byte[], Integer> replicationScope) { 315 this.sequenceId = logSeqNum; 316 this.writeTime = now; 317 this.clusterIds = clusterIds; 318 this.encodedRegionName = encodedRegionName; 319 this.tablename = tablename; 320 this.nonceGroup = nonceGroup; 321 this.nonce = nonce; 322 this.mvcc = mvcc; 323 if (logSeqNum != NO_SEQUENCE_ID) { 324 setSequenceId(logSeqNum); 325 } 326 this.replicationScope = replicationScope; 327 } 328 329 // For deserialization. DO NOT USE. See setWriteEntry below. 330 @InterfaceAudience.Private 331 protected void setSequenceId(long sequenceId) { 332 this.sequenceId = sequenceId; 333 } 334 335 /** 336 * @param compressionContext Compression context to use 337 */ 338 public void setCompressionContext(CompressionContext compressionContext) { 339 this.compressionContext = compressionContext; 340 } 341 342 /** @return encoded region name */ 343 @Override 344 public byte [] getEncodedRegionName() { 345 return encodedRegionName; 346 } 347 348 /** @return table name */ 349 @Override 350 public TableName getTableName() { 351 return tablename; 352 } 353 354 /** @return log sequence number 355 * @deprecated Use {@link #getSequenceId()} 356 */ 357 @Deprecated 358 public long getLogSeqNum() { 359 return getSequenceId(); 360 } 361 362 /** 363 * Used to set original sequenceId for WALKeyImpl during WAL replay 364 */ 365 public void setOrigLogSeqNum(final long sequenceId) { 366 this.origLogSeqNum = sequenceId; 367 } 368 369 /** 370 * Return a positive long if current WALKeyImpl is created from a replay edit; a replay edit is an 371 * edit that came in when replaying WALs of a crashed server. 372 * @return original sequence number of the WALEdit 373 */ 374 @Override 375 public long getOrigLogSeqNum() { 376 return this.origLogSeqNum; 377 } 378 379 /** 380 * SequenceId is only available post WAL-assign. Calls before this will get you a 381 * {@link SequenceId#NO_SEQUENCE_ID}. See the comment on FSHLog#append and #getWriteNumber in this 382 * method for more on when this sequenceId comes available. 383 * @return long the new assigned sequence number 384 */ 385 @Override 386 public long getSequenceId() { 387 return this.sequenceId; 388 } 389 390 /** 391 * @return the write time 392 */ 393 @Override 394 public long getWriteTime() { 395 return this.writeTime; 396 } 397 398 public NavigableMap<byte[], Integer> getReplicationScopes() { 399 return replicationScope; 400 } 401 402 /** @return The nonce group */ 403 @Override 404 public long getNonceGroup() { 405 return nonceGroup; 406 } 407 408 /** @return The nonce */ 409 @Override 410 public long getNonce() { 411 return nonce; 412 } 413 414 private void setReplicationScope(NavigableMap<byte[], Integer> replicationScope) { 415 this.replicationScope = replicationScope; 416 } 417 418 public void serializeReplicationScope(boolean serialize) { 419 if (!serialize) { 420 setReplicationScope(null); 421 } 422 } 423 424 /** 425 * Marks that the cluster with the given clusterId has consumed the change 426 */ 427 public void addClusterId(UUID clusterId) { 428 if (!clusterIds.contains(clusterId)) { 429 clusterIds.add(clusterId); 430 } 431 } 432 433 /** 434 * @return the set of cluster Ids that have consumed the change 435 */ 436 public List<UUID> getClusterIds() { 437 return clusterIds; 438 } 439 440 /** 441 * @return the cluster id on which the change has originated. It there is no such cluster, it 442 * returns DEFAULT_CLUSTER_ID (cases where replication is not enabled) 443 */ 444 @Override 445 public UUID getOriginatingClusterId(){ 446 return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0); 447 } 448 449 @Override 450 public String toString() { 451 return tablename + "/" + Bytes.toString(encodedRegionName) + "/" + sequenceId; 452 } 453 454 @Override 455 public boolean equals(Object obj) { 456 if (this == obj) { 457 return true; 458 } 459 if (obj == null || getClass() != obj.getClass()) { 460 return false; 461 } 462 return compareTo((WALKey)obj) == 0; 463 } 464 465 @Override 466 public int hashCode() { 467 int result = Bytes.hashCode(this.encodedRegionName); 468 result = (int) (result ^ getSequenceId()); 469 result = (int) (result ^ this.writeTime); 470 return result; 471 } 472 473 @Override 474 public int compareTo(WALKey o) { 475 int result = Bytes.compareTo(this.encodedRegionName, o.getEncodedRegionName()); 476 if (result == 0) { 477 long sid = getSequenceId(); 478 long otherSid = o.getSequenceId(); 479 if (sid < otherSid) { 480 result = -1; 481 } else if (sid > otherSid) { 482 result = 1; 483 } 484 if (result == 0) { 485 if (this.writeTime < o.getWriteTime()) { 486 result = -1; 487 } else if (this.writeTime > o.getWriteTime()) { 488 return 1; 489 } 490 } 491 } 492 // why isn't cluster id accounted for? 493 return result; 494 } 495 496 /** 497 * Drop this instance's tablename byte array and instead 498 * hold a reference to the provided tablename. This is not 499 * meant to be a general purpose setter - it's only used 500 * to collapse references to conserve memory. 501 */ 502 void internTableName(TableName tablename) { 503 // We should not use this as a setter - only to swap 504 // in a new reference to the same table name. 505 assert tablename.equals(this.tablename); 506 this.tablename = tablename; 507 } 508 509 /** 510 * Drop this instance's region name byte array and instead 511 * hold a reference to the provided region name. This is not 512 * meant to be a general purpose setter - it's only used 513 * to collapse references to conserve memory. 514 */ 515 void internEncodedRegionName(byte []encodedRegionName) { 516 // We should not use this as a setter - only to swap 517 // in a new reference to the same table name. 518 assert Bytes.equals(this.encodedRegionName, encodedRegionName); 519 this.encodedRegionName = encodedRegionName; 520 } 521 522 public WALProtos.WALKey.Builder getBuilder( 523 WALCellCodec.ByteStringCompressor compressor) throws IOException { 524 WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder(); 525 if (compressionContext == null) { 526 builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName)); 527 builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName())); 528 } else { 529 builder.setEncodedRegionName(compressor.compress(this.encodedRegionName, 530 compressionContext.regionDict)); 531 builder.setTableName(compressor.compress(this.tablename.getName(), 532 compressionContext.tableDict)); 533 } 534 builder.setLogSequenceNumber(getSequenceId()); 535 builder.setWriteTime(writeTime); 536 if (this.origLogSeqNum > 0) { 537 builder.setOrigSequenceNumber(this.origLogSeqNum); 538 } 539 if (this.nonce != HConstants.NO_NONCE) { 540 builder.setNonce(nonce); 541 } 542 if (this.nonceGroup != HConstants.NO_NONCE) { 543 builder.setNonceGroup(nonceGroup); 544 } 545 HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder(); 546 for (UUID clusterId : clusterIds) { 547 uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits()); 548 uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); 549 builder.addClusterIds(uuidBuilder.build()); 550 } 551 if (replicationScope != null) { 552 for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) { 553 ByteString family = (compressionContext == null) 554 ? UnsafeByteOperations.unsafeWrap(e.getKey()) 555 : compressor.compress(e.getKey(), compressionContext.familyDict); 556 builder.addScopes(FamilyScope.newBuilder() 557 .setFamily(family).setScopeType(ScopeType.forNumber(e.getValue()))); 558 } 559 } 560 return builder; 561 } 562 563 public void readFieldsFromPb(WALProtos.WALKey walKey, 564 WALCellCodec.ByteStringUncompressor uncompressor) 565 throws IOException { 566 if (this.compressionContext != null) { 567 this.encodedRegionName = uncompressor.uncompress( 568 walKey.getEncodedRegionName(), compressionContext.regionDict); 569 byte[] tablenameBytes = uncompressor.uncompress( 570 walKey.getTableName(), compressionContext.tableDict); 571 this.tablename = TableName.valueOf(tablenameBytes); 572 } else { 573 this.encodedRegionName = walKey.getEncodedRegionName().toByteArray(); 574 this.tablename = TableName.valueOf(walKey.getTableName().toByteArray()); 575 } 576 clusterIds.clear(); 577 for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) { 578 clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits())); 579 } 580 if (walKey.hasNonceGroup()) { 581 this.nonceGroup = walKey.getNonceGroup(); 582 } 583 if (walKey.hasNonce()) { 584 this.nonce = walKey.getNonce(); 585 } 586 this.replicationScope = null; 587 if (walKey.getScopesCount() > 0) { 588 this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); 589 for (FamilyScope scope : walKey.getScopesList()) { 590 byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() : 591 uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict); 592 this.replicationScope.put(family, scope.getScopeType().getNumber()); 593 } 594 } 595 setSequenceId(walKey.getLogSequenceNumber()); 596 this.writeTime = walKey.getWriteTime(); 597 if(walKey.hasOrigSequenceNumber()) { 598 this.origLogSeqNum = walKey.getOrigSequenceNumber(); 599 } 600 } 601 602 @Override 603 public long estimatedSerializedSizeOf() { 604 long size = encodedRegionName != null ? encodedRegionName.length : 0; 605 size += tablename != null ? tablename.toBytes().length : 0; 606 if (clusterIds != null) { 607 size += 16 * clusterIds.size(); 608 } 609 if (nonceGroup != HConstants.NO_NONCE) { 610 size += Bytes.SIZEOF_LONG; // nonce group 611 } 612 if (nonce != HConstants.NO_NONCE) { 613 size += Bytes.SIZEOF_LONG; // nonce 614 } 615 if (replicationScope != null) { 616 for (Map.Entry<byte[], Integer> scope: replicationScope.entrySet()) { 617 size += scope.getKey().length; 618 size += Bytes.SIZEOF_INT; 619 } 620 } 621 size += Bytes.SIZEOF_LONG; // sequence number 622 size += Bytes.SIZEOF_LONG; // write time 623 if (origLogSeqNum > 0) { 624 size += Bytes.SIZEOF_LONG; // original sequence number 625 } 626 return size; 627 } 628}