001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.util.Map;
021import java.util.Queue;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.PriorityBlockingQueue;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.Path;
026import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
027import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.apache.yetus.audience.InterfaceStability;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/*
034  Class that does enqueueing/dequeuing of wal at one place so that we can update the metrics
035  just at one place.
036 */
037@InterfaceAudience.Private
038@InterfaceStability.Evolving
039public class ReplicationSourceLogQueue {
040  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
041  // Queues of logs to process, entry in format of walGroupId->queue,
042  // each presents a queue for one wal group
043  private Map<String, PriorityBlockingQueue<Path>> queues = new ConcurrentHashMap<>();
044  private MetricsSource metrics;
045  private Configuration conf;
046  // per group queue size, keep no more than this number of logs in each wal group
047  private int queueSizePerGroup;
048  // WARN threshold for the number of queued logs, defaults to 2
049  private int logQueueWarnThreshold;
050  private ReplicationSource source;
051
052  public ReplicationSourceLogQueue(Configuration conf, MetricsSource metrics,
053    ReplicationSource source) {
054    this.conf = conf;
055    this.metrics = metrics;
056    this.source = source;
057    this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
058    this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
059  }
060
061  /**
062   * Enqueue the wal
063   * @param wal        wal to be enqueued
064   * @param walGroupId Key for the wal in @queues map
065   * @return boolean whether this is the first time we are seeing this walGroupId.
066   */
067  public boolean enqueueLog(Path wal, String walGroupId) {
068    boolean exists = false;
069    PriorityBlockingQueue<Path> queue = queues.get(walGroupId);
070    if (queue == null) {
071      queue = new PriorityBlockingQueue<>(queueSizePerGroup,
072        new AbstractFSWALProvider.WALStartTimeComparator());
073      // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
074      // the shipper may quit immediately
075      queue.put(wal);
076      queues.put(walGroupId, queue);
077    } else {
078      exists = true;
079      queue.put(wal);
080    }
081    // Increment size of logQueue
082    this.metrics.incrSizeOfLogQueue();
083    // Compute oldest wal age
084    this.metrics.setOldestWalAge(getOldestWalAge());
085    // This will wal a warning for each new wal that gets created above the warn threshold
086    int queueSize = queue.size();
087    if (queueSize > this.logQueueWarnThreshold) {
088      LOG.warn(
089        "{} WAL group {} queue size: {} exceeds value of " + "replication.source.log.queue.warn {}",
090        source.logPeerId(), walGroupId, queueSize, logQueueWarnThreshold);
091    }
092    return exists;
093  }
094
095  /**
096   * Get the queue size for the given walGroupId.
097   * @param walGroupId walGroupId
098   */
099  public int getQueueSize(String walGroupId) {
100    Queue<Path> queue = queues.get(walGroupId);
101    if (queue == null) {
102      return 0;
103    }
104    return queue.size();
105  }
106
107  /**
108   * Returns number of queues.
109   */
110  public int getNumQueues() {
111    return queues.size();
112  }
113
114  public Map<String, PriorityBlockingQueue<Path>> getQueues() {
115    return queues;
116  }
117
118  /**
119   * Return queue for the given walGroupId Please don't add or remove elements from the returned
120   * queue. Use {@link #enqueueLog(Path, String)} and {@link #remove(String)} methods respectively.
121   * @param walGroupId walGroupId
122   */
123  public PriorityBlockingQueue<Path> getQueue(String walGroupId) {
124    return queues.get(walGroupId);
125  }
126
127  /**
128   * Remove head from the queue corresponding to given walGroupId.
129   * @param walGroupId walGroupId
130   */
131  public void remove(String walGroupId) {
132    PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
133    if (queue == null || queue.isEmpty()) {
134      return;
135    }
136    queue.remove();
137    // Decrease size logQueue.
138    this.metrics.decrSizeOfLogQueue();
139    // Re-compute age of oldest wal metric.
140    this.metrics.setOldestWalAge(getOldestWalAge());
141  }
142
143  /**
144   * Remove all the elements from the queue corresponding to walGroupId
145   * @param walGroupId walGroupId
146   */
147  public void clear(String walGroupId) {
148    PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
149    while (!queue.isEmpty()) {
150      // Need to iterate since metrics#decrSizeOfLogQueue decrements just by 1.
151      queue.remove();
152      metrics.decrSizeOfLogQueue();
153    }
154    this.metrics.setOldestWalAge(getOldestWalAge());
155  }
156
157  /*
158   * Returns the age of oldest wal.
159   */
160  long getOldestWalAge() {
161    long now = EnvironmentEdgeManager.currentTime();
162    long timestamp = getOldestWalTimestamp();
163    if (timestamp == Long.MAX_VALUE) {
164      // If there are no wals in the queue then set the oldest wal timestamp to current time
165      // so that the oldest wal age will be 0.
166      timestamp = now;
167    }
168    long age = now - timestamp;
169    return age;
170  }
171
172  /*
173   * Get the oldest wal timestamp from all the queues.
174   */
175  private long getOldestWalTimestamp() {
176    long oldestWalTimestamp = Long.MAX_VALUE;
177    for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
178      PriorityBlockingQueue<Path> queue = entry.getValue();
179      Path path = queue.peek();
180      // Can path ever be null ?
181      if (path != null) {
182        oldestWalTimestamp =
183          Math.min(oldestWalTimestamp, AbstractFSWALProvider.WALStartTimeComparator.getTS(path));
184      }
185    }
186    return oldestWalTimestamp;
187  }
188
189  public MetricsSource getMetrics() {
190    return metrics;
191  }
192}