001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.namequeues.impl;
019
020import java.util.Arrays;
021import java.util.Collections;
022import java.util.List;
023import java.util.Queue;
024import java.util.stream.Collectors;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.client.BalancerDecision;
027import org.apache.hadoop.hbase.client.Connection;
028import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
029import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
030import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
031import org.apache.hadoop.hbase.namequeues.NamedQueueService;
032import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
033import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
039import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
040import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
041
042import org.apache.hadoop.hbase.shaded.protobuf.generated.RecentLogs;
043
044/**
045 * In-memory Queue service provider for Balancer Decision events
046 */
047@InterfaceAudience.Private
048public class BalancerDecisionQueueService implements NamedQueueService {
049
050  private static final Logger LOG = LoggerFactory.getLogger(BalancerDecisionQueueService.class);
051
052  private final boolean isBalancerDecisionRecording;
053
054  private static final String BALANCER_DECISION_QUEUE_SIZE =
055    "hbase.master.balancer.decision.queue.size";
056  private static final int DEFAULT_BALANCER_DECISION_QUEUE_SIZE = 250;
057
058  private static final int REGION_PLANS_THRESHOLD_PER_BALANCER = 15;
059
060  private final Queue<RecentLogs.BalancerDecision> balancerDecisionQueue;
061
062  public BalancerDecisionQueueService(Configuration conf) {
063    isBalancerDecisionRecording = conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
064      BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
065    if (!isBalancerDecisionRecording) {
066      balancerDecisionQueue = null;
067      return;
068    }
069    final int queueSize =
070      conf.getInt(BALANCER_DECISION_QUEUE_SIZE, DEFAULT_BALANCER_DECISION_QUEUE_SIZE);
071    final EvictingQueue<RecentLogs.BalancerDecision> evictingQueue =
072      EvictingQueue.create(queueSize);
073    balancerDecisionQueue = Queues.synchronizedQueue(evictingQueue);
074  }
075
076  @Override
077  public NamedQueuePayload.NamedQueueEvent getEvent() {
078    return NamedQueuePayload.NamedQueueEvent.BALANCE_DECISION;
079  }
080
081  @Override
082  public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
083    if (!isBalancerDecisionRecording) {
084      return;
085    }
086    if (!(namedQueuePayload instanceof BalancerDecisionDetails)) {
087      LOG.warn(
088        "BalancerDecisionQueueService: NamedQueuePayload is not of type BalancerDecisionDetails.");
089      return;
090    }
091    BalancerDecisionDetails balancerDecisionDetails = (BalancerDecisionDetails) namedQueuePayload;
092    BalancerDecision balancerDecisionRecords = balancerDecisionDetails.getBalancerDecision();
093    List<String> regionPlans = balancerDecisionRecords.getRegionPlans();
094    List<List<String>> regionPlansList;
095    if (regionPlans.size() > REGION_PLANS_THRESHOLD_PER_BALANCER) {
096      regionPlansList = Lists.partition(regionPlans, REGION_PLANS_THRESHOLD_PER_BALANCER);
097    } else {
098      regionPlansList = Collections.singletonList(regionPlans);
099    }
100    for (List<String> regionPlansPerBalancer : regionPlansList) {
101      RecentLogs.BalancerDecision balancerDecision = RecentLogs.BalancerDecision.newBuilder()
102        .setInitTotalCost(balancerDecisionRecords.getInitTotalCost())
103        .setInitialFunctionCosts(balancerDecisionRecords.getInitialFunctionCosts())
104        .setComputedTotalCost(balancerDecisionRecords.getComputedTotalCost())
105        .setFinalFunctionCosts(balancerDecisionRecords.getFinalFunctionCosts())
106        .setComputedSteps(balancerDecisionRecords.getComputedSteps())
107        .addAllRegionPlans(regionPlansPerBalancer).build();
108      balancerDecisionQueue.add(balancerDecision);
109    }
110  }
111
112  @Override
113  public boolean clearNamedQueue() {
114    if (!isBalancerDecisionRecording) {
115      return false;
116    }
117    LOG.debug("Received request to clean up balancer decision queue.");
118    balancerDecisionQueue.clear();
119    return true;
120  }
121
122  @Override
123  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
124    if (!isBalancerDecisionRecording) {
125      return null;
126    }
127    List<RecentLogs.BalancerDecision> balancerDecisions =
128      Arrays.stream(balancerDecisionQueue.toArray(new RecentLogs.BalancerDecision[0]))
129        .collect(Collectors.toList());
130    // latest records should be displayed first, hence reverse order sorting
131    Collections.reverse(balancerDecisions);
132    int limit = balancerDecisions.size();
133    if (request.getBalancerDecisionsRequest().hasLimit()) {
134      limit = Math.min(request.getBalancerDecisionsRequest().getLimit(), balancerDecisions.size());
135    }
136    // filter limit if provided
137    balancerDecisions = balancerDecisions.subList(0, limit);
138    final NamedQueueGetResponse namedQueueGetResponse = new NamedQueueGetResponse();
139    namedQueueGetResponse.setNamedQueueEvent(BalancerDecisionDetails.BALANCER_DECISION_EVENT);
140    namedQueueGetResponse.setBalancerDecisions(balancerDecisions);
141    return namedQueueGetResponse;
142  }
143
144  @Override
145  public void persistAll(Connection connection) {
146    // no-op for now
147  }
148
149}