001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util.compaction; 019 020import java.util.List; 021import java.util.Map; 022import java.util.Optional; 023import java.util.Set; 024import java.util.concurrent.locks.ReadWriteLock; 025import java.util.concurrent.locks.ReentrantReadWriteLock; 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 029import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 030import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 031import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 032 033@InterfaceAudience.Private 034class ClusterCompactionQueues { 035 036 private final Map<ServerName, List<MajorCompactionRequest>> compactionQueues; 037 private final Set<ServerName> compactingServers; 038 private final ReadWriteLock lock; 039 private final int concurrentServers; 040 041 ClusterCompactionQueues(int concurrentServers) { 042 this.concurrentServers = concurrentServers; 043 044 this.compactionQueues = Maps.newHashMap(); 045 this.lock = new ReentrantReadWriteLock(); 046 this.compactingServers = Sets.newHashSet(); 047 } 048 049 void addToCompactionQueue(ServerName serverName, MajorCompactionRequest info) { 050 this.lock.writeLock().lock(); 051 try { 052 List<MajorCompactionRequest> result = this.compactionQueues.get(serverName); 053 if (result == null) { 054 result = Lists.newArrayList(); 055 compactionQueues.put(serverName, result); 056 } 057 result.add(info); 058 } finally { 059 this.lock.writeLock().unlock(); 060 } 061 } 062 063 boolean hasWorkItems() { 064 lock.readLock().lock(); 065 try { 066 return !this.compactionQueues.values().stream().allMatch(List::isEmpty); 067 } finally { 068 lock.readLock().unlock(); 069 } 070 } 071 072 int getCompactionRequestsLeftToFinish() { 073 lock.readLock().lock(); 074 try { 075 int size = 0; 076 for (List<MajorCompactionRequest> queue : compactionQueues.values()) { 077 size += queue.size(); 078 } 079 return size; 080 } finally { 081 lock.readLock().unlock(); 082 } 083 } 084 085 @VisibleForTesting List<MajorCompactionRequest> getQueue(ServerName serverName) { 086 lock.readLock().lock(); 087 try { 088 return compactionQueues.get(serverName); 089 } finally { 090 lock.readLock().unlock(); 091 } 092 } 093 094 MajorCompactionRequest reserveForCompaction(ServerName serverName) { 095 lock.writeLock().lock(); 096 try { 097 if (!compactionQueues.get(serverName).isEmpty()) { 098 compactingServers.add(serverName); 099 return compactionQueues.get(serverName).remove(0); 100 } 101 return null; 102 } finally { 103 lock.writeLock().unlock(); 104 } 105 } 106 107 void releaseCompaction(ServerName serverName) { 108 lock.writeLock().lock(); 109 try { 110 compactingServers.remove(serverName); 111 } finally { 112 lock.writeLock().unlock(); 113 } 114 } 115 116 boolean atCapacity() { 117 lock.readLock().lock(); 118 try { 119 return compactingServers.size() >= concurrentServers; 120 } finally { 121 lock.readLock().unlock(); 122 } 123 } 124 125 Optional<ServerName> getLargestQueueFromServersNotCompacting() { 126 lock.readLock().lock(); 127 try { 128 return compactionQueues.entrySet().stream() 129 .filter(entry -> !compactingServers.contains(entry.getKey())) 130 .max(Map.Entry.comparingByValue( 131 (o1, o2) -> Integer.compare(o1.size(), o2.size()))).map(Map.Entry::getKey); 132 } finally { 133 lock.readLock().unlock(); 134 } 135 } 136 137}