001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.wal;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.Map;
024import java.util.NavigableMap;
025import java.util.TreeMap;
026import java.util.UUID;
027import org.apache.hadoop.hbase.HBaseInterfaceAudience;
028import org.apache.hadoop.hbase.HConstants;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
031import org.apache.hadoop.hbase.regionserver.SequenceId;
032import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
033import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
036import org.apache.yetus.audience.InterfaceAudience;
037
038import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
039import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FamilyScope;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ScopeType;
045
046/**
047 * Default implementation of Key for an Entry in the WAL.
048 * For internal use only though Replication needs to have access.
049 *
050 * The log intermingles edits to many tables and rows, so each log entry
051 * identifies the appropriate table and row.  Within a table and row, they're
052 * also sorted.
053 *
054 * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an associated row.
055 *
056 */
057// TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
058//       purposes. They need to be merged into WALEntry.
059@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.REPLICATION})
060public class WALKeyImpl implements WALKey {
061  public static final WALKeyImpl EMPTY_WALKEYIMPL = new WALKeyImpl();
062
063  public MultiVersionConcurrencyControl getMvcc() {
064    return mvcc;
065  }
066
067  /**
068   * Use it to complete mvcc transaction. This WALKeyImpl was part of
069   * (the transaction is started when you call append; see the comment on FSHLog#append). To
070   * complete call
071   * {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
072   * or {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
073   * @return A WriteEntry gotten from local WAL subsystem.
074   * @see #setWriteEntry(MultiVersionConcurrencyControl.WriteEntry)
075   */
076  public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() {
077    return this.writeEntry;
078  }
079
080  public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
081    assert this.writeEntry == null;
082    this.writeEntry = writeEntry;
083    // Set our sequenceid now using WriteEntry.
084    this.sequenceId = writeEntry.getWriteNumber();
085  }
086
087  private byte [] encodedRegionName;
088
089  private TableName tablename;
090
091  /**
092   * SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is
093   * NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized.
094   */
095  private long sequenceId;
096
097  /**
098   * Used during WAL replay; the sequenceId of the edit when it came into the system.
099   */
100  private long origLogSeqNum = 0;
101
102  /** Time at which this edit was written. */
103  private long writeTime;
104
105  /** The first element in the list is the cluster id on which the change has originated */
106  private List<UUID> clusterIds;
107
108  private NavigableMap<byte[], Integer> replicationScope;
109
110  private long nonceGroup = HConstants.NO_NONCE;
111  private long nonce = HConstants.NO_NONCE;
112  private MultiVersionConcurrencyControl mvcc;
113
114  /**
115   * Set in a way visible to multiple threads; e.g. synchronized getter/setters.
116   */
117  private MultiVersionConcurrencyControl.WriteEntry writeEntry;
118
119  public WALKeyImpl() {
120    init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
121        new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
122  }
123
124  public WALKeyImpl(final NavigableMap<byte[], Integer> replicationScope) {
125    init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
126        new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, replicationScope);
127  }
128
129  @VisibleForTesting
130  public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
131                long logSeqNum,
132      final long now, UUID clusterId) {
133    List<UUID> clusterIds = new ArrayList<>(1);
134    clusterIds.add(clusterId);
135    init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
136        HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
137  }
138
139  // TODO: Fix being able to pass in sequenceid.
140  public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now) {
141    init(encodedRegionName,
142        tablename,
143        NO_SEQUENCE_ID,
144        now,
145        EMPTY_UUIDS,
146        HConstants.NO_NONCE,
147        HConstants.NO_NONCE,
148        null, null);
149  }
150
151  // TODO: Fix being able to pass in sequenceid.
152  public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
153      final NavigableMap<byte[], Integer> replicationScope) {
154    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
155        HConstants.NO_NONCE, null, replicationScope);
156  }
157
158  public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, final long now,
159      MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
160    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS, HConstants.NO_NONCE,
161        HConstants.NO_NONCE, mvcc, replicationScope);
162  }
163
164  public WALKeyImpl(final byte[] encodedRegionName,
165                final TableName tablename,
166                final long now,
167                MultiVersionConcurrencyControl mvcc) {
168    init(encodedRegionName,
169        tablename,
170        NO_SEQUENCE_ID,
171        now,
172        EMPTY_UUIDS,
173        HConstants.NO_NONCE,
174        HConstants.NO_NONCE,
175        mvcc, null);
176  }
177
178  /**
179   * Create the log key for writing to somewhere.
180   * We maintain the tablename mainly for debugging purposes.
181   * A regionName is always a sub-table object.
182   * <p>Used by log splitting and snapshots.
183   *
184   * @param encodedRegionName Encoded name of the region as returned by
185   *                         <code>HRegionInfo#getEncodedNameAsBytes()</code>.
186   * @param tablename         - name of table
187   * @param logSeqNum         - log sequence number
188   * @param now               Time at which this edit was written.
189   * @param clusterIds        the clusters that have consumed the change(used in Replication)
190   * @param nonceGroup        the nonceGroup
191   * @param nonce             the nonce
192   * @param mvcc              the mvcc associate the WALKeyImpl
193   * @param replicationScope  the non-default replication scope
194   *                          associated with the region's column families
195   */
196  // TODO: Fix being able to pass in sequenceid.
197  public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
198      final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
199      MultiVersionConcurrencyControl mvcc, final NavigableMap<byte[], Integer> replicationScope) {
200    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc,
201        replicationScope);
202  }
203
204  /**
205   * Create the log key for writing to somewhere.
206   * We maintain the tablename mainly for debugging purposes.
207   * A regionName is always a sub-table object.
208   * <p>Used by log splitting and snapshots.
209   *
210   * @param encodedRegionName Encoded name of the region as returned by
211   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
212   * @param tablename         - name of table
213   * @param logSeqNum         - log sequence number
214   * @param now               Time at which this edit was written.
215   * @param clusterIds        the clusters that have consumed the change(used in Replication)
216   */
217  // TODO: Fix being able to pass in sequenceid.
218  public WALKeyImpl(final byte[] encodedRegionName,
219                final TableName tablename,
220                long logSeqNum,
221                final long now,
222                List<UUID> clusterIds,
223                long nonceGroup,
224                long nonce,
225                MultiVersionConcurrencyControl mvcc) {
226    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce, mvcc, null);
227  }
228
229  /**
230   * Create the log key for writing to somewhere.
231   * We maintain the tablename mainly for debugging purposes.
232   * A regionName is always a sub-table object.
233   *
234   * @param encodedRegionName Encoded name of the region as returned by
235   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
236   * @param tablename         the tablename
237   * @param now               Time at which this edit was written.
238   * @param clusterIds        the clusters that have consumed the change(used in Replication)
239   * @param nonceGroup
240   * @param nonce
241   * @param mvcc mvcc control used to generate sequence numbers and control read/write points
242   */
243  public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
244                final long now, List<UUID> clusterIds, long nonceGroup,
245                final long nonce, final MultiVersionConcurrencyControl mvcc) {
246    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
247        null);
248  }
249
250  /**
251   * Create the log key for writing to somewhere.
252   * We maintain the tablename mainly for debugging purposes.
253   * A regionName is always a sub-table object.
254   *
255   * @param encodedRegionName Encoded name of the region as returned by
256   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
257   * @param tablename
258   * @param now               Time at which this edit was written.
259   * @param clusterIds        the clusters that have consumed the change(used in Replication)
260   * @param nonceGroup        the nonceGroup
261   * @param nonce             the nonce
262   * @param mvcc mvcc control used to generate sequence numbers and control read/write points
263   * @param replicationScope  the non-default replication scope of the column families
264   */
265  public WALKeyImpl(final byte[] encodedRegionName, final TableName tablename,
266                final long now, List<UUID> clusterIds, long nonceGroup,
267                final long nonce, final MultiVersionConcurrencyControl mvcc,
268                NavigableMap<byte[], Integer> replicationScope) {
269    init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc,
270        replicationScope);
271  }
272
273  /**
274   * Create the log key for writing to somewhere.
275   * We maintain the tablename mainly for debugging purposes.
276   * A regionName is always a sub-table object.
277   *
278   * @param encodedRegionName Encoded name of the region as returned by
279   *                          <code>HRegionInfo#getEncodedNameAsBytes()</code>.
280   * @param tablename
281   * @param logSeqNum
282   * @param nonceGroup
283   * @param nonce
284   */
285  // TODO: Fix being able to pass in sequenceid.
286  public WALKeyImpl(final byte[] encodedRegionName,
287                final TableName tablename,
288                long logSeqNum,
289                long nonceGroup,
290                long nonce,
291                final MultiVersionConcurrencyControl mvcc) {
292    init(encodedRegionName,
293        tablename,
294        logSeqNum,
295        EnvironmentEdgeManager.currentTime(),
296        EMPTY_UUIDS,
297        nonceGroup,
298        nonce,
299        mvcc, null);
300  }
301
302  @InterfaceAudience.Private
303  protected void init(final byte[] encodedRegionName,
304                      final TableName tablename,
305                      long logSeqNum,
306                      final long now,
307                      List<UUID> clusterIds,
308                      long nonceGroup,
309                      long nonce,
310                      MultiVersionConcurrencyControl mvcc,
311                      NavigableMap<byte[], Integer> replicationScope) {
312    this.sequenceId = logSeqNum;
313    this.writeTime = now;
314    this.clusterIds = clusterIds;
315    this.encodedRegionName = encodedRegionName;
316    this.tablename = tablename;
317    this.nonceGroup = nonceGroup;
318    this.nonce = nonce;
319    this.mvcc = mvcc;
320    if (logSeqNum != NO_SEQUENCE_ID) {
321      setSequenceId(logSeqNum);
322    }
323    this.replicationScope = replicationScope;
324  }
325
326  // For deserialization. DO NOT USE. See setWriteEntry below.
327  @InterfaceAudience.Private
328  protected void setSequenceId(long sequenceId) {
329    this.sequenceId = sequenceId;
330  }
331
332  /**
333   * @param compressionContext Compression context to use
334   * @deprecated deparcated since hbase 2.1.0
335   */
336  @Deprecated
337  public void setCompressionContext(CompressionContext compressionContext) {
338    //do nothing
339  }
340
341  /** @return encoded region name */
342  @Override
343  public byte [] getEncodedRegionName() {
344    return encodedRegionName;
345  }
346
347  /** @return table name */
348  @Override
349  public TableName getTableName() {
350    return tablename;
351  }
352
353  /** @return log sequence number
354   * @deprecated Use {@link #getSequenceId()}
355   */
356  @Deprecated
357  public long getLogSeqNum() {
358    return getSequenceId();
359  }
360
361  /**
362   * Used to set original sequenceId for WALKeyImpl during WAL replay
363   */
364  public void setOrigLogSeqNum(final long sequenceId) {
365    this.origLogSeqNum = sequenceId;
366  }
367
368  /**
369   * Return a positive long if current WALKeyImpl is created from a replay edit; a replay edit is an
370   * edit that came in when replaying WALs of a crashed server.
371   * @return original sequence number of the WALEdit
372   */
373  @Override
374  public long getOrigLogSeqNum() {
375    return this.origLogSeqNum;
376  }
377
378  /**
379   * SequenceId is only available post WAL-assign. Calls before this will get you a
380   * {@link SequenceId#NO_SEQUENCE_ID}. See the comment on FSHLog#append and #getWriteNumber in this
381   * method for more on when this sequenceId comes available.
382   * @return long the new assigned sequence number
383   */
384  @Override
385  public long getSequenceId() {
386    return this.sequenceId;
387  }
388
389  /**
390   * @return the write time
391   */
392  @Override
393  public long getWriteTime() {
394    return this.writeTime;
395  }
396
397  public NavigableMap<byte[], Integer> getReplicationScopes() {
398    return replicationScope;
399  }
400
401  /** @return The nonce group */
402  @Override
403  public long getNonceGroup() {
404    return nonceGroup;
405  }
406
407  /** @return The nonce */
408  @Override
409  public long getNonce() {
410    return nonce;
411  }
412
413  private void setReplicationScope(NavigableMap<byte[], Integer> replicationScope) {
414    this.replicationScope = replicationScope;
415  }
416
417  public void clearReplicationScope() {
418    setReplicationScope(null);
419  }
420
421  /**
422   * Marks that the cluster with the given clusterId has consumed the change
423   */
424  public void addClusterId(UUID clusterId) {
425    if (!clusterIds.contains(clusterId)) {
426      clusterIds.add(clusterId);
427    }
428  }
429
430  /**
431   * @return the set of cluster Ids that have consumed the change
432   */
433  public List<UUID> getClusterIds() {
434    return clusterIds;
435  }
436
437  /**
438   * @return the cluster id on which the change has originated. It there is no such cluster, it
439   *         returns DEFAULT_CLUSTER_ID (cases where replication is not enabled)
440   */
441  @Override
442  public UUID getOriginatingClusterId(){
443    return clusterIds.isEmpty()? HConstants.DEFAULT_CLUSTER_ID: clusterIds.get(0);
444  }
445
446  @Override
447  public String toString() {
448    return tablename + "/" + Bytes.toString(encodedRegionName) + "/" + sequenceId;
449  }
450
451  @Override
452  public boolean equals(Object obj) {
453    if (this == obj) {
454      return true;
455    }
456    if (obj == null || getClass() != obj.getClass()) {
457      return false;
458    }
459    return compareTo((WALKey)obj) == 0;
460  }
461
462  @Override
463  public int hashCode() {
464    int result = Bytes.hashCode(this.encodedRegionName);
465    result = (int) (result ^ getSequenceId());
466    result = (int) (result ^ this.writeTime);
467    return result;
468  }
469
470  @Override
471  public int compareTo(WALKey o) {
472    int result = Bytes.compareTo(this.encodedRegionName, o.getEncodedRegionName());
473    if (result == 0) {
474      long sid = getSequenceId();
475      long otherSid = o.getSequenceId();
476      if (sid < otherSid) {
477        result = -1;
478      } else if (sid  > otherSid) {
479        result = 1;
480      }
481      if (result == 0) {
482        if (this.writeTime < o.getWriteTime()) {
483          result = -1;
484        } else if (this.writeTime > o.getWriteTime()) {
485          return 1;
486        }
487      }
488    }
489    // why isn't cluster id accounted for?
490    return result;
491  }
492
493  /**
494   * Drop this instance's tablename byte array and instead
495   * hold a reference to the provided tablename. This is not
496   * meant to be a general purpose setter - it's only used
497   * to collapse references to conserve memory.
498   */
499  void internTableName(TableName tablename) {
500    // We should not use this as a setter - only to swap
501    // in a new reference to the same table name.
502    assert tablename.equals(this.tablename);
503    this.tablename = tablename;
504  }
505
506  /**
507   * Drop this instance's region name byte array and instead
508   * hold a reference to the provided region name. This is not
509   * meant to be a general purpose setter - it's only used
510   * to collapse references to conserve memory.
511   */
512  void internEncodedRegionName(byte []encodedRegionName) {
513    // We should not use this as a setter - only to swap
514    // in a new reference to the same table name.
515    assert Bytes.equals(this.encodedRegionName, encodedRegionName);
516    this.encodedRegionName = encodedRegionName;
517  }
518
519  public WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
520      throws IOException {
521    WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder();
522    builder.setEncodedRegionName(
523      compressor.compress(this.encodedRegionName, CompressionContext.DictionaryIndex.REGION));
524    builder.setTableName(
525      compressor.compress(this.tablename.getName(), CompressionContext.DictionaryIndex.TABLE));
526    builder.setLogSequenceNumber(getSequenceId());
527    builder.setWriteTime(writeTime);
528    if (this.origLogSeqNum > 0) {
529      builder.setOrigSequenceNumber(this.origLogSeqNum);
530    }
531    if (this.nonce != HConstants.NO_NONCE) {
532      builder.setNonce(nonce);
533    }
534    if (this.nonceGroup != HConstants.NO_NONCE) {
535      builder.setNonceGroup(nonceGroup);
536    }
537    HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
538    for (UUID clusterId : clusterIds) {
539      uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
540      uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
541      builder.addClusterIds(uuidBuilder.build());
542    }
543    if (replicationScope != null) {
544      for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
545        ByteString family =
546            compressor.compress(e.getKey(), CompressionContext.DictionaryIndex.FAMILY);
547        builder.addScopes(FamilyScope.newBuilder().setFamily(family)
548            .setScopeType(ScopeType.forNumber(e.getValue())));
549      }
550    }
551    return builder;
552  }
553
554  public void readFieldsFromPb(WALProtos.WALKey walKey,
555      WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
556    this.encodedRegionName = uncompressor.uncompress(walKey.getEncodedRegionName(),
557      CompressionContext.DictionaryIndex.REGION);
558    byte[] tablenameBytes =
559        uncompressor.uncompress(walKey.getTableName(), CompressionContext.DictionaryIndex.TABLE);
560    this.tablename = TableName.valueOf(tablenameBytes);
561    clusterIds.clear();
562    for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
563      clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
564    }
565    if (walKey.hasNonceGroup()) {
566      this.nonceGroup = walKey.getNonceGroup();
567    }
568    if (walKey.hasNonce()) {
569      this.nonce = walKey.getNonce();
570    }
571    this.replicationScope = null;
572    if (walKey.getScopesCount() > 0) {
573      this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
574      for (FamilyScope scope : walKey.getScopesList()) {
575        byte[] family =
576            uncompressor.uncompress(scope.getFamily(), CompressionContext.DictionaryIndex.FAMILY);
577        this.replicationScope.put(family, scope.getScopeType().getNumber());
578      }
579    }
580    setSequenceId(walKey.getLogSequenceNumber());
581    this.writeTime = walKey.getWriteTime();
582    if (walKey.hasOrigSequenceNumber()) {
583      this.origLogSeqNum = walKey.getOrigSequenceNumber();
584    }
585  }
586
587  @Override
588  public long estimatedSerializedSizeOf() {
589    long size = encodedRegionName != null ? encodedRegionName.length : 0;
590    size += tablename != null ? tablename.toBytes().length : 0;
591    if (clusterIds != null) {
592      size += 16 * clusterIds.size();
593    }
594    if (nonceGroup != HConstants.NO_NONCE) {
595      size += Bytes.SIZEOF_LONG; // nonce group
596    }
597    if (nonce != HConstants.NO_NONCE) {
598      size += Bytes.SIZEOF_LONG; // nonce
599    }
600    if (replicationScope != null) {
601      for (Map.Entry<byte[], Integer> scope: replicationScope.entrySet()) {
602        size += scope.getKey().length;
603        size += Bytes.SIZEOF_INT;
604      }
605    }
606    size += Bytes.SIZEOF_LONG; // sequence number
607    size += Bytes.SIZEOF_LONG; // write time
608    if (origLogSeqNum > 0) {
609      size += Bytes.SIZEOF_LONG; // original sequence number
610    }
611    return size;
612  }
613}