/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.regionreplication;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.agrona.collections.IntHashSet;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
/**
 * The class for replicating WAL edits to secondary replicas, one instance per region.
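 * <p/>
 * A minimal usage sketch from a hypothetical caller; the buffer manager, flush runnable,
 * connection, WAL key, edit and server call below are placeholders for objects created by the
 * primary region's write path:
 *
 * <pre>{@code
 * RegionReplicationSink sink = new RegionReplicationSink(conf, primaryRegionInfo,
 *   tableDescriptor, bufferManager, flushRunnable, clusterConnection);
 * // queue an edit for replication to the secondary replicas
 * sink.add(walKey, walEdit, serverCall);
 * // when closing the region, stop the sink and wait for the in-flight requests to finish
 * sink.stop();
 * sink.waitUntilStopped();
 * }</pre>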
 */
@InterfaceAudience.Private
public class RegionReplicationSink {

  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);

  public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";

  public static final int RETRIES_NUMBER_DEFAULT = 3;

  public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";

  public static final long RPC_TIMEOUT_MS_DEFAULT = 1000;

  public static final String OPERATION_TIMEOUT_MS =
    "hbase.region.read-replica.sink.operation.timeout.ms";

  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 5000;

  // The two options below are for replicating meta edits. A meta edit will usually trigger a
  // refreshStoreFiles call at the remote side, so it will likely take more time; a meta edit is
  // also more important for fixing inconsistent state, so it is worth waiting longer for it.
  public static final String META_EDIT_RPC_TIMEOUT_MS =
    "hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms";

  public static final long META_EDIT_RPC_TIMEOUT_MS_DEFAULT = 15000;

  public static final String META_EDIT_OPERATION_TIMEOUT_MS =
    "hbase.region.read-replica.sink.meta-edit.operation.timeout.ms";

  public static final long META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT = 60000;

  public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";

  public static final long BATCH_SIZE_CAPACITY_DEFAULT = 1024L * 1024;

  public static final String BATCH_COUNT_CAPACITY = "hbase.region.read-replica.sink.nb.capacity";

  public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100;
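
  // An illustrative tuning sketch for the options above; the values below are examples only, not
  // recommendations, assuming a cluster where secondary replicas are slow to refresh store files:
  //
  //   Configuration conf = ...;
  //   conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 30000);
  //   conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 120000);
  //   conf.setLong(RegionReplicationSink.BATCH_SIZE_CAPACITY, 2L * 1024 * 1024);
  //   conf.setInt(RegionReplicationSink.BATCH_COUNT_CAPACITY, 200);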

  private static final class SinkEntry {

    final WALKeyImpl key;

    final WALEdit edit;

    final ServerCall<?> rpcCall;

    final long size;

    SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
      this.key = key;
      this.edit = edit;
      this.rpcCall = rpcCall;
      this.size = key.estimatedSerializedSizeOf() + edit.estimatedSerializedSizeOf();
      if (rpcCall != null) {
        // increase the reference count to prevent the rpc framework from freeing the memory
        // before we actually send the entries out
        rpcCall.retainByWAL();
      }
    }

    /**
     * Should be called regardless of the result of the replication operation. Unless you still
     * want to reuse this entry, you must call this method to release any off-heap memory it may
     * hold.
     */
    void replicated() {
      if (rpcCall != null) {
        rpcCall.releaseByWAL();
      }
    }
  }

  private final RegionInfo primary;

  private final TableDescriptor tableDesc;

  // cache the value here to avoid calling TableDescriptor.getRegionReplication every time we
  // need it
  private final int regionReplication;

  private final RegionReplicationBufferManager manager;

  private final RegionReplicationFlushRequester flushRequester;

  private final AsyncClusterConnection conn;

  // Used to track the replica ids which we failed to replicate edits to. A replica id is only
  // added here when the max sequence id of the failed batch is greater than the last flushed
  // sequence id (see onComplete); when a flush-all request arrives in add(), the whole set is
  // cleared, since the replicated flush marker lets the secondary replicas catch up by
  // refreshing store files.
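  // Illustrative scenario (sequence ids are examples only): if replicating a batch whose max
  // sequence id is 10 to replica 2 fails while lastFlushedSequenceId is 5, replica 2 is added to
  // this set and a flush is requested; when the resulting flush-all marker (say, with sequence
  // id 12) later arrives in add(), the set is cleared and replication to replica 2 resumes,
  // starting with the flush marker itself.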
  private final IntHashSet failedReplicas;

  private final Queue<SinkEntry> entries = new ArrayDeque<>();

  private final int retries;

  private final long rpcTimeoutNs;

  private final long operationTimeoutNs;

  private final long metaEditRpcTimeoutNs;

  private final long metaEditOperationTimeoutNs;

  private final long batchSizeCapacity;

  private final long batchCountCapacity;

  private volatile long pendingSize;

  private long lastFlushedSequenceId;

  private boolean sending;

  private boolean stopping;

  private boolean stopped;

  public RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
    RegionReplicationBufferManager manager, Runnable flushRequester, AsyncClusterConnection conn) {
    Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
      primary);
    this.regionReplication = td.getRegionReplication();
    Preconditions.checkArgument(this.regionReplication > 1,
      "region replication should be greater than 1 but got %s", this.regionReplication);
    this.primary = primary;
    this.tableDesc = td;
    this.manager = manager;
    this.flushRequester = new RegionReplicationFlushRequester(conf, flushRequester);
    this.conn = conn;
    this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
    this.rpcTimeoutNs =
      TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
    this.operationTimeoutNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
    this.metaEditRpcTimeoutNs = TimeUnit.MILLISECONDS
      .toNanos(conf.getLong(META_EDIT_RPC_TIMEOUT_MS, META_EDIT_RPC_TIMEOUT_MS_DEFAULT));
    this.metaEditOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
      conf.getLong(META_EDIT_OPERATION_TIMEOUT_MS, META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT));
    this.batchSizeCapacity = conf.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
    this.batchCountCapacity = conf.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
    this.failedReplicas = new IntHashSet(regionReplication - 1);
  }

  void onComplete(List<SinkEntry> sent, Map<Integer, MutableObject<Throwable>> replica2Error) {
    long maxSequenceId = Long.MIN_VALUE;
    long toReleaseSize = 0;
    for (SinkEntry entry : sent) {
      maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
      entry.replicated();
      toReleaseSize += entry.size;
    }
    manager.decrease(toReleaseSize);
    synchronized (entries) {
      pendingSize -= toReleaseSize;
      boolean addFailedReplicas = false;
      for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
        Integer replicaId = entry.getKey();
        Throwable error = entry.getValue().getValue();
        if (error != null) {
          if (maxSequenceId > lastFlushedSequenceId) {
            LOG.warn(
              "Failed to replicate to secondary replica {} for {}, since the max sequence"
                + " id of sunk entries is {}, which is greater than the last flushed sequence id"
                + " {}, we will stop replicating for a while and trigger a flush",
              replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
            failedReplicas.add(replicaId);
            addFailedReplicas = true;
          } else {
            LOG.warn(
              "Failed to replicate to secondary replica {} for {}, since the max sequence"
                + " id of sunk entries is {}, which is less than or equal to the last flushed"
                + " sequence id {}, we will not stop replicating",
              replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
          }
        }
      }

      if (addFailedReplicas) {
        flushRequester.requestFlush(maxSequenceId);
      }
      sending = false;
      if (stopping) {
        stopped = true;
        entries.notifyAll();
        return;
      }
      if (!entries.isEmpty()) {
        send();
      }
    }
  }

  private void send() {
    List<SinkEntry> toSend = new ArrayList<>();
    long totalSize = 0L;
    boolean hasMetaEdit = false;
    for (SinkEntry entry;;) {
      entry = entries.poll();
      if (entry == null) {
        break;
      }
      toSend.add(entry);
      totalSize += entry.size;
      hasMetaEdit |= entry.edit.isMetaEdit();
      if (toSend.size() >= batchCountCapacity || totalSize >= batchSizeCapacity) {
        break;
      }
    }
    int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
    if (toSendReplicaCount <= 0) {
      return;
    }
    long rpcTimeoutNsToUse;
    long operationTimeoutNsToUse;
    if (!hasMetaEdit) {
      rpcTimeoutNsToUse = rpcTimeoutNs;
      operationTimeoutNsToUse = operationTimeoutNs;
    } else {
      rpcTimeoutNsToUse = metaEditRpcTimeoutNs;
      operationTimeoutNsToUse = metaEditOperationTimeoutNs;
    }
    sending = true;
    List<WAL.Entry> walEntries =
      toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
    AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
    Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
      if (failedReplicas.contains(replicaId)) {
        continue;
      }
      MutableObject<Throwable> error = new MutableObject<>();
      replica2Error.put(replicaId, error);
      RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
      FutureUtils.addListener(
        conn.replicate(replica, walEntries, retries, rpcTimeoutNsToUse, operationTimeoutNsToUse),
        (r, e) -> {
          error.setValue(e);
          if (remaining.decrementAndGet() == 0) {
            onComplete(toSend, replica2Error);
          }
        });
    }
  }

  private boolean isStartFlushAllStores(FlushDescriptor flushDesc) {
    if (flushDesc.getAction() == FlushAction.CANNOT_FLUSH) {
      // this means the memstore is empty, so all data before this sequence id has already been
      // flushed out; it is therefore equivalent to a flush of all stores, return true
      return true;
    }
    if (flushDesc.getAction() != FlushAction.START_FLUSH) {
      return false;
    }
    Set<byte[]> storesFlushed =
      flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
        .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
    if (storesFlushed.size() != tableDesc.getColumnFamilyCount()) {
      return false;
    }
    return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
  }

  Optional<FlushDescriptor> getStartFlushAllDescriptor(Cell metaCell) {
    if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
      return Optional.empty();
    }
    FlushDescriptor flushDesc;
    try {
      flushDesc = WALEdit.getFlushDescriptor(metaCell);
    } catch (IOException e) {
      LOG.warn("Failed to parse FlushDescriptor from {}", metaCell, e);
      return Optional.empty();
    }
    if (flushDesc != null && isStartFlushAllStores(flushDesc)) {
      return Optional.of(flushDesc);
    } else {
      return Optional.empty();
    }
  }

  private long clearAllEntries() {
    long toClearSize = 0;
    for (SinkEntry entry : entries) {
      toClearSize += entry.size;
      entry.replicated();
    }
    entries.clear();
    pendingSize -= toClearSize;
    manager.decrease(toClearSize);
    return toClearSize;
  }

  /**
   * Add this edit to the replication queue.
   * <p/>
   * The {@code rpcCall} is used to retain the cells if the edit was built within an rpc call and
   * the rpc call has a cell scanner, which is backed by off-heap memory.
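   * <p/>
   * A hypothetical caller sketch; {@code sink}, {@code walKey}, {@code walEdit} and {@code call}
   * are placeholders for objects produced by the primary region's WAL append path:
   *
   * <pre>{@code
   * // pass the originating ServerCall so its off-heap cells stay retained until replicated
   * sink.add(walKey, walEdit, call);
   * // outside of an rpc context, null is acceptable and nothing extra is retained
   * sink.add(walKey, walEdit, null);
   * }</pre>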
   */
  public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
    if (!tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
      // when region memstore replication is not enabled, we only replicate meta edits
      return;
    }
    synchronized (entries) {
      if (stopping) {
        return;
      }
      if (edit.isMetaEdit()) {
        // check whether we have flushed all stores, which means we can drop all the previous
        // edits and also recover from previous failures of some replicas
        for (Cell metaCell : edit.getCells()) {
          getStartFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
            long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
            lastFlushedSequenceId = flushSequenceNumber;
            long clearedCount = entries.size();
            long clearedSize = clearAllEntries();
            if (LOG.isDebugEnabled()) {
              LOG.debug(
                "Got a flush all request with sequence id {}, cleared {} pending"
                  + " entries with size {}, cleared failed replicas {}",
                flushSequenceNumber, clearedCount,
                StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
                failedReplicas);
            }
            failedReplicas.clear();
            flushRequester.recordFlush(flushSequenceNumber);
          });
        }
      }
      if (failedReplicas.size() == regionReplication - 1) {
        // this means we have marked all the replicas as failed, so just give up here
        return;
      }
      SinkEntry entry = new SinkEntry(key, edit, rpcCall);
      entries.add(entry);
      pendingSize += entry.size;
      if (manager.increase(entry.size)) {
        if (!sending) {
          send();
        }
      } else {
        // we have exceeded the max pending size, so drop all the edits and mark all replicas as
        // failed
        clearAllEntries();
        for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
          failedReplicas.add(replicaId);
        }
        flushRequester.requestFlush(entry.key.getSequenceId());
      }
    }
  }

  long pendingSize() {
    return pendingSize;
  }

  /**
   * Stop the replication sink.
   * <p/>
   * Usually this should only be called when you want to close a region.
   */
  public void stop() {
    synchronized (entries) {
      stopping = true;
      clearAllEntries();
      if (!sending) {
        stopped = true;
        entries.notifyAll();
      }
    }
  }

  /**
   * Make sure that we have finished all the in-flight replication requests.
   * <p/>
   * After this method returns, it is guaranteed that no new replication requests will be sent to
   * the secondary replicas.
   * <p/>
   * This is used to keep the replication order the same as the WAL edit order when writing.
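   * <p/>
   * A minimal shutdown sketch (hypothetical close-region path):
   *
   * <pre>{@code
   * sink.stop();             // reject new edits and release the pending entries
   * sink.waitUntilStopped(); // block until the in-flight replicate calls have completed
   * }</pre>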
   */
  public void waitUntilStopped() throws InterruptedException {
    synchronized (entries) {
      while (!stopped) {
        entries.wait();
      }
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  IntHashSet getFailedReplicas() {
    synchronized (entries) {
      return this.failedReplicas;
    }
  }
}