001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.balancer;
019
020/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions
021 * based on the amount they are cached on a given server. A region can move across the region
022 * servers whenever a region server shuts down or crashes. The region server preserves the cache
023 * periodically and restores the cache when it is restarted. This balancer implements a mechanism
024 * where it maintains the amount by which a region is cached on a region server. During balancer
025 * run, a region plan is generated that takes into account this cache information and tries to
026 * move the regions so that the cache is minimally impacted.
027 */
028
029import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
030
031import java.util.ArrayDeque;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.Deque;
035import java.util.HashMap;
036import java.util.List;
037import java.util.Map;
038import java.util.Optional;
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.hbase.ClusterMetrics;
041import org.apache.hadoop.hbase.RegionMetrics;
042import org.apache.hadoop.hbase.ServerMetrics;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.Size;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.master.RegionPlan;
048import org.apache.hadoop.hbase.util.Pair;
049import org.apache.yetus.audience.InterfaceAudience;
050import org.slf4j.Logger;
051import org.slf4j.LoggerFactory;
052
053@InterfaceAudience.Private
054public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
055  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);
056
057  public static final String CACHE_RATIO_THRESHOLD =
058    "hbase.master.balancer.stochastic.throttling.cacheRatio";
059  public static final float CACHE_RATIO_THRESHOLD_DEFAULT = 0.8f;
060
061  public Float ratioThreshold;
062
063  private Long sleepTime;
064  private Configuration configuration;
065
066  public enum GeneratorFunctionType {
067    LOAD,
068    CACHE_RATIO
069  }
070
071  @Override
072  public synchronized void loadConf(Configuration configuration) {
073    this.configuration = configuration;
074    this.costFunctions = new ArrayList<>();
075    super.loadConf(configuration);
076    ratioThreshold =
077      this.configuration.getFloat(CACHE_RATIO_THRESHOLD, CACHE_RATIO_THRESHOLD_DEFAULT);
078    sleepTime = configuration.getLong(MOVE_THROTTLING, MOVE_THROTTLING_DEFAULT.toMillis());
079  }
080
081  @Override
082  protected Map<Class<? extends CandidateGenerator>, CandidateGenerator>
083    createCandidateGenerators(Configuration conf) {
084    Map<Class<? extends CandidateGenerator>, CandidateGenerator> candidateGenerators =
085      new HashMap<>(2);
086    candidateGenerators.put(CacheAwareSkewnessCandidateGenerator.class,
087      new CacheAwareSkewnessCandidateGenerator());
088    candidateGenerators.put(CacheAwareCandidateGenerator.class, new CacheAwareCandidateGenerator());
089    return candidateGenerators;
090  }
091
092  @Override
093  protected List<CostFunction> createCostFunctions(Configuration configuration) {
094    List<CostFunction> costFunctions = new ArrayList<>();
095    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
096    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
097    return costFunctions;
098  }
099
100  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
101    if (costFunction.getMultiplier() > 0) {
102      costFunctions.add(costFunction);
103    }
104  }
105
106  @Override
107  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
108    this.clusterStatus = clusterMetrics;
109    updateRegionLoad();
110  }
111
112  /**
113   * Collect the amount of region cached for all the regions from all the active region servers.
114   */
115  private void updateRegionLoad() {
116    loads = new HashMap<>();
117    regionCacheRatioOnOldServerMap = new HashMap<>();
118    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();
119
120    // Build current region cache statistics
121    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
122      // Create a map of region and the server where it is currently hosted
123      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
124        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
125
126        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
127
128        // Get the total size of the hFiles in this region
129        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
130
131        rload.add(new BalancerRegionLoad(rm));
132        // Maintain a map of region and its total size. This is needed to calculate the cache
133        // ratios for the regions cached on old region servers
134        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
135        loads.put(regionEncodedName, rload);
136      });
137    });
138
139    // Build cache statistics for the regions hosted previously on old region servers
140    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
141      // Find if a region was previously hosted on a server other than the one it is currently
142      // hosted on.
143      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
144        // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on
145        // this server
146        if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) {
147          ServerName currentServer =
148            regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst();
149          if (!ServerName.isSameAddress(currentServer, sn)) {
150            int regionSizeMB =
151              regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
152            float regionCacheRatioOnOldServer =
153              regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
154            regionCacheRatioOnOldServerMap.put(regionEncodedName,
155              new Pair<>(sn, regionCacheRatioOnOldServer));
156          }
157        }
158      });
159    });
160  }
161
162  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
163    Optional<RegionInfo> regionInfoOptional =
164      Arrays.stream(cluster.regions).filter((RegionInfo ri) -> {
165        return regionName.equals(ri.getEncodedName());
166      }).findFirst();
167
168    if (regionInfoOptional.isPresent()) {
169      return regionInfoOptional.get();
170    }
171    return null;
172  }
173
174  @Override
175  public void throttle(RegionPlan plan) {
176    Pair<ServerName, Float> rsRatio = this.regionCacheRatioOnOldServerMap.get(plan.getRegionName());
177    if (
178      rsRatio != null && plan.getDestination().equals(rsRatio.getFirst())
179        && rsRatio.getSecond() >= ratioThreshold
180    ) {
181      LOG.debug("Moving region {} to server {} with cache ratio {}. No throttling needed.",
182        plan.getRegionInfo().getEncodedName(), plan.getDestination(), rsRatio.getSecond());
183    } else {
184      if (rsRatio != null) {
185        LOG.debug("Moving region {} to server {} with cache ratio: {}. Throttling move for {}ms.",
186          plan.getRegionInfo().getEncodedName(), plan.getDestination(),
187          plan.getDestination().equals(rsRatio.getFirst()) ? rsRatio.getSecond() : "unknown",
188          sleepTime);
189      } else {
190        LOG.debug(
191          "Moving region {} to server {} with no cache ratio info for the region. "
192            + "Throttling move for {}ms.",
193          plan.getRegionInfo().getEncodedName(), plan.getDestination(), sleepTime);
194      }
195      try {
196        Thread.sleep(sleepTime);
197      } catch (InterruptedException e) {
198        throw new RuntimeException(e);
199      }
200    }
201  }
202
203  @Override
204  protected List<RegionPlan> balanceTable(TableName tableName,
205    Map<ServerName, List<RegionInfo>> loadOfOneTable) {
206    final Map<String, Pair<ServerName, Float>> snapshot = new HashMap<>();
207    snapshot.putAll(this.regionCacheRatioOnOldServerMap);
208    List<RegionPlan> plans = super.balanceTable(tableName, loadOfOneTable);
209    if (plans == null) {
210      return plans;
211    }
212    plans.sort((p1, p2) -> {
213      Pair<ServerName, Float> pair1 = snapshot.get(p1.getRegionName());
214      Float ratio1 =
215        pair1 == null ? 0 : pair1.getFirst().equals(p1.getDestination()) ? pair1.getSecond() : 0f;
216      Pair<ServerName, Float> pair2 = snapshot.get(p2.getRegionName());
217      Float ratio2 =
218        pair2 == null ? 0 : pair2.getFirst().equals(p2.getDestination()) ? pair2.getSecond() : 0f;
219      return ratio1.compareTo(ratio2) * (-1);
220    });
221    return plans;
222  }
223
224  private class CacheAwareCandidateGenerator extends CandidateGenerator {
225    @Override
226    protected BalanceAction generate(BalancerClusterState cluster) {
227      // Move the regions to the servers they were previously hosted on based on the cache ratio
228      if (
229        !regionCacheRatioOnOldServerMap.isEmpty()
230          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
231      ) {
232        Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
233          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
234        // Get the server where this region was previously hosted
235        String regionEncodedName = regionCacheRatioServerMap.getKey();
236        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
237        if (regionInfo == null) {
238          LOG.warn("Region {} not found", regionEncodedName);
239          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
240          return BalanceAction.NULL_ACTION;
241        }
242        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
243          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
244          return BalanceAction.NULL_ACTION;
245        }
246        int regionIndex = cluster.regionsToIndex.get(regionInfo);
247        int oldServerIndex = cluster.serversToIndex
248          .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress());
249        if (oldServerIndex < 0) {
250          LOG.warn("Server previously hosting region {} not found", regionEncodedName);
251          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
252          return BalanceAction.NULL_ACTION;
253        }
254
255        float oldRegionCacheRatio =
256          cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
257        int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
258        float currentRegionCacheRatio =
259          cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);
260
261        BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
262          currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
263        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
264        return action;
265      }
266      return BalanceAction.NULL_ACTION;
267    }
268
269    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
270      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
271      float cacheRatioOnOldServer) {
272      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
273        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
274          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
275          : BalanceAction.NULL_ACTION;
276    }
277
278    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
279      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
280      float cacheRatioOnOldServer) {
281      // Find if the region has already moved by comparing the current server index with the
282      // current server index. This can happen when other candidate generator has moved the region
283      if (currentServerIndex < 0 || oldServerIndex < 0) {
284        return false;
285      }
286
287      float cacheRatioDiffThreshold = 0.6f;
288
289      // Conditions for moving the region
290
291      // If the region is fully cached on the old server, move the region back
292      if (cacheRatioOnOldServer == 1.0f) {
293        if (LOG.isDebugEnabled()) {
294          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
295            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
296        }
297        return true;
298      }
299
300      // Move the region back to the old server if it is cached equally on both the servers
301      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
302        if (LOG.isDebugEnabled()) {
303          LOG.debug(
304            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
305            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
306            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
307        }
308        return true;
309      }
310
311      // If the region is not fully cached on either of the servers, move the region back to the
312      // old server if the region cache ratio on the current server is still much less than the old
313      // server
314      if (
315        cacheRatioOnOldServer > 0.0f
316          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
317      ) {
318        if (LOG.isDebugEnabled()) {
319          LOG.debug(
320            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
321              + "cache ratio {}",
322            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
323            cluster.servers[oldServerIndex], cacheRatioOnOldServer, cacheRatioOnCurrentServer);
324        }
325        return true;
326      }
327
328      if (LOG.isDebugEnabled()) {
329        LOG.debug(
330          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
331          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
332          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
333      }
334      return false;
335    }
336  }
337
338  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
339    @Override
340    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
341      // First move all the regions which were hosted previously on some other server back to their
342      // old servers
343      if (
344        !regionCacheRatioOnOldServerMap.isEmpty()
345          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
346      ) {
347        // Get the first region index in the historical cache ratio list
348        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
349          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
350        String regionEncodedName = regionEntry.getKey();
351
352        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
353        if (regionInfo == null) {
354          LOG.warn("Region {} does not exist", regionEncodedName);
355          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
356          return BalanceAction.NULL_ACTION;
357        }
358        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
359          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
360          return BalanceAction.NULL_ACTION;
361        }
362
363        int regionIndex = cluster.regionsToIndex.get(regionInfo);
364
365        // Get the current host name for this region
366        thisServer = cluster.regionIndexToServerIndex[regionIndex];
367
368        // Get the old server index
369        otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());
370
371        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
372
373        if (otherServer < 0) {
374          // The old server has been moved to other host and hence, the region cannot be moved back
375          // to the old server
376          if (LOG.isDebugEnabled()) {
377            LOG.debug(
378              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
379                + "server {} as the server does not exist",
380              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
381          }
382          return BalanceAction.NULL_ACTION;
383        }
384
385        if (LOG.isDebugEnabled()) {
386          LOG.debug(
387            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
388              + "was hosted there earlier",
389            regionEncodedName, cluster.servers[thisServer].getHostname(),
390            cluster.servers[otherServer].getHostname());
391        }
392
393        return getAction(thisServer, regionIndex, otherServer, -1);
394      }
395
396      if (thisServer < 0 || otherServer < 0) {
397        return BalanceAction.NULL_ACTION;
398      }
399
400      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
401      if (regionIndexToMove < 0) {
402        if (LOG.isDebugEnabled()) {
403          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
404        }
405        return BalanceAction.NULL_ACTION;
406      }
407      if (LOG.isDebugEnabled()) {
408        LOG.debug(
409          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
410            + "least cached on current server",
411          cluster.regions[regionIndexToMove].getEncodedName(),
412          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
413      }
414      return getAction(thisServer, regionIndexToMove, otherServer, -1);
415    }
416
417    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
418      float minCacheRatio = Float.MAX_VALUE;
419      int leastCachedRegion = -1;
420      for (int i = 0; i < cluster.regionsPerServer[thisServer].length; i++) {
421        int regionIndex = cluster.regionsPerServer[thisServer][i];
422
423        float cacheRatioOnCurrentServer =
424          cluster.getOrComputeRegionCacheRatio(regionIndex, thisServer);
425        if (cacheRatioOnCurrentServer < minCacheRatio) {
426          minCacheRatio = cacheRatioOnCurrentServer;
427          leastCachedRegion = regionIndex;
428        }
429      }
430      return leastCachedRegion;
431    }
432  }
433
434  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
435    static final String REGION_COUNT_SKEW_COST_KEY =
436      "hbase.master.balancer.stochastic.regionCountCost";
437    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
438    private final DoubleArrayCost cost = new DoubleArrayCost();
439
440    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
441      // Load multiplier should be the greatest as it is the most general way to balance data.
442      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
443    }
444
445    @Override
446    void prepare(BalancerClusterState cluster) {
447      super.prepare(cluster);
448      cost.prepare(cluster.numServers);
449      cost.applyCostsChange(costs -> {
450        for (int i = 0; i < cluster.numServers; i++) {
451          costs[i] = cluster.regionsPerServer[i].length;
452        }
453      });
454    }
455
456    @Override
457    protected double cost() {
458      return cost.cost();
459    }
460
461    @Override
462    protected void regionMoved(int region, int oldServer, int newServer) {
463      cost.applyCostsChange(costs -> {
464        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
465        costs[newServer] = cluster.regionsPerServer[newServer].length;
466      });
467    }
468
469    @Override
470    public final void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
471      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
472    }
473  }
474
475  static class CacheAwareCostFunction extends CostFunction {
476    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
477    private double cacheRatio;
478    private double bestCacheRatio;
479
480    private static final float DEFAULT_CACHE_COST = 20;
481
482    CacheAwareCostFunction(Configuration conf) {
483      boolean isPersistentCache = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) != null;
484      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
485      this.setMultiplier(
486        !isPersistentCache ? 0.0f : conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST));
487      bestCacheRatio = 0.0;
488      cacheRatio = 0.0;
489    }
490
491    @Override
492    void prepare(BalancerClusterState cluster) {
493      super.prepare(cluster);
494      cacheRatio = 0.0;
495      bestCacheRatio = 0.0;
496
497      for (int region = 0; region < cluster.numRegions; region++) {
498        cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
499          cluster.regionIndexToServerIndex[region]);
500        bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(region,
501          getServerWithBestCacheRatioForRegion(region));
502      }
503
504      cacheRatio = bestCacheRatio == 0 ? 1.0 : cacheRatio / bestCacheRatio;
505      if (LOG.isDebugEnabled()) {
506        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
507      }
508    }
509
510    @Override
511    protected double cost() {
512      return scale(0, 1, 1 - cacheRatio);
513    }
514
515    @Override
516    protected void regionMoved(int region, int oldServer, int newServer) {
517      double regionCacheRatioOnOldServer =
518        cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
519      double regionCacheRatioOnNewServer =
520        cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
521      double cacheRatioDiff = regionCacheRatioOnNewServer - regionCacheRatioOnOldServer;
522      double normalizedDelta = bestCacheRatio == 0.0 ? 0.0 : cacheRatioDiff / bestCacheRatio;
523      cacheRatio += normalizedDelta;
524      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
525        LOG.debug(
526          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
527            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
528          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
529          cluster.servers[newServer].getHostname(), regionCacheRatioOnOldServer,
530          regionCacheRatioOnNewServer, bestCacheRatio, cacheRatio);
531      }
532    }
533
534    private int getServerWithBestCacheRatioForRegion(int region) {
535      return cluster.getOrComputeServerWithBestRegionCachedRatio()[region];
536    }
537
538    @Override
539    public void updateWeight(Map<Class<? extends CandidateGenerator>, Double> weights) {
540      weights.merge(LoadCandidateGenerator.class, cost(), Double::sum);
541    }
542  }
543}