001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019 020package org.apache.hadoop.hbase.namequeues.impl; 021 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.client.BalancerDecision; 024import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; 025import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; 026import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; 027import org.apache.hadoop.hbase.namequeues.NamedQueueService; 028import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 029import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 030import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs; 031import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; 032import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 033import org.apache.hbase.thirdparty.com.google.common.collect.Queues; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037import java.util.Arrays; 038import java.util.Collections; 039import java.util.List; 040import java.util.Queue; 041import java.util.stream.Collectors; 042 043/** 044 * In-memory Queue service provider for Balancer Decision events 045 */ 046@InterfaceAudience.Private 047public class BalancerDecisionQueueService implements NamedQueueService { 048 049 private static final Logger LOG = LoggerFactory.getLogger(BalancerDecisionQueueService.class); 050 051 private final boolean isBalancerDecisionRecording; 052 053 private static final String BALANCER_DECISION_QUEUE_SIZE = 054 "hbase.master.balancer.decision.queue.size"; 055 private static final int DEFAULT_BALANCER_DECISION_QUEUE_SIZE = 250; 056 057 private static final int REGION_PLANS_THRESHOLD_PER_BALANCER = 15; 058 059 private final Queue<RecentLogs.BalancerDecision> balancerDecisionQueue; 060 061 public BalancerDecisionQueueService(Configuration conf) { 062 isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED, 063 BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED); 064 if (!isBalancerDecisionRecording) { 065 balancerDecisionQueue = null; 066 return; 067 } 068 final int queueSize = 069 conf.getInt(BALANCER_DECISION_QUEUE_SIZE, DEFAULT_BALANCER_DECISION_QUEUE_SIZE); 070 final EvictingQueue<RecentLogs.BalancerDecision> evictingQueue = 071 EvictingQueue.create(queueSize); 072 balancerDecisionQueue = Queues.synchronizedQueue(evictingQueue); 073 } 074 075 @Override 076 public NamedQueuePayload.NamedQueueEvent getEvent() { 077 return NamedQueuePayload.NamedQueueEvent.BALANCE_DECISION; 078 } 079 080 @Override 081 public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { 082 if (!isBalancerDecisionRecording) { 083 return; 084 } 085 if (!(namedQueuePayload instanceof BalancerDecisionDetails)) { 086 LOG.warn( 087 "BalancerDecisionQueueService: NamedQueuePayload is not of type BalancerDecisionDetails."); 088 return; 089 } 090 BalancerDecisionDetails balancerDecisionDetails = (BalancerDecisionDetails) namedQueuePayload; 091 BalancerDecision balancerDecisionRecords = 092 balancerDecisionDetails.getBalancerDecision(); 093 List<String> regionPlans = balancerDecisionRecords.getRegionPlans(); 094 List<List<String>> regionPlansList; 095 if (regionPlans.size() > REGION_PLANS_THRESHOLD_PER_BALANCER) { 096 regionPlansList = Lists.partition(regionPlans, REGION_PLANS_THRESHOLD_PER_BALANCER); 097 } else { 098 regionPlansList = Collections.singletonList(regionPlans); 099 } 100 for (List<String> regionPlansPerBalancer : regionPlansList) { 101 RecentLogs.BalancerDecision balancerDecision = RecentLogs.BalancerDecision.newBuilder() 102 .setInitTotalCost(balancerDecisionRecords.getInitTotalCost()) 103 .setInitialFunctionCosts(balancerDecisionRecords.getInitialFunctionCosts()) 104 .setComputedTotalCost(balancerDecisionRecords.getComputedTotalCost()) 105 .setFinalFunctionCosts(balancerDecisionRecords.getFinalFunctionCosts()) 106 .setComputedSteps(balancerDecisionRecords.getComputedSteps()) 107 .addAllRegionPlans(regionPlansPerBalancer) 108 .build(); 109 balancerDecisionQueue.add(balancerDecision); 110 } 111 } 112 113 @Override 114 public boolean clearNamedQueue() { 115 if (!isBalancerDecisionRecording) { 116 return false; 117 } 118 LOG.debug("Received request to clean up balancer decision queue."); 119 balancerDecisionQueue.clear(); 120 return true; 121 } 122 123 @Override 124 public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 125 if (!isBalancerDecisionRecording) { 126 return null; 127 } 128 List<RecentLogs.BalancerDecision> balancerDecisions = 129 Arrays.stream(balancerDecisionQueue.toArray(new RecentLogs.BalancerDecision[0])) 130 .collect(Collectors.toList()); 131 // latest records should be displayed first, hence reverse order sorting 132 Collections.reverse(balancerDecisions); 133 int limit = balancerDecisions.size(); 134 if (request.getBalancerDecisionsRequest().hasLimit()) { 135 limit = Math.min(request.getBalancerDecisionsRequest().getLimit(), balancerDecisions.size()); 136 } 137 // filter limit if provided 138 balancerDecisions = balancerDecisions.subList(0, limit); 139 final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse(); 140 namedQueueGetResponse.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT); 141 namedQueueGetResponse.setBalancerDecisions(balancerDecisions); 142 return namedQueueGetResponse; 143 } 144 145 @Override 146 public void persistAll() { 147 // no-op for now 148 } 149 150}