/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018package org.apache.hadoop.hbase.regionserver;
019
020import java.io.IOException;
021import java.util.Collection;
022import java.util.List;
023import java.util.Map;
024import java.util.NavigableMap;
025import java.util.TreeMap;
026import java.util.function.Function;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.ExtendedCell;
029import org.apache.hadoop.hbase.io.hfile.HFileWriterImpl;
030import org.apache.hadoop.hbase.util.Bytes;
031import org.apache.yetus.audience.InterfaceAudience;
032
033@InterfaceAudience.Private
034public class CustomTieringMultiFileWriter extends DateTieredMultiFileWriter {
035
036  public static final byte[] CUSTOM_TIERING_TIME_RANGE = Bytes.toBytes("CUSTOM_TIERING_TIME_RANGE");
037
038  private NavigableMap<Long, TimeRangeTracker> lowerBoundary2TimeRanger = new TreeMap<>();
039
040  public CustomTieringMultiFileWriter(List<Long> lowerBoundaries,
041    Map<Long, String> lowerBoundariesPolicies, boolean needEmptyFile,
042    Function<ExtendedCell, Long> tieringFunction) {
043    super(lowerBoundaries, lowerBoundariesPolicies, needEmptyFile, tieringFunction);
044    for (Long lowerBoundary : lowerBoundaries) {
045      lowerBoundary2TimeRanger.put(lowerBoundary, null);
046    }
047  }
048
049  @Override
050  public void append(ExtendedCell cell) throws IOException {
051    super.append(cell);
052    long tieringValue = tieringFunction.apply(cell);
053    Map.Entry<Long, TimeRangeTracker> entry = lowerBoundary2TimeRanger.floorEntry(tieringValue);
054    if (entry.getValue() == null) {
055      TimeRangeTracker timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
056      timeRangeTracker.setMin(tieringValue);
057      timeRangeTracker.setMax(tieringValue);
058      lowerBoundary2TimeRanger.put(entry.getKey(), timeRangeTracker);
059      ((HFileWriterImpl) lowerBoundary2Writer.get(entry.getKey()).getLiveFileWriter())
060        .setTimeRangeTrackerForTiering(() -> timeRangeTracker);
061    } else {
062      TimeRangeTracker timeRangeTracker = entry.getValue();
063      if (timeRangeTracker.getMin() > tieringValue) {
064        timeRangeTracker.setMin(tieringValue);
065      }
066      if (timeRangeTracker.getMax() < tieringValue) {
067        timeRangeTracker.setMax(tieringValue);
068      }
069    }
070  }
071
072  @Override
073  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
074    Collection<HStoreFile> storeFiles) throws IOException {
075    for (Map.Entry<Long, StoreFileWriter> entry : this.lowerBoundary2Writer.entrySet()) {
076      StoreFileWriter writer = entry.getValue();
077      if (writer != null) {
078        writer.appendFileInfo(CUSTOM_TIERING_TIME_RANGE,
079          TimeRangeTracker.toByteArray(lowerBoundary2TimeRanger.get(entry.getKey())));
080      }
081    }
082    return super.commitWriters(maxSeqId, majorCompaction, storeFiles);
083  }
084
085}