001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.regionreplication;
019
020import java.io.IOException;
021import java.util.Optional;
022import java.util.concurrent.SynchronousQueue;
023import java.util.concurrent.ThreadPoolExecutor;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicLong;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
028import org.apache.hadoop.hbase.regionserver.HRegion;
029import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
030import org.apache.hadoop.hbase.regionserver.RegionServerServices;
031import org.apache.hadoop.util.StringUtils;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
037
038/**
039 * Manager the buffer size for all {@link RegionReplicationSink}.
040 * <p/>
041 * If the buffer size exceeds the soft limit, we will find out the region with largest pending size
042 * and trigger a flush, so it can drop all the pending entries and save memories.
043 * <p/>
044 * If the buffer size exceeds the hard limit, we will return {@code false} for
045 * {@link #increase(long)} and let the {@link RegionReplicationSink} to drop the edits immediately.
046 */
047@InterfaceAudience.Private
048public class RegionReplicationBufferManager {
049
050  private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationBufferManager.class);
051
052  /**
053   * This is the total size of pending entries for all the sinks.
054   */
055  public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
056
057  public static final long MAX_PENDING_SIZE_DEFAULT = 100L * 1024 * 1024;
058
059  public static final String SOFT_LIMIT_PERCENTAGE =
060    "hbase.region.read-replica.sink.max-pending-size.soft-limit-percentage";
061
062  public static final float SOFT_LIMIT_PERCENTAGE_DEFAULT = 0.8f;
063
064  private final RegionServerServices rsServices;
065
066  private final long maxPendingSize;
067
068  private final long softMaxPendingSize;
069
070  private final AtomicLong pendingSize = new AtomicLong();
071
072  private final ThreadPoolExecutor executor;
073
074  public RegionReplicationBufferManager(RegionServerServices rsServices) {
075    this.rsServices = rsServices;
076    Configuration conf = rsServices.getConfiguration();
077    this.maxPendingSize = conf.getLong(MAX_PENDING_SIZE, MAX_PENDING_SIZE_DEFAULT);
078    this.softMaxPendingSize =
079      (long) (conf.getFloat(SOFT_LIMIT_PERCENTAGE, SOFT_LIMIT_PERCENTAGE_DEFAULT) * maxPendingSize);
080    this.executor = new ThreadPoolExecutor(
081      1, 1, 1, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder()
082        .setDaemon(true).setNameFormat("Region-Replication-Flusher-%d").build(),
083      (r, e) -> LOG.debug("A flush task is ongoing, drop the new scheduled one"));
084    executor.allowCoreThreadTimeOut(true);
085  }
086
087  private void flush() {
088    long max = Long.MIN_VALUE;
089    HRegion toFlush = null;
090    for (HRegion region : rsServices.getRegions()) {
091      Optional<RegionReplicationSink> sink = region.getRegionReplicationSink();
092      if (sink.isPresent()) {
093        RegionReplicationSink s = sink.get();
094        long p = s.pendingSize();
095        if (p > max) {
096          max = p;
097          toFlush = region;
098        }
099      }
100    }
101    if (toFlush != null) {
102      // here we need to write flush marker out, so we can drop all the pending edits in the region
103      // replication sink.
104      try {
105        LOG.info("Going to flush {} with {} pending entry size", toFlush.getRegionInfo(),
106          StringUtils.TraditionalBinaryPrefix.long2String(max, "", 1));
107        FlushResult result = toFlush.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
108        if (!result.isFlushSucceeded()) {
109          LOG.warn("Failed to flush {}, the result is {}", toFlush.getRegionInfo(),
110            result.getResult());
111        }
112      } catch (IOException e) {
113        LOG.warn("Failed to flush {}", toFlush.getRegionInfo(), e);
114      }
115    } else {
116      // usually this should not happen but since the flush operation is async, theoretically it
117      // could happen. Let's log it, no real harm.
118      LOG.warn("Can not find a region to flush");
119    }
120  }
121
122  /**
123   * Return whether we should just drop all the edits, if we have reached the hard limit of max
124   * pending size.
125   * @return {@code true} means OK, {@code false} means drop all the edits.
126   */
127  public boolean increase(long size) {
128    long sz = pendingSize.addAndGet(size);
129    if (sz > softMaxPendingSize) {
130      executor.execute(this::flush);
131    }
132    return sz <= maxPendingSize;
133  }
134
135  /**
136   * Called after you ship the edits out.
137   */
138  public void decrease(long size) {
139    pendingSize.addAndGet(-size);
140  }
141
142  public void stop() {
143    executor.shutdown();
144  }
145}