/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.io.hfile.bucket;

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Function;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

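/**
 * Utilities for converting the persistent state of a {@link BucketCache} (its backing map, fully
 * cached file list and metadata) to and from the protobuf representation in
 * {@code BucketCacheProtos}, so the cache contents can be restored after a restart when a
 * persistent IOEngine is in use. Typical flow: on shutdown
 * {@link #serializeAsPB(BucketCache, FileOutputStream, long)} writes the state; on startup
 * {@link #fromPB(Map, BucketCacheProtos.BackingMap, Function)} rebuilds the backing map.
 *
 * <pre>
 * // Sketch only; 'cache', 'persistencePath' and the chunk size are illustrative, and the
 * // cache is assumed to be backed by a PersistentIOEngine:
 * try (FileOutputStream fos = new FileOutputStream(persistencePath)) {
 *   BucketProtoUtils.serializeAsPB(cache, fos, 10000);
 * }
 * </pre>
 */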
@InterfaceAudience.Private
final class BucketProtoUtils {

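  // Magic preamble written at the head of the persistence file, identifying the chunked (V2)
  // format produced by serializeAsPB below.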
  static final byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' };

  private BucketProtoUtils() {
  }

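  /**
   * Builds the top-level metadata entry: cache capacity, IOEngine and backing map class names,
   * the registered deserializers, the fully cached files, and a checksum computed over the
   * persistent IOEngine contents. The backing map passed in via {@code backingMapBuilder}
   * (possibly empty or partial) is attached as-is.
   */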
  static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache,
    BucketCacheProtos.BackingMap.Builder backingMapBuilder) {
    return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize())
      .setIoClass(cache.ioEngine.getClass().getName())
      .setMapClass(cache.backingMap.getClass().getName())
      .putAllDeserializers(CacheableDeserializerIdManager.save())
      .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles))
      .setBackingMap(backingMapBuilder.build())
      .setChecksum(ByteString
        .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
      .build();
  }

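  /**
   * Streams the cache state to {@code fos}: first the {@link #PB_MAGIC_V2} preamble and the
   * metadata entry, then the backing map as a sequence of delimited chunks of at most
   * {@code chunkSize} entries each, so a large map never has to be built as a single protobuf
   * message.
   */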
  public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize)
    throws IOException {
    // Write the new version of the magic number first, so readers can detect the chunked format.
    fos.write(PB_MAGIC_V2);

    BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
    BucketCacheProtos.BackingMapEntry.Builder entryBuilder =
      BucketCacheProtos.BackingMapEntry.newBuilder();

    // Persist the metadata first.
    toPB(cache, builder).writeDelimitedTo(fos);

    int blockCount = 0;
    // Persist the backing map entries in chunks of 'chunkSize' entries each.
    for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
      blockCount++;
      addEntryToBuilder(entry, entryBuilder, builder);
      if (blockCount % chunkSize == 0) {
        builder.build().writeDelimitedTo(fos);
        builder.clear();
      }
    }
    // Persist the last, possibly partial, chunk.
    if (builder.getEntryCount() > 0) {
      builder.build().writeDelimitedTo(fos);
    }
  }

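  /**
   * Serializes a single backing map entry into {@code builder}, reusing {@code entryBuilder}
   * across calls to avoid allocating a new builder per entry.
   */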
  private static void addEntryToBuilder(Map.Entry<BlockCacheKey, BucketEntry> entry,
    BucketCacheProtos.BackingMapEntry.Builder entryBuilder,
    BucketCacheProtos.BackingMap.Builder builder) {
    entryBuilder.clear();
    entryBuilder.setKey(BucketProtoUtils.toPB(entry.getKey()));
    entryBuilder.setValue(BucketProtoUtils.toPB(entry.getValue()));
    builder.addEntry(entryBuilder.build());
  }

  private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
    return BucketCacheProtos.BlockCacheKey.newBuilder().setHfilename(key.getHfileName())
      .setOffset(key.getOffset()).setPrimaryReplicaBlock(key.isPrimary())
      .setBlockType(toPB(key.getBlockType())).build();
  }

  private static BucketCacheProtos.BlockType toPB(BlockType blockType) {
    switch (blockType) {
      case DATA:
        return BucketCacheProtos.BlockType.data;
      case META:
        return BucketCacheProtos.BlockType.meta;
      case TRAILER:
        return BucketCacheProtos.BlockType.trailer;
      case INDEX_V1:
        return BucketCacheProtos.BlockType.index_v1;
      case FILE_INFO:
        return BucketCacheProtos.BlockType.file_info;
      case LEAF_INDEX:
        return BucketCacheProtos.BlockType.leaf_index;
      case ROOT_INDEX:
        return BucketCacheProtos.BlockType.root_index;
      case BLOOM_CHUNK:
        return BucketCacheProtos.BlockType.bloom_chunk;
      case ENCODED_DATA:
        return BucketCacheProtos.BlockType.encoded_data;
      case GENERAL_BLOOM_META:
        return BucketCacheProtos.BlockType.general_bloom_meta;
      case INTERMEDIATE_INDEX:
        return BucketCacheProtos.BlockType.intermediate_index;
      case DELETE_FAMILY_BLOOM_META:
        return BucketCacheProtos.BlockType.delete_family_bloom_meta;
      default:
        throw new Error("Unrecognized BlockType.");
    }
  }

  private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) {
    return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset())
      .setCachedTime(entry.getCachedTime()).setLength(entry.getLength())
      .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader())
      .setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter())
      .setPriority(toPB(entry.getPriority())).build();
  }

  private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
    switch (p) {
      case MULTI:
        return BucketCacheProtos.BlockPriority.multi;
      case MEMORY:
        return BucketCacheProtos.BlockPriority.memory;
      case SINGLE:
        return BucketCacheProtos.BlockPriority.single;
      default:
        throw new Error("Unrecognized BlockPriority.");
    }
  }

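  /**
   * Rebuilds the in-memory backing map from its persisted form, returning the map together with
   * a navigable set of its keys ordered by HFile name and offset. Deserializer indexes recorded
   * at persistence time are remapped to the identifiers registered in this runtime.
   */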
  static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> fromPB(
    Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap,
    Function<BucketEntry, Recycler> createRecycler) throws IOException {
    ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
    NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator
      .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
    for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
      BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
      BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getFamilyName(),
        protoKey.getRegionName(), protoKey.getOffset(), protoKey.getPrimaryReplicaBlock(),
        fromPb(protoKey.getBlockType()), protoKey.getArchived());
      BucketCacheProtos.BucketEntry protoValue = entry.getValue();
      // TODO: We use ByteBuffAllocator.HEAP here because we cannot elegantly get hold of the
      // ByteBuffAllocator created by the RpcServer.
      BucketEntry value = new BucketEntry(protoValue.getOffset(), protoValue.getLength(),
        protoValue.getDiskSizeWithHeader(), protoValue.getAccessCounter(),
        protoValue.getCachedTime(),
        protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler,
        ByteBuffAllocator.HEAP);
      // This is the deserializer index that was persisted.
      int oldIndex = protoValue.getDeserialiserIndex();
      String deserializerClass = deserializers.get(oldIndex);
      if (deserializerClass == null) {
        throw new IOException("Found deserializer index without matching entry.");
      }
      // Convert it to the identifier registered for that deserializer in this runtime.
      if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) {
        int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserializerIdentifier();
        value.deserializerIndex = (byte) actualIndex;
      } else {
        // We could make this more pluggable, but right now HFileBlock is the only implementation
        // of Cacheable outside of tests, so this may never matter.
        throw new IOException("Unknown deserializer class found: " + deserializerClass);
      }
      result.put(key, value);
      resultSet.add(key);
    }
    return new Pair<>(result, resultSet);
  }

  private static BlockType fromPb(BucketCacheProtos.BlockType blockType) {
    switch (blockType) {
      case data:
        return BlockType.DATA;
      case meta:
        return BlockType.META;
      case trailer:
        return BlockType.TRAILER;
      case index_v1:
        return BlockType.INDEX_V1;
      case file_info:
        return BlockType.FILE_INFO;
      case leaf_index:
        return BlockType.LEAF_INDEX;
      case root_index:
        return BlockType.ROOT_INDEX;
      case bloom_chunk:
        return BlockType.BLOOM_CHUNK;
      case encoded_data:
        return BlockType.ENCODED_DATA;
      case general_bloom_meta:
        return BlockType.GENERAL_BLOOM_META;
      case intermediate_index:
        return BlockType.INTERMEDIATE_INDEX;
      case delete_family_bloom_meta:
        return BlockType.DELETE_FAMILY_BLOOM_META;
      default:
        throw new Error("Unrecognized BlockType.");
    }
  }

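  /**
   * Converts the map of fully cached files, keyed by HFile name with (region name, cached size)
   * values, into its protobuf form.
   */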
  static Map<String, BucketCacheProtos.RegionFileSizeMap>
    toCachedPB(Map<String, Pair<String, Long>> prefetchedHfileNames) {
    Map<String, BucketCacheProtos.RegionFileSizeMap> tmpMap = new HashMap<>();
    prefetchedHfileNames.forEach((hfileName, regionPrefetchMap) -> {
      BucketCacheProtos.RegionFileSizeMap tmpRegionFileSize =
        BucketCacheProtos.RegionFileSizeMap.newBuilder().setRegionName(regionPrefetchMap.getFirst())
          .setRegionCachedSize(regionPrefetchMap.getSecond()).build();
      tmpMap.put(hfileName, tmpRegionFileSize);
    });
    return tmpMap;
  }

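  /**
   * Inverse of {@link #toCachedPB(Map)}: restores the (region name, cached size) pair for each
   * fully cached file.
   */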
  static Map<String, Pair<String, Long>>
    fromPB(Map<String, BucketCacheProtos.RegionFileSizeMap> prefetchHFileNames) {
    Map<String, Pair<String, Long>> hfileMap = new HashMap<>();
    prefetchHFileNames.forEach((hfileName, regionPrefetchMap) -> {
      hfileMap.put(hfileName,
        new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionCachedSize()));
    });
    return hfileMap;
  }
}