/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCellScanner;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALHeaderEOFException;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALStreamReader;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter;

/**
 * This callable is executed at the RS side to replay sync replication WALs.
 * <p>
 * For every WAL named in the request, the file is opened, its entries are read in batches bounded
 * by {@link #REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE}, and each batch is handed to the replication
 * sink service obtained from {@code rs}.
 */
@InterfaceAudience.Private
public class ReplaySyncReplicationWALCallable extends BaseRSProcedureCallable {

  private static final Logger LOG = LoggerFactory.getLogger(ReplaySyncReplicationWALCallable.class);

  /** Configuration key for the heap-size threshold at which a batch of entries is closed. */
  private static final String REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE =
    "hbase.replay.sync.replication.wal.batch.size";

  /** Batch threshold used when the configuration key is absent: 8 * 1024 * 1024. */
  private static final long DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = 8 * 1024 * 1024;

  /** Peer id taken from the request parameter; only used for logging in this class. */
  private String peerId;

  /** WAL names taken from the request parameter; each is resolved against the WAL root dir. */
  private List<String> wals = new ArrayList<>();

  /** Accumulated {@code WALEdit.heapSize()} above which {@link #readWALEntries} stops a batch. */
  private long batchSize;

  // NOTE(review): this locker is an instance field and doCall() keys it by the first WAL name
  // rather than by peerId, so it can only serialize calls made on the same callable instance.
  // Confirm that this scope is what is intended.
  private final KeyLocker<String> peersLock = new KeyLocker<>();

  /**
   * Replays every requested WAL, in order, while holding the lock keyed by the first WAL name.
   * Does nothing when the WAL list is empty or when there is no replication sink service.
   * @throws Exception if replaying any WAL fails
   */
  @Override
  protected void doCall() throws Exception {
    LOG.info("Received a replay sync replication wals {} event, peerId={}", wals, peerId);
    if (wals.isEmpty()) {
      // Nothing to replay. Returning here also keeps wals.get(0) below from throwing
      // IndexOutOfBoundsException for a request that carries no WALs.
      LOG.warn("No wals to replay for peerId={}, skipping", peerId);
      return;
    }
    if (rs.getReplicationSinkService() != null) {
      Lock peerLock = peersLock.acquireLock(wals.get(0));
      try {
        for (String wal : wals) {
          replayWAL(wal);
        }
      } finally {
        peerLock.unlock();
      }
    }
  }

  /**
   * Parses the serialized {@link ReplaySyncReplicationWALParameter} and initializes the peer id,
   * the WAL list and the batch size (read from the region server configuration).
   * @param parameter serialized {@link ReplaySyncReplicationWALParameter}
   * @throws InvalidProtocolBufferException if {@code parameter} cannot be parsed
   */
  @Override
  protected void initParameter(byte[] parameter) throws InvalidProtocolBufferException {
    ReplaySyncReplicationWALParameter param =
      ReplaySyncReplicationWALParameter.parseFrom(parameter);
    this.peerId = param.getPeerId();
    param.getWalList().forEach(this.wals::add);
    this.batchSize = rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE,
      DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
  }

  /**
   * @return {@link EventType#RS_REPLAY_SYNC_REPLICATION_WAL}
   */
  @Override
  public EventType getEventType() {
    return EventType.RS_REPLAY_SYNC_REPLICATION_WAL;
  }

  /**
   * Reads the given WAL batch by batch and passes each batch to the replication sink service.
   * Returns immediately when no reader could be opened for the WAL.
   * @param wal WAL name, resolved against the WAL root dir
   * @throws IOException if opening, reading or replicating fails
   */
  private void replayWAL(String wal) throws IOException {
    WALStreamReader reader = getReader(wal);
    if (reader == null) {
      return;
    }
    try {
      List<Entry> entries = readWALEntries(reader, wal);
      // An empty batch means the reader is exhausted (or hit EOF), so stop.
      while (!entries.isEmpty()) {
        Pair<AdminProtos.ReplicateWALEntryRequest, ExtendedCellScanner> pair =
          ReplicationProtobufUtil
            .buildReplicateWALEntryRequest(entries.toArray(new Entry[entries.size()]));
        ReplicateWALEntryRequest request = pair.getFirst();
        rs.getReplicationSinkService().replicateLogEntries(request.getEntryList(), pair.getSecond(),
          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
          request.getSourceHFileArchiveDirPath());
        // Read next entries.
        entries = readWALEntries(reader, wal);
      }
    } finally {
      reader.close();
    }
  }

  /**
   * Recovers the file lease of the given WAL and opens a stream reader on it.
   * @param wal WAL name, resolved against the WAL root dir
   * @return the reader, or {@code null} if a {@link WALHeaderEOFException} was raised while
   *         opening it
   * @throws IOException on any other failure while recovering the lease or opening the reader
   */
  private WALStreamReader getReader(String wal) throws IOException {
    Path path = new Path(rs.getWALRootDir(), wal);
    try {
      RecoverLeaseFSUtils.recoverFileLease(rs.getWALFileSystem(), path, rs.getConfiguration());
      return WALFactory.createStreamReader(rs.getWALFileSystem(), path, rs.getConfiguration());
    } catch (WALHeaderEOFException e) {
      LOG.warn("EOF while opening WAL reader for {}", path, e);
      return null;
    }
  }

  /**
   * Filters the cells of the entry's edit in place and reports whether the entry should be
   * included, i.e. whether its edit is non-empty afterwards.
   * @param entry the WAL entry to filter
   * @return {@code true} if the edit is not empty after filtering
   */
  private boolean filter(Entry entry) {
    WALEdit edit = entry.getEdit();
    // Cells of the META family are mapped to null; presumably filterCells drops cells that map
    // to null -- TODO confirm against WALUtil.
    WALUtil.filterCells(edit, c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY) ? null : c);
    return !edit.isEmpty();
  }

  /**
   * Reads the next batch of entries that pass {@link #filter(Entry)}. Reading stops when the
   * reader returns {@code null}, when an {@link EOFException} is raised, or once the accumulated
   * heap size of the collected edits exceeds {@code batchSize}.
   * @param reader the reader to pull entries from; {@code null} yields an empty list
   * @param wal    WAL name, used only for logging
   * @return the collected entries; empty when nothing more could be read
   * @throws IOException if reading fails with anything other than {@link EOFException}
   */
  private List<Entry> readWALEntries(WALStreamReader reader, String wal) throws IOException {
    List<Entry> entries = new ArrayList<>();
    if (reader == null) {
      return entries;
    }
    long size = 0;
    for (;;) {
      Entry entry;
      try {
        entry = reader.next();
      } catch (EOFException e) {
        // Treated as end of file: entries collected so far are still returned.
        LOG.info("EOF while reading WAL entries from {}: {}, continuing", wal, e.toString());
        break;
      }
      if (entry == null) {
        break;
      }
      if (filter(entry)) {
        entries.add(entry);
        size += entry.getEdit().heapSize();
        if (size > batchSize) {
          break;
        }
      }
    }
    return entries;
  }
}