001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024import java.util.stream.Collectors; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.util.Pair; 027import org.apache.hadoop.hbase.wal.WAL.Entry; 028import org.apache.hadoop.hbase.wal.WALEdit; 029import org.apache.hadoop.hbase.wal.WALKey; 030import org.apache.yetus.audience.InterfaceAudience; 031 032/** 033 * Holds a batch of WAL entries to replicate, along with some statistics 034 */ 035@InterfaceAudience.Private 036class WALEntryBatch { 037 038 // used by recovered replication queue to indicate that all the entries have been read. 039 public static final WALEntryBatch NO_MORE_DATA = new WALEntryBatch(0, null); 040 041 private List<Pair<Entry, Long>> walEntriesWithSize; 042 043 // last WAL that was read 044 private Path lastWalPath; 045 // position in WAL of last entry in this batch 046 private long lastWalPosition = 0; 047 // number of distinct row keys in this batch 048 private int nbRowKeys = 0; 049 // number of HFiles 050 private int nbHFiles = 0; 051 // heap size of data we need to replicate 052 private long heapSize = 0; 053 // save the last sequenceid for each region if the table has serial-replication scope 054 private Map<String, Long> lastSeqIds = new HashMap<>(); 055 // indicate that this is the end of the current file 056 private boolean endOfFile; 057 // indicate the buffer size used, which is added to 058 // ReplicationSourceWALReader.totalBufferUsed 059 private long usedBufferSize; 060 061 /** 062 * @param lastWalPath Path of the WAL the last entry in this batch was read from 063 */ 064 WALEntryBatch(int maxNbEntries, Path lastWalPath) { 065 this.walEntriesWithSize = new ArrayList<>(maxNbEntries); 066 this.lastWalPath = lastWalPath; 067 } 068 069 static WALEntryBatch endOfFile(Path lastWalPath) { 070 WALEntryBatch batch = new WALEntryBatch(0, lastWalPath); 071 batch.setLastWalPosition(-1L); 072 batch.setEndOfFile(true); 073 return batch; 074 } 075 076 public void addEntry(Entry entry, long entrySize) { 077 walEntriesWithSize.add(new Pair<>(entry, entrySize)); 078 } 079 080 /** Returns the WAL Entries. */ 081 public List<Entry> getWalEntries() { 082 return walEntriesWithSize.stream().map(Pair::getFirst).collect(Collectors.toList()); 083 } 084 085 /** Returns the WAL Entries. */ 086 public List<Pair<Entry, Long>> getWalEntriesWithSize() { 087 return walEntriesWithSize; 088 } 089 090 /** Returns the path of the last WAL that was read. */ 091 public Path getLastWalPath() { 092 return lastWalPath; 093 } 094 095 public void setLastWalPath(Path lastWalPath) { 096 this.lastWalPath = lastWalPath; 097 } 098 099 /** Returns the position in the last WAL that was read. */ 100 public long getLastWalPosition() { 101 return lastWalPosition; 102 } 103 104 public void setLastWalPosition(long lastWalPosition) { 105 this.lastWalPosition = lastWalPosition; 106 } 107 108 public int getNbEntries() { 109 return walEntriesWithSize.size(); 110 } 111 112 /** Returns the number of distinct row keys in this batch */ 113 public int getNbRowKeys() { 114 return nbRowKeys; 115 } 116 117 /** Returns the number of HFiles in this batch */ 118 public int getNbHFiles() { 119 return nbHFiles; 120 } 121 122 /** Returns total number of operations in this batch */ 123 public int getNbOperations() { 124 return getNbRowKeys() + getNbHFiles(); 125 } 126 127 /** Returns the heap size of this batch */ 128 public long getHeapSize() { 129 return heapSize; 130 } 131 132 /** Returns the last sequenceid for each region if the table has serial-replication scope */ 133 public Map<String, Long> getLastSeqIds() { 134 return lastSeqIds; 135 } 136 137 public boolean isEndOfFile() { 138 return endOfFile; 139 } 140 141 public void setEndOfFile(boolean endOfFile) { 142 this.endOfFile = endOfFile; 143 } 144 145 public void incrementNbRowKeys(int increment) { 146 nbRowKeys += increment; 147 } 148 149 public void incrementNbHFiles(int increment) { 150 nbHFiles += increment; 151 } 152 153 public void incrementHeapSize(long increment) { 154 heapSize += increment; 155 } 156 157 public void setLastSeqId(String region, long sequenceId) { 158 lastSeqIds.put(region, sequenceId); 159 } 160 161 public long incrementUsedBufferSize(Entry entry) { 162 long increment = getEntrySizeExcludeBulkLoad(entry); 163 usedBufferSize += increment; 164 return increment; 165 } 166 167 public long getUsedBufferSize() { 168 return this.usedBufferSize; 169 } 170 171 @Override 172 public String toString() { 173 return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath 174 + ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles=" 175 + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile=" 176 + endOfFile + ",usedBufferSize=" + usedBufferSize + "]"; 177 } 178 179 static long getEntrySizeExcludeBulkLoad(Entry entry) { 180 WALEdit edit = entry.getEdit(); 181 WALKey key = entry.getKey(); 182 return edit.heapSize() + key.estimatedSerializedSizeOf(); 183 } 184}