001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues.impl; 019 020import java.util.Arrays; 021import java.util.Collections; 022import java.util.List; 023import java.util.Queue; 024import java.util.stream.Collectors; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.client.BalancerDecision; 027import org.apache.hadoop.hbase.client.Connection; 028import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; 029import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; 030import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; 031import org.apache.hadoop.hbase.namequeues.NamedQueueService; 032import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 033import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; 039import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 040import org.apache.hbase.thirdparty.com.google.common.collect.Queues; 041 042import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs; 043 044/** 045 * In-memory Queue service provider for Balancer Decision events 046 */ 047@InterfaceAudience.Private 048public class BalancerDecisionQueueService implements NamedQueueService { 049 050 private static final Logger LOG = LoggerFactory.getLogger(BalancerDecisionQueueService.class); 051 052 private final boolean isBalancerDecisionRecording; 053 054 private static final String BALANCER_DECISION_QUEUE_SIZE = 055 "hbase.master.balancer.decision.queue.size"; 056 private static final int DEFAULT_BALANCER_DECISION_QUEUE_SIZE = 250; 057 058 private static final int REGION_PLANS_THRESHOLD_PER_BALANCER = 15; 059 060 private final Queue<RecentLogs.BalancerDecision> balancerDecisionQueue; 061 062 public BalancerDecisionQueueService(Configuration conf) { 063 isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED, 064 BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED); 065 if (!isBalancerDecisionRecording) { 066 balancerDecisionQueue = null; 067 return; 068 } 069 final int queueSize = 070 conf.getInt(BALANCER_DECISION_QUEUE_SIZE, DEFAULT_BALANCER_DECISION_QUEUE_SIZE); 071 final EvictingQueue<RecentLogs.BalancerDecision> evictingQueue = 072 EvictingQueue.create(queueSize); 073 balancerDecisionQueue = Queues.synchronizedQueue(evictingQueue); 074 } 075 076 @Override 077 public NamedQueuePayload.NamedQueueEvent getEvent() { 078 return NamedQueuePayload.NamedQueueEvent.BALANCE_DECISION; 079 } 080 081 @Override 082 public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { 083 if (!isBalancerDecisionRecording) { 084 return; 085 } 086 if (!(namedQueuePayload instanceof BalancerDecisionDetails)) { 087 LOG.warn( 088 "BalancerDecisionQueueService: NamedQueuePayload is not of type BalancerDecisionDetails."); 089 return; 090 } 091 BalancerDecisionDetails balancerDecisionDetails = (BalancerDecisionDetails) namedQueuePayload; 092 BalancerDecision balancerDecisionRecords = balancerDecisionDetails.getBalancerDecision(); 093 List<String> regionPlans = balancerDecisionRecords.getRegionPlans(); 094 List<List<String>> regionPlansList; 095 if (regionPlans.size() > REGION_PLANS_THRESHOLD_PER_BALANCER) { 096 regionPlansList = Lists.partition(regionPlans, REGION_PLANS_THRESHOLD_PER_BALANCER); 097 } else { 098 regionPlansList = Collections.singletonList(regionPlans); 099 } 100 for (List<String> regionPlansPerBalancer : regionPlansList) { 101 RecentLogs.BalancerDecision balancerDecision = RecentLogs.BalancerDecision.newBuilder() 102 .setInitTotalCost(balancerDecisionRecords.getInitTotalCost()) 103 .setInitialFunctionCosts(balancerDecisionRecords.getInitialFunctionCosts()) 104 .setComputedTotalCost(balancerDecisionRecords.getComputedTotalCost()) 105 .setFinalFunctionCosts(balancerDecisionRecords.getFinalFunctionCosts()) 106 .setComputedSteps(balancerDecisionRecords.getComputedSteps()) 107 .addAllRegionPlans(regionPlansPerBalancer).build(); 108 balancerDecisionQueue.add(balancerDecision); 109 } 110 } 111 112 @Override 113 public boolean clearNamedQueue() { 114 if (!isBalancerDecisionRecording) { 115 return false; 116 } 117 LOG.debug("Received request to clean up balancer decision queue."); 118 balancerDecisionQueue.clear(); 119 return true; 120 } 121 122 @Override 123 public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 124 if (!isBalancerDecisionRecording) { 125 return null; 126 } 127 List<RecentLogs.BalancerDecision> balancerDecisions = 128 Arrays.stream(balancerDecisionQueue.toArray(new RecentLogs.BalancerDecision[0])) 129 .collect(Collectors.toList()); 130 // latest records should be displayed first, hence reverse order sorting 131 Collections.reverse(balancerDecisions); 132 int limit = balancerDecisions.size(); 133 if (request.getBalancerDecisionsRequest().hasLimit()) { 134 limit = Math.min(request.getBalancerDecisionsRequest().getLimit(), balancerDecisions.size()); 135 } 136 // filter limit if provided 137 balancerDecisions = balancerDecisions.subList(0, limit); 138 final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse(); 139 namedQueueGetResponse.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT); 140 namedQueueGetResponse.setBalancerDecisions(balancerDecisions); 141 return namedQueueGetResponse; 142 } 143 144 @Override 145 public void persistAll(Connection connection) { 146 // no-op for now 147 } 148 149}