001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.util.Map; 021import java.util.Queue; 022import java.util.concurrent.ConcurrentHashMap; 023import java.util.concurrent.PriorityBlockingQueue; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.Path; 026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 027import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 028import org.apache.yetus.audience.InterfaceAudience; 029import org.apache.yetus.audience.InterfaceStability; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/* 034 Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics 035 just at one place. 036 */ 037@InterfaceAudience.Private 038@InterfaceStability.Evolving 039public class ReplicationSourceLogQueue { 040 private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); 041 // Queues of logs to process, entry in format of walGroupId->queue, 042 // each presents a queue for one wal group 043 private Map<String, PriorityBlockingQueue<Path>> queues = new ConcurrentHashMap<>(); 044 private MetricsSource metrics; 045 private Configuration conf; 046 // per group queue size, keep no more than this number of logs in each wal group 047 private int queueSizePerGroup; 048 // WARN threshold for the number of queued logs, defaults to 2 049 private int logQueueWarnThreshold; 050 private ReplicationSource source; 051 052 public ReplicationSourceLogQueue(Configuration conf, MetricsSource metrics, 053 ReplicationSource source) { 054 this.conf = conf; 055 this.metrics = metrics; 056 this.source = source; 057 this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32); 058 this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2); 059 } 060 061 /** 062 * Enqueue the wal 063 * @param wal wal to be enqueued 064 * @param walGroupId Key for the wal in @queues map 065 * @return boolean whether this is the first time we are seeing this walGroupId. 066 */ 067 public boolean enqueueLog(Path wal, String walGroupId) { 068 boolean exists = false; 069 PriorityBlockingQueue<Path> queue = queues.get(walGroupId); 070 if (queue == null) { 071 queue = new PriorityBlockingQueue<>(queueSizePerGroup, 072 new AbstractFSWALProvider.WALStartTimeComparator()); 073 // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise 074 // the shipper may quit immediately 075 queue.put(wal); 076 queues.put(walGroupId, queue); 077 } else { 078 exists = true; 079 queue.put(wal); 080 } 081 // Increment size of logQueue 082 this.metrics.incrSizeOfLogQueue(); 083 // Compute oldest wal age 084 this.metrics.setOldestWalAge(getOldestWalAge()); 085 // This will wal a warning for each new wal that gets created above the warn threshold 086 int queueSize = queue.size(); 087 if (queueSize > this.logQueueWarnThreshold) { 088 LOG.warn( 089 "{} WAL group {} queue size: {} exceeds value of " + "replication.source.log.queue.warn {}", 090 source.logPeerId(), walGroupId, queueSize, logQueueWarnThreshold); 091 } 092 return exists; 093 } 094 095 /** 096 * Get the queue size for the given walGroupId. 097 * @param walGroupId walGroupId 098 */ 099 public int getQueueSize(String walGroupId) { 100 Queue<Path> queue = queues.get(walGroupId); 101 if (queue == null) { 102 return 0; 103 } 104 return queue.size(); 105 } 106 107 /** 108 * Returns number of queues. 109 */ 110 public int getNumQueues() { 111 return queues.size(); 112 } 113 114 public Map<String, PriorityBlockingQueue<Path>> getQueues() { 115 return queues; 116 } 117 118 /** 119 * Return queue for the given walGroupId Please don't add or remove elements from the returned 120 * queue. Use {@link #enqueueLog(Path, String)} and {@link #remove(String)} methods respectively. 121 * @param walGroupId walGroupId 122 */ 123 public PriorityBlockingQueue<Path> getQueue(String walGroupId) { 124 return queues.get(walGroupId); 125 } 126 127 /** 128 * Remove head from the queue corresponding to given walGroupId. 129 * @param walGroupId walGroupId 130 */ 131 public void remove(String walGroupId) { 132 PriorityBlockingQueue<Path> queue = getQueue(walGroupId); 133 if (queue == null || queue.isEmpty()) { 134 return; 135 } 136 queue.remove(); 137 // Decrease size logQueue. 138 this.metrics.decrSizeOfLogQueue(); 139 // Re-compute age of oldest wal metric. 140 this.metrics.setOldestWalAge(getOldestWalAge()); 141 } 142 143 /** 144 * Remove all the elements from the queue corresponding to walGroupId 145 * @param walGroupId walGroupId 146 */ 147 public void clear(String walGroupId) { 148 PriorityBlockingQueue<Path> queue = getQueue(walGroupId); 149 while (!queue.isEmpty()) { 150 // Need to iterate since metrics#decrSizeOfLogQueue decrements just by 1. 151 queue.remove(); 152 metrics.decrSizeOfLogQueue(); 153 } 154 this.metrics.setOldestWalAge(getOldestWalAge()); 155 } 156 157 /* 158 * Returns the age of oldest wal. 159 */ 160 long getOldestWalAge() { 161 long now = EnvironmentEdgeManager.currentTime(); 162 long timestamp = getOldestWalTimestamp(); 163 if (timestamp == Long.MAX_VALUE) { 164 // If there are no wals in the queue then set the oldest wal timestamp to current time 165 // so that the oldest wal age will be 0. 166 timestamp = now; 167 } 168 long age = now - timestamp; 169 return age; 170 } 171 172 /* 173 * Get the oldest wal timestamp from all the queues. 174 */ 175 private long getOldestWalTimestamp() { 176 long oldestWalTimestamp = Long.MAX_VALUE; 177 for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) { 178 PriorityBlockingQueue<Path> queue = entry.getValue(); 179 Path path = queue.peek(); 180 // Can path ever be null ? 181 if (path != null) { 182 oldestWalTimestamp = 183 Math.min(oldestWalTimestamp, AbstractFSWALProvider.WALStartTimeComparator.getTS(path)); 184 } 185 } 186 return oldestWalTimestamp; 187 } 188 189 public MetricsSource getMetrics() { 190 return metrics; 191 } 192}