001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.namequeues.impl; 019 020import java.util.Arrays; 021import java.util.Collections; 022import java.util.List; 023import java.util.Queue; 024import java.util.stream.Collectors; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.client.BalancerDecision; 027import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; 028import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; 029import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; 030import org.apache.hadoop.hbase.namequeues.NamedQueueService; 031import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; 032import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue; 038import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 039import org.apache.hbase.thirdparty.com.google.common.collect.Queues; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs; 042 043/** 044 * In-memory Queue service provider for Balancer Decision events 045 */ 046@InterfaceAudience.Private 047public class BalancerDecisionQueueService implements NamedQueueService { 048 049 private static final Logger LOG = LoggerFactory.getLogger(BalancerDecisionQueueService.class); 050 051 private final boolean isBalancerDecisionRecording; 052 053 private static final String BALANCER_DECISION_QUEUE_SIZE = 054 "hbase.master.balancer.decision.queue.size"; 055 private static final int DEFAULT_BALANCER_DECISION_QUEUE_SIZE = 250; 056 057 private static final int REGION_PLANS_THRESHOLD_PER_BALANCER = 15; 058 059 private final Queue<RecentLogs.BalancerDecision> balancerDecisionQueue; 060 061 public BalancerDecisionQueueService(Configuration conf) { 062 isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED, 063 BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED); 064 if (!isBalancerDecisionRecording) { 065 balancerDecisionQueue = null; 066 return; 067 } 068 final int queueSize = 069 conf.getInt(BALANCER_DECISION_QUEUE_SIZE, DEFAULT_BALANCER_DECISION_QUEUE_SIZE); 070 final EvictingQueue<RecentLogs.BalancerDecision> evictingQueue = 071 EvictingQueue.create(queueSize); 072 balancerDecisionQueue = Queues.synchronizedQueue(evictingQueue); 073 } 074 075 @Override 076 public NamedQueuePayload.NamedQueueEvent getEvent() { 077 return NamedQueuePayload.NamedQueueEvent.BALANCE_DECISION; 078 } 079 080 @Override 081 public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) { 082 if (!isBalancerDecisionRecording) { 083 return; 084 } 085 if (!(namedQueuePayload instanceof BalancerDecisionDetails)) { 086 LOG.warn( 087 "BalancerDecisionQueueService: NamedQueuePayload is not of type BalancerDecisionDetails."); 088 return; 089 } 090 BalancerDecisionDetails balancerDecisionDetails = (BalancerDecisionDetails) namedQueuePayload; 091 BalancerDecision balancerDecisionRecords = balancerDecisionDetails.getBalancerDecision(); 092 List<String> regionPlans = balancerDecisionRecords.getRegionPlans(); 093 List<List<String>> regionPlansList; 094 if (regionPlans.size() > REGION_PLANS_THRESHOLD_PER_BALANCER) { 095 regionPlansList = Lists.partition(regionPlans, REGION_PLANS_THRESHOLD_PER_BALANCER); 096 } else { 097 regionPlansList = Collections.singletonList(regionPlans); 098 } 099 for (List<String> regionPlansPerBalancer : regionPlansList) { 100 RecentLogs.BalancerDecision balancerDecision = RecentLogs.BalancerDecision.newBuilder() 101 .setInitTotalCost(balancerDecisionRecords.getInitTotalCost()) 102 .setInitialFunctionCosts(balancerDecisionRecords.getInitialFunctionCosts()) 103 .setComputedTotalCost(balancerDecisionRecords.getComputedTotalCost()) 104 .setFinalFunctionCosts(balancerDecisionRecords.getFinalFunctionCosts()) 105 .setComputedSteps(balancerDecisionRecords.getComputedSteps()) 106 .addAllRegionPlans(regionPlansPerBalancer).build(); 107 balancerDecisionQueue.add(balancerDecision); 108 } 109 } 110 111 @Override 112 public boolean clearNamedQueue() { 113 if (!isBalancerDecisionRecording) { 114 return false; 115 } 116 LOG.debug("Received request to clean up balancer decision queue."); 117 balancerDecisionQueue.clear(); 118 return true; 119 } 120 121 @Override 122 public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) { 123 if (!isBalancerDecisionRecording) { 124 return null; 125 } 126 List<RecentLogs.BalancerDecision> balancerDecisions = 127 Arrays.stream(balancerDecisionQueue.toArray(new RecentLogs.BalancerDecision[0])) 128 .collect(Collectors.toList()); 129 // latest records should be displayed first, hence reverse order sorting 130 Collections.reverse(balancerDecisions); 131 int limit = balancerDecisions.size(); 132 if (request.getBalancerDecisionsRequest().hasLimit()) { 133 limit = Math.min(request.getBalancerDecisionsRequest().getLimit(), balancerDecisions.size()); 134 } 135 // filter limit if provided 136 balancerDecisions = balancerDecisions.subList(0, limit); 137 final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse(); 138 namedQueueGetResponse.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT); 139 namedQueueGetResponse.setBalancerDecisions(balancerDecisions); 140 return namedQueueGetResponse; 141 } 142 143 @Override 144 public void persistAll() { 145 // no-op for now 146 } 147 148}