001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.wal; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.Map; 024import java.util.NavigableMap; 025import java.util.TreeMap; 026import java.util.UUID; 027import org.apache.hadoop.hbase.HBaseInterfaceAudience; 028import org.apache.hadoop.hbase.HConstants; 029import org.apache.hadoop.hbase.TableName; 030import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; 031import org.apache.hadoop.hbase.regionserver.SequenceId; 032import org.apache.hadoop.hbase.regionserver.wal.CompressionContext; 033import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 036import org.apache.yetus.audience.InterfaceAudience; 037 038import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 039import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType; 045 046/** 047 * Default implementation of Key for an Entry in the WAL. 048 * For internal use only though Replication needs to have access. 049 * 050 * The log intermingles edits to many tables and rows, so each log entry 051 * identifies the appropriate table and row. Within a table and row, they're 052 * also sorted. 053 * 054 * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an associated row. 055 * 056 */ 057// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical 058// purposes. They need to be merged into WALEntry. 059@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION}) 060public class WALKeyImpl implements WALKey { 061 public static final WALKeyImpl EMPTY_WALKEYIMPL = new WALKeyImpl(); 062 063 public MultiVersionConcurrencyControl getMvcc() { 064 return mvcc; 065 } 066 067 /** 068 * Use it to complete mvcc transaction. This WALKeyImpl was part of 069 * (the transaction is started when you call append; see the comment on FSHLog#append). To 070 * complete call 071 * {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)} 072 * or {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)} 073 * @return A WriteEntry gotten from local WAL subsystem. 074 * @see #setWriteEntry(MultiVersionConcurrencyControl.WriteEntry) 075 */ 076 public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() { 077 return this.writeEntry; 078 } 079 080 public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) { 081 assert this.writeEntry == null; 082 this.writeEntry = writeEntry; 083 // Set our sequenceid now using WriteEntry. 084 this.sequenceId = writeEntry.getWriteNumber(); 085 } 086 087 private byte [] encodedRegionName; 088 089 private TableName tablename; 090 091 /** 092 * SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is 093 * NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized. 094 */ 095 private long sequenceId; 096 097 /** 098 * Used during WAL replay; the sequenceId of the edit when it came into the system. 099 */ 100 private long origLogSeqNum = 0; 101 102 /** Time at which this edit was written. */ 103 private long writeTime; 104 105 /** The first element in the list is the cluster id on which the change has originated */ 106 private List<UUID> clusterIds; 107 108 private NavigableMap<byte[], Integer> replicationScope; 109 110 private long nonceGroup = HConstants.NO_NONCE; 111 private long nonce = HConstants.NO_NONCE; 112 private MultiVersionConcurrencyControl mvcc; 113 114 /** 115 * Set in a way visible to multiple threads; e.g. synchronized getter/setters. 116 */ 117 private MultiVersionConcurrencyControl.WriteEntry writeEntry; 118 119 public WALKeyImpl() { 120 init(null, null, 0L, HConstants.LATEST_TIMESTAMP, 121 new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); 122 } 123 124 public WALKeyImpl(final NavigableMap<byte[], Integer> replicationScope) { 125 init(null, null, 0L, HConstants.LATEST_TIMESTAMP, 126 new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope); 127 } 128 129 @VisibleForTesting 130 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, 131 long logSeqNum, 132 final long now, UUID clusterId) { 133 List<UUID> clusterIds = new ArrayList<>(1); 134 clusterIds.add(clusterId); 135 init(encodedRegionName, tablename, logSeqNum, now, clusterIds, 136 HConstants.NO_NONCE, HConstants.NO_NONCE, null, null); 137 } 138 139 // TODO: Fix being able to pass in sequenceid. 140 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now) { 141 init(encodedRegionName, 142 tablename, 143 NO_SEQUENCE_ID, 144 now, 145 EMPTY_UUIDS, 146 HConstants.NO_NONCE, 147 HConstants.NO_NONCE, 148 null, null); 149 } 150 151 // TODO: Fix being able to pass in sequenceid. 152 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, 153 final NavigableMap<byte[], Integer> replicationScope) { 154 init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, 155 HConstants.NO_NONCE, null, replicationScope); 156 } 157 158 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now, 159 MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) { 160 init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE, 161 HConstants.NO_NONCE, mvcc, replicationScope); 162 } 163 164 public WALKeyImpl(final byte[] encodedRegionName, 165 final TableName tablename, 166 final long now, 167 MultiVersionConcurrencyControl mvcc) { 168 init(encodedRegionName, 169 tablename, 170 NO_SEQUENCE_ID, 171 now, 172 EMPTY_UUIDS, 173 HConstants.NO_NONCE, 174 HConstants.NO_NONCE, 175 mvcc, null); 176 } 177 178 /** 179 * Create the log key for writing to somewhere. 180 * We maintain the tablename mainly for debugging purposes. 181 * A regionName is always a sub-table object. 182 * <p>Used by log splitting and snapshots. 183 * 184 * @param encodedRegionName Encoded name of the region as returned by 185 * <code>HRegionInfo#getEncodedNameAsBytes()</code>. 186 * @param tablename - name of table 187 * @param logSeqNum - log sequence number 188 * @param now Time at which this edit was written. 189 * @param clusterIds the clusters that have consumed the change(used in Replication) 190 * @param nonceGroup the nonceGroup 191 * @param nonce the nonce 192 * @param mvcc the mvcc associate the WALKeyImpl 193 * @param replicationScope the non-default replication scope 194 * associated with the region's column families 195 */ 196 // TODO: Fix being able to pass in sequenceid. 197 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum, 198 final long now, List<UUID> clusterIds, long nonceGroup, long nonce, 199 MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) { 200 init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, 201 replicationScope); 202 } 203 204 /** 205 * Create the log key for writing to somewhere. 206 * We maintain the tablename mainly for debugging purposes. 207 * A regionName is always a sub-table object. 208 * <p>Used by log splitting and snapshots. 209 * 210 * @param encodedRegionName Encoded name of the region as returned by 211 * <code>HRegionInfo#getEncodedNameAsBytes()</code>. 212 * @param tablename - name of table 213 * @param logSeqNum - log sequence number 214 * @param now Time at which this edit was written. 215 * @param clusterIds the clusters that have consumed the change(used in Replication) 216 */ 217 // TODO: Fix being able to pass in sequenceid. 218 public WALKeyImpl(final byte[] encodedRegionName, 219 final TableName tablename, 220 long logSeqNum, 221 final long now, 222 List<UUID> clusterIds, 223 long nonceGroup, 224 long nonce, 225 MultiVersionConcurrencyControl mvcc) { 226 init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null); 227 } 228 229 /** 230 * Create the log key for writing to somewhere. 231 * We maintain the tablename mainly for debugging purposes. 232 * A regionName is always a sub-table object. 233 * 234 * @param encodedRegionName Encoded name of the region as returned by 235 * <code>HRegionInfo#getEncodedNameAsBytes()</code>. 236 * @param tablename the tablename 237 * @param now Time at which this edit was written. 238 * @param clusterIds the clusters that have consumed the change(used in Replication) 239 * @param nonceGroup 240 * @param nonce 241 * @param mvcc mvcc control used to generate sequence numbers and control read/write points 242 */ 243 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, 244 final long now, List<UUID> clusterIds, long nonceGroup, 245 final long nonce, final MultiVersionConcurrencyControl mvcc) { 246 init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, 247 null); 248 } 249 250 /** 251 * Create the log key for writing to somewhere. 252 * We maintain the tablename mainly for debugging purposes. 253 * A regionName is always a sub-table object. 254 * 255 * @param encodedRegionName Encoded name of the region as returned by 256 * <code>HRegionInfo#getEncodedNameAsBytes()</code>. 257 * @param tablename 258 * @param now Time at which this edit was written. 259 * @param clusterIds the clusters that have consumed the change(used in Replication) 260 * @param nonceGroup the nonceGroup 261 * @param nonce the nonce 262 * @param mvcc mvcc control used to generate sequence numbers and control read/write points 263 * @param replicationScope the non-default replication scope of the column families 264 */ 265 public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, 266 final long now, List<UUID> clusterIds, long nonceGroup, 267 final long nonce, final MultiVersionConcurrencyControl mvcc, 268 NavigableMap<byte[], Integer> replicationScope) { 269 init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, 270 replicationScope); 271 } 272 273 /** 274 * Create the log key for writing to somewhere. 275 * We maintain the tablename mainly for debugging purposes. 276 * A regionName is always a sub-table object. 277 * 278 * @param encodedRegionName Encoded name of the region as returned by 279 * <code>HRegionInfo#getEncodedNameAsBytes()</code>. 280 * @param tablename 281 * @param logSeqNum 282 * @param nonceGroup 283 * @param nonce 284 */ 285 // TODO: Fix being able to pass in sequenceid. 286 public WALKeyImpl(final byte[] encodedRegionName, 287 final TableName tablename, 288 long logSeqNum, 289 long nonceGroup, 290 long nonce, 291 final MultiVersionConcurrencyControl mvcc) { 292 init(encodedRegionName, 293 tablename, 294 logSeqNum, 295 EnvironmentEdgeManager.currentTime(), 296 EMPTY_UUIDS, 297 nonceGroup, 298 nonce, 299 mvcc, null); 300 } 301 302 @InterfaceAudience.Private 303 protected void init(final byte[] encodedRegionName, 304 final TableName tablename, 305 long logSeqNum, 306 final long now, 307 List<UUID> clusterIds, 308 long nonceGroup, 309 long nonce, 310 MultiVersionConcurrencyControl mvcc, 311 NavigableMap<byte[], Integer> replicationScope) { 312 this.sequenceId = logSeqNum; 313 this.writeTime = now; 314 this.clusterIds = clusterIds; 315 this.encodedRegionName = encodedRegionName; 316 this.tablename = tablename; 317 this.nonceGroup = nonceGroup; 318 this.nonce = nonce; 319 this.mvcc = mvcc; 320 if (logSeqNum != NO_SEQUENCE_ID) { 321 setSequenceId(logSeqNum); 322 } 323 this.replicationScope = replicationScope; 324 } 325 326 // For deserialization. DO NOT USE. See setWriteEntry below. 327 @InterfaceAudience.Private 328 protected void setSequenceId(long sequenceId) { 329 this.sequenceId = sequenceId; 330 } 331 332 /** 333 * @param compressionContext Compression context to use 334 * @deprecated deparcated since hbase 2.1.0 335 */ 336 @Deprecated 337 public void setCompressionContext(CompressionContext compressionContext) { 338 //do nothing 339 } 340 341 /** @return encoded region name */ 342 @Override 343 public byte [] getEncodedRegionName() { 344 return encodedRegionName; 345 } 346 347 /** @return table name */ 348 @Override 349 public TableName getTableName() { 350 return tablename; 351 } 352 353 /** @return log sequence number 354 * @deprecated Use {@link #getSequenceId()} 355 */ 356 @Deprecated 357 public long getLogSeqNum() { 358 return getSequenceId(); 359 } 360 361 /** 362 * Used to set original sequenceId for WALKeyImpl during WAL replay 363 */ 364 public void setOrigLogSeqNum(final long sequenceId) { 365 this.origLogSeqNum = sequenceId; 366 } 367 368 /** 369 * Return a positive long if current WALKeyImpl is created from a replay edit; a replay edit is an 370 * edit that came in when replaying WALs of a crashed server. 371 * @return original sequence number of the WALEdit 372 */ 373 @Override 374 public long getOrigLogSeqNum() { 375 return this.origLogSeqNum; 376 } 377 378 /** 379 * SequenceId is only available post WAL-assign. Calls before this will get you a 380 * {@link SequenceId#NO_SEQUENCE_ID}. See the comment on FSHLog#append and #getWriteNumber in this 381 * method for more on when this sequenceId comes available. 382 * @return long the new assigned sequence number 383 */ 384 @Override 385 public long getSequenceId() { 386 return this.sequenceId; 387 } 388 389 /** 390 * @return the write time 391 */ 392 @Override 393 public long getWriteTime() { 394 return this.writeTime; 395 } 396 397 public NavigableMap<byte[], Integer> getReplicationScopes() { 398 return replicationScope; 399 } 400 401 /** @return The nonce group */ 402 @Override 403 public long getNonceGroup() { 404 return nonceGroup; 405 } 406 407 /** @return The nonce */ 408 @Override 409 public long getNonce() { 410 return nonce; 411 } 412 413 private void setReplicationScope(NavigableMap<byte[], Integer> replicationScope) { 414 this.replicationScope = replicationScope; 415 } 416 417 public void clearReplicationScope() { 418 setReplicationScope(null); 419 } 420 421 /** 422 * Marks that the cluster with the given clusterId has consumed the change 423 */ 424 public void addClusterId(UUID clusterId) { 425 if (!clusterIds.contains(clusterId)) { 426 clusterIds.add(clusterId); 427 } 428 } 429 430 /** 431 * @return the set of cluster Ids that have consumed the change 432 */ 433 public List<UUID> getClusterIds() { 434 return clusterIds; 435 } 436 437 /** 438 * @return the cluster id on which the change has originated. It there is no such cluster, it 439 * returns DEFAULT_CLUSTER_ID (cases where replication is not enabled) 440 */ 441 @Override 442 public UUID getOriginatingClusterId(){ 443 return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0); 444 } 445 446 @Override 447 public String toString() { 448 return tablename + "/" + Bytes.toString(encodedRegionName) + "/" + sequenceId; 449 } 450 451 @Override 452 public boolean equals(Object obj) { 453 if (this == obj) { 454 return true; 455 } 456 if (obj == null || getClass() != obj.getClass()) { 457 return false; 458 } 459 return compareTo((WALKey)obj) == 0; 460 } 461 462 @Override 463 public int hashCode() { 464 int result = Bytes.hashCode(this.encodedRegionName); 465 result = (int) (result ^ getSequenceId()); 466 result = (int) (result ^ this.writeTime); 467 return result; 468 } 469 470 @Override 471 public int compareTo(WALKey o) { 472 int result = Bytes.compareTo(this.encodedRegionName, o.getEncodedRegionName()); 473 if (result == 0) { 474 long sid = getSequenceId(); 475 long otherSid = o.getSequenceId(); 476 if (sid < otherSid) { 477 result = -1; 478 } else if (sid > otherSid) { 479 result = 1; 480 } 481 if (result == 0) { 482 if (this.writeTime < o.getWriteTime()) { 483 result = -1; 484 } else if (this.writeTime > o.getWriteTime()) { 485 return 1; 486 } 487 } 488 } 489 // why isn't cluster id accounted for? 490 return result; 491 } 492 493 /** 494 * Drop this instance's tablename byte array and instead 495 * hold a reference to the provided tablename. This is not 496 * meant to be a general purpose setter - it's only used 497 * to collapse references to conserve memory. 498 */ 499 void internTableName(TableName tablename) { 500 // We should not use this as a setter - only to swap 501 // in a new reference to the same table name. 502 assert tablename.equals(this.tablename); 503 this.tablename = tablename; 504 } 505 506 /** 507 * Drop this instance's region name byte array and instead 508 * hold a reference to the provided region name. This is not 509 * meant to be a general purpose setter - it's only used 510 * to collapse references to conserve memory. 511 */ 512 void internEncodedRegionName(byte []encodedRegionName) { 513 // We should not use this as a setter - only to swap 514 // in a new reference to the same table name. 515 assert Bytes.equals(this.encodedRegionName, encodedRegionName); 516 this.encodedRegionName = encodedRegionName; 517 } 518 519 public WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor) 520 throws IOException { 521 WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder(); 522 builder.setEncodedRegionName( 523 compressor.compress(this.encodedRegionName, CompressionContext.DictionaryIndex.REGION)); 524 builder.setTableName( 525 compressor.compress(this.tablename.getName(), CompressionContext.DictionaryIndex.TABLE)); 526 builder.setLogSequenceNumber(getSequenceId()); 527 builder.setWriteTime(writeTime); 528 if (this.origLogSeqNum > 0) { 529 builder.setOrigSequenceNumber(this.origLogSeqNum); 530 } 531 if (this.nonce != HConstants.NO_NONCE) { 532 builder.setNonce(nonce); 533 } 534 if (this.nonceGroup != HConstants.NO_NONCE) { 535 builder.setNonceGroup(nonceGroup); 536 } 537 HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder(); 538 for (UUID clusterId : clusterIds) { 539 uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits()); 540 uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits()); 541 builder.addClusterIds(uuidBuilder.build()); 542 } 543 if (replicationScope != null) { 544 for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) { 545 ByteString family = 546 compressor.compress(e.getKey(), CompressionContext.DictionaryIndex.FAMILY); 547 builder.addScopes(FamilyScope.newBuilder().setFamily(family) 548 .setScopeType(ScopeType.forNumber(e.getValue()))); 549 } 550 } 551 return builder; 552 } 553 554 public void readFieldsFromPb(WALProtos.WALKey walKey, 555 WALCellCodec.ByteStringUncompressor uncompressor) throws IOException { 556 this.encodedRegionName = uncompressor.uncompress(walKey.getEncodedRegionName(), 557 CompressionContext.DictionaryIndex.REGION); 558 byte[] tablenameBytes = 559 uncompressor.uncompress(walKey.getTableName(), CompressionContext.DictionaryIndex.TABLE); 560 this.tablename = TableName.valueOf(tablenameBytes); 561 clusterIds.clear(); 562 for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) { 563 clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits())); 564 } 565 if (walKey.hasNonceGroup()) { 566 this.nonceGroup = walKey.getNonceGroup(); 567 } 568 if (walKey.hasNonce()) { 569 this.nonce = walKey.getNonce(); 570 } 571 this.replicationScope = null; 572 if (walKey.getScopesCount() > 0) { 573 this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); 574 for (FamilyScope scope : walKey.getScopesList()) { 575 byte[] family = 576 uncompressor.uncompress(scope.getFamily(), CompressionContext.DictionaryIndex.FAMILY); 577 this.replicationScope.put(family, scope.getScopeType().getNumber()); 578 } 579 } 580 setSequenceId(walKey.getLogSequenceNumber()); 581 this.writeTime = walKey.getWriteTime(); 582 if (walKey.hasOrigSequenceNumber()) { 583 this.origLogSeqNum = walKey.getOrigSequenceNumber(); 584 } 585 } 586 587 @Override 588 public long estimatedSerializedSizeOf() { 589 long size = encodedRegionName != null ? encodedRegionName.length : 0; 590 size += tablename != null ? tablename.toBytes().length : 0; 591 if (clusterIds != null) { 592 size += 16 * clusterIds.size(); 593 } 594 if (nonceGroup != HConstants.NO_NONCE) { 595 size += Bytes.SIZEOF_LONG; // nonce group 596 } 597 if (nonce != HConstants.NO_NONCE) { 598 size += Bytes.SIZEOF_LONG; // nonce 599 } 600 if (replicationScope != null) { 601 for (Map.Entry<byte[], Integer> scope: replicationScope.entrySet()) { 602 size += scope.getKey().length; 603 size += Bytes.SIZEOF_INT; 604 } 605 } 606 size += Bytes.SIZEOF_LONG; // sequence number 607 size += Bytes.SIZEOF_LONG; // write time 608 if (origLogSeqNum > 0) { 609 size += Bytes.SIZEOF_LONG; // original sequence number 610 } 611 return size; 612 } 613}