/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;
/**
 * This callable is executed on the RS side to replay the WALs of a sync replication peer.
 */
@InterfaceAudience.Private
public class ReplaySyncReplicationWALCallable extends BaseRSProcedureCallable {

  private static final Logger LOG =
    LoggerFactory.getLogger(ReplaySyncReplicationWALCallable.class);

  private static final String REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE =
    "hbase.replay.sync.replication.wal.batch.size";

  private static final long DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = 8 * 1024 * 1024;

  private String peerId;

  private List<String> wals = new ArrayList<>();

  private long batchSize;

  private final KeyLocker<String> peersLock = new KeyLocker<>();

  @Override
  protected byte[] doCall() throws Exception {
    LOG.info("Received a replay sync replication wals {} event, peerId={}", wals, peerId);
    if (rs.getReplicationSinkService() != null) {
      // Use the first wal name as the lock key so that the same batch of wals
      // is not replayed concurrently.
      Lock peerLock = peersLock.acquireLock(wals.get(0));
      try {
        for (String wal : wals) {
          replayWAL(wal);
        }
      } finally {
        peerLock.unlock();
      }
    }
    return null;
  }

  @Override
  protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException {
    ReplaySyncReplicationWALParameter param =
      ReplaySyncReplicationWALParameter.parseFrom(parameter);
    this.peerId = param.getPeerId();
    param.getWalList().forEach(this.wals::add);
    this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE,
      DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
  }

  @Override
  public EventType getEventType() {
    return EventType.RS_REPLAY_SYNC_REPLICATION_WAL;
  }

  // Replay one wal in batches until the reader is exhausted.
  private void replayWAL(String wal) throws IOException {
    WALStreamReader reader = getReader(wal);
    if (reader == null) {
      return;
    }
    try {
      List<Entry> entries = readWALEntries(reader, wal);
      while (!entries.isEmpty()) {
        Pair<AdminProtos.ReplicateWALEntryRequest, ExtendedCellScanner> pair =
          ReplicationProtobufUtil
            .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
        ReplicateWALEntryRequest request = pair.getFirst();
        rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(),
          pair.getSecond(), request.getReplicationClusterId(),
          request.getSourceBaseNamespaceDirPath(), request.getSourceHFileArchiveDirPath());
        // Read next entries.
        entries = readWALEntries(reader, wal);
      }
    } finally {
      reader.close();
    }
  }

  private WALStreamReader getReader(String wal) throws IOException {
    Path path = new Path(rs.getWALRootDir(), wal);
    try {
      // Make sure the wal has been closed by its previous writer before reading it.
      RecoverLeaseFSUtils.recoverFileLease(rs.getWALFileSystem(), path, rs.getConfiguration());
      return WALFactory.createStreamReader(rs.getWALFileSystem(), path, rs.getConfiguration());
    } catch (WALHeaderEOFException e) {
      // The wal file does not even have a complete header, so there is nothing to replay.
      LOG.warn("EOF while opening WAL reader for {}", path, e);
      return null;
    }
  }

  // Return whether we should include this entry. Meta edits (cells in
  // WALEdit.METAFAMILY, such as compaction and region event markers) are
  // stripped, and entries left empty are skipped entirely.
  private boolean filter(Entry entry) {
    WALEdit edit = entry.getEdit();
    WALUtil.filterCells(edit, c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY) ? null : c);
    return !edit.isEmpty();
  }
  // Read the next batch of entries from the wal, stopping once the
  // accumulated edit heap size exceeds batchSize. An empty list means the wal
  // has been fully consumed.
  private List<Entry> readWALEntries(WALStreamReader reader, String wal) throws IOException {
    List<Entry> entries = new ArrayList<>();
    if (reader == null) {
      return entries;
    }
    long size = 0;
    for (;;) {
      Entry entry;
      try {
        entry = reader.next();
      } catch (EOFException e) {
        // A truncated trailing entry is possible if the wal was not cleanly
        // closed; replay goes on with the entries read so far.
        LOG.info("EOF while reading WAL entries from {}: {}, continuing", wal, e.toString());
        break;
      }
      if (entry == null) {
        break;
      }
      if (filter(entry)) {
        entries.add(entry);
        size += entry.getEdit().heapSize();
        if (size > batchSize) {
          break;
        }
      }
    }
    return entries;
  }
}