/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;

/**
 * This callable is executed at RS side to replay sync replication wal.
 * <p>
 * For every WAL named in the request it reads the entries in batches (bounded by
 * {@link #REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE}) and hands each batch to the region server's
 * replication sink service.
 */
@InterfaceAudience.Private
public class ReplaySyncReplicationWALCallable extends BaseRSProcedureCallable {

  private static final Logger LOG = LoggerFactory.getLogger(ReplaySyncReplicationWALCallable.class);

  /** Configuration key for the approximate heap size of one replayed batch of WAL edits. */
  private static final String REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE =
    "hbase.replay.sync.replication.wal.batch.size";

  /** Default batch size: 8 MiB. */
  private static final long DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = 8 * 1024 * 1024;

  /** Peer id taken from the request parameter; only used for logging in this class. */
  private String peerId;

  /** WAL paths to replay, each resolved against the region server's WAL root directory. */
  private List<String> wals = new ArrayList<>();

  /** Threshold, compared against the accumulated heap size of edits, that ends a batch. */
  private long batchSize;

  // NOTE(review): despite the field name, doCall() keys this locker on the first WAL name rather
  // than on peerId, and the locker is an instance field - confirm the intended key and scope.
  private final KeyLocker<String> peersLock = new KeyLocker<>();

  /**
   * Replays every requested WAL, in request order, while holding the lock keyed on the first WAL
   * name. Does nothing when no WAL was requested or when the region server has no replication sink
   * service.
   * @throws Exception if reading a WAL or replicating its entries fails
   */
  @Override
  protected void doCall() throws Exception {
    LOG.info("Received a replay sync replication wals {} event, peerId={}", wals, peerId);
    if (wals.isEmpty()) {
      // Nothing to replay. Returning here also keeps wals.get(0) below from throwing
      // IndexOutOfBoundsException on an empty request.
      return;
    }
    if (rs.getReplicationSinkService() == null) {
      return;
    }
    Lock peerLock = peersLock.acquireLock(wals.get(0));
    try {
      for (String wal : wals) {
        replayWAL(wal);
      }
    } finally {
      peerLock.unlock();
    }
  }

  /**
   * Parses the serialized {@link ReplaySyncReplicationWALParameter} and initializes the peer id,
   * the WAL list and the batch size (the latter read from the region server configuration).
   * @param parameter protobuf-encoded {@link ReplaySyncReplicationWALParameter}
   * @throws InvalidProtocolBufferException if {@code parameter} cannot be parsed
   */
  @Override
  protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException {
    ReplaySyncReplicationWALParameter param =
      ReplaySyncReplicationWALParameter.parseFrom(parameter);
    this.peerId = param.getPeerId();
    this.wals.addAll(param.getWalList());
    this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE,
      DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
  }

  /** Returns the event type for this callable, {@link EventType#RS_REPLAY_SYNC_REPLICATION_WAL}. */
  @Override
  public EventType getEventType() {
    return EventType.RS_REPLAY_SYNC_REPLICATION_WAL;
  }

  /**
   * Replays a single WAL: reads it batch by batch and passes every non-empty batch to the
   * replication sink service. Returns silently if no reader could be opened for the WAL.
   * @param wal WAL path relative to the WAL root directory
   * @throws IOException if reading the WAL or replicating a batch fails
   */
  private void replayWAL(String wal) throws IOException {
    // try-with-resources skips close() for a null resource, so the null check can live inside.
    try (WALStreamReader reader = getReader(wal)) {
      if (reader == null) {
        return;
      }
      List<Entry> entries = readWALEntries(reader, wal);
      while (!entries.isEmpty()) {
        Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> pair =
          ReplicationProtobufUtil.buildReplicateWALEntryRequest(entries.toArray(new Entry[0]));
        ReplicateWALEntryRequest request = pair.getFirst();
        rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(), pair.getSecond(),
          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
          request.getSourceHFileArchiveDirPath());
        // Read next entries.
        entries = readWALEntries(reader, wal);
      }
    }
  }

  /**
   * Opens a stream reader for the given WAL after calling
   * {@link RecoverLeaseFSUtils#recoverFileLease} on it.
   * @param wal WAL path relative to the WAL root directory
   * @return the reader, or {@code null} if a {@link WALHeaderEOFException} was hit while opening
   * @throws IOException on any other failure while recovering the lease or opening the reader
   */
  private WALStreamReader getReader(String wal) throws IOException {
    Path path = new Path(rs.getWALRootDir(), wal);
    try {
      RecoverLeaseFSUtils.recoverFileLease(rs.getWALFileSystem(), path, rs.getConfiguration());
      return WALFactory.createStreamReader(rs.getWALFileSystem(), path, rs.getConfiguration());
    } catch (WALHeaderEOFException e) {
      LOG.warn("EOF while opening WAL reader for {}", path, e);
      return null;
    }
  }

  /**
   * Filters the cells of the entry's edit in place and reports whether the entry should be
   * included. The cell mapper returns {@code null} for cells of {@link WALEdit#METAFAMILY}
   * (presumably dropping them - see {@link WALUtil#filterCells}).
   * @param entry the WAL entry whose edit is filtered; it is modified by this call
   * @return {@code true} if the edit is non-empty after filtering
   */
  private boolean filter(Entry entry) {
    WALEdit edit = entry.getEdit();
    WALUtil.filterCells(edit, c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY) ? null : c);
    return !edit.isEmpty();
  }

  /**
   * Reads the next batch of entries from the reader. Reading stops at the end of the WAL (a
   * {@code null} entry), on an {@link EOFException}, or once the accumulated heap size of the
   * collected edits exceeds {@link #batchSize}. Entries rejected by {@link #filter(Entry)} are
   * skipped and do not count towards the batch size.
   * @param reader the reader to pull entries from; {@code null} yields an empty batch
   * @param wal    WAL name, used for logging only
   * @return the collected entries; empty when there is nothing more to read
   * @throws IOException if reading fails with anything other than an {@link EOFException}
   */
  private List<Entry> readWALEntries(WALStreamReader reader, String wal) throws IOException {
    List<Entry> entries = new ArrayList<>();
    if (reader == null) {
      return entries;
    }
    long size = 0;
    for (;;) {
      Entry entry;
      try {
        entry = reader.next();
      } catch (EOFException e) {
        // An EOF ends the batch instead of failing the replay; whatever was read so far is kept.
        LOG.info("EOF while reading WAL entries from {}: {}, continuing", wal, e.toString());
        break;
      }
      if (entry == null) {
        break;
      }
      if (filter(entry)) {
        entries.add(entry);
        size += entry.getEdit().heapSize();
        if (size > batchSize) {
          break;
        }
      }
    }
    return entries;
  }
}