001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.stream.Collectors;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.util.Pair;
027import org.apache.hadoop.hbase.wal.WAL.Entry;
028import org.apache.hadoop.hbase.wal.WALEdit;
029import org.apache.hadoop.hbase.wal.WALKey;
030import org.apache.yetus.audience.InterfaceAudience;
031
032/**
033 * Holds a batch of WAL entries to replicate, along with some statistics
034 */
035@InterfaceAudience.Private
036class WALEntryBatch {
037
038  // used by recovered replication queue to indicate that all the entries have been read.
039  public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
040
041  private List<Pair<Entry, Long>> walEntriesWithSize;
042
043  // last WAL that was read
044  private Path lastWalPath;
045  // position in WAL of last entry in this batch
046  private long lastWalPosition = 0;
047  // number of distinct row keys in this batch
048  private int nbRowKeys = 0;
049  // number of HFiles
050  private int nbHFiles = 0;
051  // heap size of data we need to replicate
052  private long heapSize = 0;
053  // save the last sequenceid for each region if the table has serial-replication scope
054  private Map<String, Long> lastSeqIds = new HashMap<>();
055  // indicate that this is the end of the current file
056  private boolean endOfFile;
057  // indicate the buffer size used, which is added to
058  // ReplicationSourceWALReader.totalBufferUsed
059  private long usedBufferSize;
060
061  /**
062   * @param lastWalPath Path of the WAL the last entry in this batch was read from
063   */
064  WALEntryBatch(int maxNbEntries, Path lastWalPath) {
065    this.walEntriesWithSize = new ArrayList<>(maxNbEntries);
066    this.lastWalPath = lastWalPath;
067  }
068
069  static WALEntryBatch endOfFile(Path lastWalPath) {
070    WALEntryBatch batch = new WALEntryBatch(0, lastWalPath);
071    batch.setLastWalPosition(-1L);
072    batch.setEndOfFile(true);
073    return batch;
074  }
075
076  public void addEntry(Entry entry, long entrySize) {
077    walEntriesWithSize.add(new Pair<>(entry, entrySize));
078  }
079
080  /** Returns the WAL Entries. */
081  public List<Entry> getWalEntries() {
082    return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList());
083  }
084
085  /** Returns the WAL Entries. */
086  public List<Pair<Entry, Long>> getWalEntriesWithSize() {
087    return walEntriesWithSize;
088  }
089
090  /** Returns the path of the last WAL that was read. */
091  public Path getLastWalPath() {
092    return lastWalPath;
093  }
094
095  public void setLastWalPath(Path lastWalPath) {
096    this.lastWalPath = lastWalPath;
097  }
098
099  /** Returns the position in the last WAL that was read. */
100  public long getLastWalPosition() {
101    return lastWalPosition;
102  }
103
104  public void setLastWalPosition(long lastWalPosition) {
105    this.lastWalPosition = lastWalPosition;
106  }
107
108  public int getNbEntries() {
109    return walEntriesWithSize.size();
110  }
111
112  /** Returns the number of distinct row keys in this batch */
113  public int getNbRowKeys() {
114    return nbRowKeys;
115  }
116
117  /** Returns the number of HFiles in this batch */
118  public int getNbHFiles() {
119    return nbHFiles;
120  }
121
122  /** Returns total number of operations in this batch */
123  public int getNbOperations() {
124    return getNbRowKeys() + getNbHFiles();
125  }
126
127  /** Returns the heap size of this batch */
128  public long getHeapSize() {
129    return heapSize;
130  }
131
132  /** Returns the last sequenceid for each region if the table has serial-replication scope */
133  public Map<String, Long> getLastSeqIds() {
134    return lastSeqIds;
135  }
136
137  public boolean isEndOfFile() {
138    return endOfFile;
139  }
140
141  public void setEndOfFile(boolean endOfFile) {
142    this.endOfFile = endOfFile;
143  }
144
145  public void incrementNbRowKeys(int increment) {
146    nbRowKeys += increment;
147  }
148
149  public void incrementNbHFiles(int increment) {
150    nbHFiles += increment;
151  }
152
153  public void incrementHeapSize(long increment) {
154    heapSize += increment;
155  }
156
157  public void setLastSeqId(String region, long sequenceId) {
158    lastSeqIds.put(region, sequenceId);
159  }
160
161  public long incrementUsedBufferSize(Entry entry) {
162    long increment = getEntrySizeExcludeBulkLoad(entry);
163    usedBufferSize += increment;
164    return increment;
165  }
166
167  public long getUsedBufferSize() {
168    return this.usedBufferSize;
169  }
170
171  @Override
172  public String toString() {
173    return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath
174      + ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles="
175      + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile="
176      + endOfFile + ",usedBufferSize=" + usedBufferSize + "]";
177  }
178
179  static long getEntrySizeExcludeBulkLoad(Entry entry) {
180    WALEdit edit = entry.getEdit();
181    WALKey key = entry.getKey();
182    return edit.heapSize() + key.estimatedSerializedSizeOf();
183  }
184}