001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.stream.Collectors;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.util.Pair;
027import org.apache.hadoop.hbase.wal.WAL.Entry;
028import org.apache.yetus.audience.InterfaceAudience;
029
030/**
031 * Holds a batch of WAL entries to replicate, along with some statistics
032 */
033@InterfaceAudience.Private
034class WALEntryBatch {
035
036  // used by recovered replication queue to indicate that all the entries have been read.
037  public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null);
038
039  private List<Pair<Entry, Long>> walEntriesWithSize;
040
041  // last WAL that was read
042  private Path lastWalPath;
043  // position in WAL of last entry in this batch
044  private long lastWalPosition = 0;
045  // number of distinct row keys in this batch
046  private int nbRowKeys = 0;
047  // number of HFiles
048  private int nbHFiles = 0;
049  // heap size of data we need to replicate
050  private long heapSize = 0;
051  // save the last sequenceid for each region if the table has serial-replication scope
052  private Map<String, Long> lastSeqIds = new HashMap<>();
053  // indicate that this is the end of the current file
054  private boolean endOfFile;
055
056  /**
057   * @param lastWalPath Path of the WAL the last entry in this batch was read from
058   */
059  WALEntryBatch(int maxNbEntries, Path lastWalPath) {
060    this.walEntriesWithSize = new ArrayList<>(maxNbEntries);
061    this.lastWalPath = lastWalPath;
062  }
063
064  static WALEntryBatch endOfFile(Path lastWalPath) {
065    WALEntryBatch batch = new WALEntryBatch(0, lastWalPath);
066    batch.setLastWalPosition(-1L);
067    batch.setEndOfFile(true);
068    return batch;
069  }
070
071  public void addEntry(Entry entry, long entrySize) {
072    walEntriesWithSize.add(new Pair<>(entry, entrySize));
073  }
074
075  /** Returns the WAL Entries. */
076  public List<Entry> getWalEntries() {
077    return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList());
078  }
079
080  /** Returns the WAL Entries. */
081  public List<Pair<Entry, Long>> getWalEntriesWithSize() {
082    return walEntriesWithSize;
083  }
084
085  /** Returns the path of the last WAL that was read. */
086  public Path getLastWalPath() {
087    return lastWalPath;
088  }
089
090  public void setLastWalPath(Path lastWalPath) {
091    this.lastWalPath = lastWalPath;
092  }
093
094  /** Returns the position in the last WAL that was read. */
095  public long getLastWalPosition() {
096    return lastWalPosition;
097  }
098
099  public void setLastWalPosition(long lastWalPosition) {
100    this.lastWalPosition = lastWalPosition;
101  }
102
103  public int getNbEntries() {
104    return walEntriesWithSize.size();
105  }
106
107  /** Returns the number of distinct row keys in this batch */
108  public int getNbRowKeys() {
109    return nbRowKeys;
110  }
111
112  /** Returns the number of HFiles in this batch */
113  public int getNbHFiles() {
114    return nbHFiles;
115  }
116
117  /** Returns total number of operations in this batch */
118  public int getNbOperations() {
119    return getNbRowKeys() + getNbHFiles();
120  }
121
122  /** Returns the heap size of this batch */
123  public long getHeapSize() {
124    return heapSize;
125  }
126
127  /** Returns the last sequenceid for each region if the table has serial-replication scope */
128  public Map<String, Long> getLastSeqIds() {
129    return lastSeqIds;
130  }
131
132  public boolean isEndOfFile() {
133    return endOfFile;
134  }
135
136  public void setEndOfFile(boolean endOfFile) {
137    this.endOfFile = endOfFile;
138  }
139
140  public void incrementNbRowKeys(int increment) {
141    nbRowKeys += increment;
142  }
143
144  public void incrementNbHFiles(int increment) {
145    nbHFiles += increment;
146  }
147
148  public void incrementHeapSize(long increment) {
149    heapSize += increment;
150  }
151
152  public void setLastSeqId(String region, long sequenceId) {
153    lastSeqIds.put(region, sequenceId);
154  }
155
156  @Override
157  public String toString() {
158    return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath
159      + ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles="
160      + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile="
161      + endOfFile + "]";
162  }
163}