/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * WAL reader for a serial replication peer.
 */
@InterfaceAudience.Private
public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader {

  /**
   * First cell of the entry currently at the head of the stream, captured before the entry is
   * filtered. With serial replication we may discover, after filtering, that the entry can not be
   * pushed yet; on the next attempt the entry's cells may already have been filtered away
   * (especially for region event WAL entries), so the cell is remembered here. A non-null value
   * also tells us that the head entry has already been filtered and filtering can be skipped.
   * Reset to {@code null} whenever the head entry is removed from the stream.
   */
  private Cell firstCellInEntryBeforeFiltering;

  private final SerialReplicationChecker checker;

  public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
    ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
    ReplicationSource source, String walGroupId) {
    super(fs, conf, logQueue, startPosition, filter, source, walGroupId);
    this.checker = new SerialReplicationChecker(conf, source);
  }

  /**
   * Moves entries from {@code entryStream} into {@code batch}, consulting the serial replication
   * checker before each entry is taken.
   * <p>
   * The method returns when one of the following happens:
   * <ul>
   * <li>the head entry can not be pushed yet but the batch position has already advanced past the
   * position the stream had on entry to this method;</li>
   * <li>{@code addEntryToBatch} returns {@code true};</li>
   * <li>the stream has switched to a different file, in which case the batch is marked as end of
   * file;</li>
   * <li>the stream reports that it has no further entry.</li>
   * </ul>
   * If the head entry can not be pushed and the batch position has not advanced, the method waits
   * in {@code checker.waitUntilCanPush} before taking the entry.
   */
  @Override
  protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
    throws IOException, InterruptedException {
    final Path pathAtStart = entryStream.getCurrentPath();
    final long positionAtStart = entryStream.getPosition();
    while (true) {
      final Entry candidate = peekFilteredEntry(entryStream);
      if (candidate == null) {
        // everything was filtered out, just drop the entry from the stream.
        removeEntryFromStream(entryStream, batch);
      } else {
        final boolean pushableNow = checker.canPush(candidate, firstCellInEntryBeforeFiltering);
        if (!pushableNow) {
          if (batch.getLastWalPosition() > positionAtStart) {
            // the batch already carries something that can be shipped, hand it over first.
            return;
          }
          checker.waitUntilCanPush(candidate, firstCellInEntryBeforeFiltering);
        }
        // reaching this point means the entry can be pushed, so record its sequence id.
        final String encodedRegionName =
          Bytes.toString(candidate.getKey().getEncodedRegionName());
        batch.setLastSeqId(encodedRegionName, candidate.getKey().getSequenceId());
        // now really take the entry off the stream.
        removeEntryFromStream(entryStream, batch);
        if (addEntryToBatch(batch, candidate)) {
          return;
        }
      }
      // hasNext() is evaluated before the switch check, same order as always.
      final boolean moreEntries = entryStream.hasNext();
      if (switched(entryStream, pathAtStart)) {
        // always hand the batch back once we have moved on to a new file.
        batch.setEndOfFile(true);
        return;
      }
      if (!moreEntries) {
        return;
      }
    }
  }

  /**
   * Peeks the head entry of the stream and returns it in filtered form without removing it.
   * <p>
   * On the first look at an entry its first cell is remembered and the entry is run through
   * {@code filterEntry}; the result may be {@code null}. If the first cell is already remembered,
   * the entry was filtered on an earlier attempt and is returned as peeked.
   */
  private Entry peekFilteredEntry(WALEntryStream entryStream) throws IOException {
    final Entry head = entryStream.peek();
    if (firstCellInEntryBeforeFiltering != null) {
      // a remembered cell means this entry has already gone through filtering.
      return head;
    }
    assert !head.getEdit().isEmpty() : "should not write empty edits";
    // The cell is used to locate the region record in the meta table. The WAL only carries the
    // table name and the encoded region name, which can not be mapped to a region name without
    // scanning all the records of the table, so a start key is needed, just like when locating a
    // region at client side. For markers, the start key of the region is used as the row key of
    // the edit. This has to happen before filtering because all the cells may be filtered out,
    // especially for markers.
    firstCellInEntryBeforeFiltering = head.getEdit().getCells().get(0);
    return filterEntry(head);
  }

  /**
   * Advances the stream past its head entry, forgets the remembered first cell and records the
   * stream's new position as the batch's last WAL position.
   */
  private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)
    throws IOException {
    entryStream.next();
    firstCellInEntryBeforeFiltering = null;
    final long positionAfterRemoval = entryStream.getPosition();
    batch.setLastWalPosition(positionAfterRemoval);
  }
}