001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.regionreplication; 019 020import java.io.IOException; 021import java.util.Optional; 022import java.util.concurrent.SynchronousQueue; 023import java.util.concurrent.ThreadPoolExecutor; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicLong; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 028import org.apache.hadoop.hbase.regionserver.HRegion; 029import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 030import org.apache.hadoop.hbase.regionserver.RegionServerServices; 031import org.apache.hadoop.util.StringUtils; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 037 038/** 039 * Manager the buffer size for all {@link RegionReplicationSink}. 040 * <p/> 041 * If the buffer size exceeds the soft limit, we will find out the region with largest pending size 042 * and trigger a flush, so it can drop all the pending entries and save memories. 043 * <p/> 044 * If the buffer size exceeds the hard limit, we will return {@code false} for 045 * {@link #increase(long)} and let the {@link RegionReplicationSink} to drop the edits immediately. 046 */ 047@InterfaceAudience.Private 048public class RegionReplicationBufferManager { 049 050 private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationBufferManager.class); 051 052 /** 053 * This is the total size of pending entries for all the sinks. 054 */ 055 public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size"; 056 057 public static final long MAX_PENDING_SIZE_DEFAULT = 100L * 1024 * 1024; 058 059 public static final String SOFT_LIMIT_PERCENTAGE = 060 "hbase.region.read-replica.sink.max-pending-size.soft-limit-percentage"; 061 062 public static final float SOFT_LIMIT_PERCENTAGE_DEFAULT = 0.8f; 063 064 private final RegionServerServices rsServices; 065 066 private final long maxPendingSize; 067 068 private final long softMaxPendingSize; 069 070 private final AtomicLong pendingSize = new AtomicLong(); 071 072 private final ThreadPoolExecutor executor; 073 074 public RegionReplicationBufferManager(RegionServerServices rsServices) { 075 this.rsServices = rsServices; 076 Configuration conf = rsServices.getConfiguration(); 077 this.maxPendingSize = conf.getLong(MAX_PENDING_SIZE, MAX_PENDING_SIZE_DEFAULT); 078 this.softMaxPendingSize = 079 (long) (conf.getFloat(SOFT_LIMIT_PERCENTAGE, SOFT_LIMIT_PERCENTAGE_DEFAULT) * maxPendingSize); 080 this.executor = new ThreadPoolExecutor( 081 1, 1, 1, TimeUnit.SECONDS, new SynchronousQueue<>(), new ThreadFactoryBuilder() 082 .setDaemon(true).setNameFormat("Region-Replication-Flusher-%d").build(), 083 (r, e) -> LOG.debug("A flush task is ongoing, drop the new scheduled one")); 084 executor.allowCoreThreadTimeOut(true); 085 } 086 087 private void flush() { 088 long max = Long.MIN_VALUE; 089 HRegion toFlush = null; 090 for (HRegion region : rsServices.getRegions()) { 091 Optional<RegionReplicationSink> sink = region.getRegionReplicationSink(); 092 if (sink.isPresent()) { 093 RegionReplicationSink s = sink.get(); 094 long p = s.pendingSize(); 095 if (p > max) { 096 max = p; 097 toFlush = region; 098 } 099 } 100 } 101 if (toFlush != null) { 102 // here we need to write flush marker out, so we can drop all the pending edits in the region 103 // replication sink. 104 try { 105 LOG.info("Going to flush {} with {} pending entry size", toFlush.getRegionInfo(), 106 StringUtils.TraditionalBinaryPrefix.long2String(max, "", 1)); 107 FlushResult result = toFlush.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 108 if (!result.isFlushSucceeded()) { 109 LOG.warn("Failed to flush {}, the result is {}", toFlush.getRegionInfo(), 110 result.getResult()); 111 } 112 } catch (IOException e) { 113 LOG.warn("Failed to flush {}", toFlush.getRegionInfo(), e); 114 } 115 } else { 116 // usually this should not happen but since the flush operation is async, theoretically it 117 // could happen. Let's log it, no real harm. 118 LOG.warn("Can not find a region to flush"); 119 } 120 } 121 122 /** 123 * Return whether we should just drop all the edits, if we have reached the hard limit of max 124 * pending size. 125 * @return {@code true} means OK, {@code false} means drop all the edits. 126 */ 127 public boolean increase(long size) { 128 long sz = pendingSize.addAndGet(size); 129 if (sz > softMaxPendingSize) { 130 executor.execute(this::flush); 131 } 132 return sz <= maxPendingSize; 133 } 134 135 /** 136 * Called after you ship the edits out. 137 */ 138 public void decrease(long size) { 139 pendingSize.addAndGet(-size); 140 } 141 142 public void stop() { 143 executor.shutdown(); 144 } 145}