001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util.compaction;
019
020import java.util.List;
021import java.util.Map;
022import java.util.Optional;
023import java.util.Set;
024import java.util.concurrent.locks.ReadWriteLock;
025import java.util.concurrent.locks.ReentrantReadWriteLock;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
029import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
030import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
031import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
032
033@InterfaceAudience.Private
034class ClusterCompactionQueues {
035
036  private final Map<ServerName, List<MajorCompactionRequest>> compactionQueues;
037  private final Set<ServerName> compactingServers;
038  private final ReadWriteLock lock;
039  private final int concurrentServers;
040
041  ClusterCompactionQueues(int concurrentServers) {
042    this.concurrentServers = concurrentServers;
043
044    this.compactionQueues = Maps.newHashMap();
045    this.lock = new ReentrantReadWriteLock();
046    this.compactingServers = Sets.newHashSet();
047  }
048
049  void addToCompactionQueue(ServerName serverName, MajorCompactionRequest info) {
050    this.lock.writeLock().lock();
051    try {
052      List<MajorCompactionRequest> result = this.compactionQueues.get(serverName);
053      if (result == null) {
054        result = Lists.newArrayList();
055        compactionQueues.put(serverName, result);
056      }
057      result.add(info);
058    } finally {
059      this.lock.writeLock().unlock();
060    }
061  }
062
063  boolean hasWorkItems() {
064    lock.readLock().lock();
065    try {
066      return !this.compactionQueues.values().stream().allMatch(List::isEmpty);
067    } finally {
068      lock.readLock().unlock();
069    }
070  }
071
072  int getCompactionRequestsLeftToFinish() {
073    lock.readLock().lock();
074    try {
075      int size = 0;
076      for (List<MajorCompactionRequest> queue : compactionQueues.values()) {
077        size += queue.size();
078      }
079      return size;
080    } finally {
081      lock.readLock().unlock();
082    }
083  }
084
085  @VisibleForTesting List<MajorCompactionRequest> getQueue(ServerName serverName) {
086    lock.readLock().lock();
087    try {
088      return compactionQueues.get(serverName);
089    } finally {
090      lock.readLock().unlock();
091    }
092  }
093
094  MajorCompactionRequest reserveForCompaction(ServerName serverName) {
095    lock.writeLock().lock();
096    try {
097      if (!compactionQueues.get(serverName).isEmpty()) {
098        compactingServers.add(serverName);
099        return compactionQueues.get(serverName).remove(0);
100      }
101      return null;
102    } finally {
103      lock.writeLock().unlock();
104    }
105  }
106
107  void releaseCompaction(ServerName serverName) {
108    lock.writeLock().lock();
109    try {
110      compactingServers.remove(serverName);
111    } finally {
112      lock.writeLock().unlock();
113    }
114  }
115
116  boolean atCapacity() {
117    lock.readLock().lock();
118    try {
119      return compactingServers.size() >= concurrentServers;
120    } finally {
121      lock.readLock().unlock();
122    }
123  }
124
125  Optional<ServerName> getLargestQueueFromServersNotCompacting() {
126    lock.readLock().lock();
127    try {
128      return compactionQueues.entrySet().stream()
129          .filter(entry -> !compactingServers.contains(entry.getKey()))
130          .max(Map.Entry.comparingByValue(
131            (o1, o2) -> Integer.compare(o1.size(), o2.size()))).map(Map.Entry::getKey);
132    } finally {
133      lock.readLock().unlock();
134    }
135  }
136
137}