001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.io.FileOutputStream;
021import java.io.IOException;
022import java.util.Comparator;
023import java.util.HashMap;
024import java.util.Map;
025import java.util.NavigableSet;
026import java.util.concurrent.ConcurrentHashMap;
027import java.util.concurrent.ConcurrentSkipListSet;
028import java.util.function.Function;
029import org.apache.hadoop.hbase.io.ByteBuffAllocator;
030import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler;
031import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
032import org.apache.hadoop.hbase.io.hfile.BlockPriority;
033import org.apache.hadoop.hbase.io.hfile.BlockType;
034import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
035import org.apache.hadoop.hbase.io.hfile.HFileBlock;
036import org.apache.hadoop.hbase.util.Pair;
037import org.apache.yetus.audience.InterfaceAudience;
038
039import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
042
043@InterfaceAudience.Private
044final class BucketProtoUtils {
045
046  final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' };
047
048  private BucketProtoUtils() {
049
050  }
051
052  static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache,
053    BucketCacheProtos.BackingMap.Builder backingMapBuilder) {
054    return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize())
055      .setIoClass(cache.ioEngine.getClass().getName())
056      .setMapClass(cache.backingMap.getClass().getName())
057      .putAllDeserializers(CacheableDeserializerIdManager.save())
058      .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles))
059      .setBackingMap(backingMapBuilder.build())
060      .setChecksum(ByteString
061        .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
062      .build();
063  }
064
065  public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize)
066    throws IOException {
067    // Write the new version of magic number.
068    fos.write(PB_MAGIC_V2);
069
070    BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
071    BucketCacheProtos.BackingMapEntry.Builder entryBuilder =
072      BucketCacheProtos.BackingMapEntry.newBuilder();
073
074    // Persist the metadata first.
075    toPB(cache, builder).writeDelimitedTo(fos);
076
077    int blockCount = 0;
078    // Persist backing map entries in chunks of size 'chunkSize'.
079    for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
080      blockCount++;
081      addEntryToBuilder(entry, entryBuilder, builder);
082      if (blockCount % chunkSize == 0) {
083        builder.build().writeDelimitedTo(fos);
084        builder.clear();
085      }
086    }
087    // Persist the last chunk.
088    if (builder.getEntryList().size() > 0) {
089      builder.build().writeDelimitedTo(fos);
090    }
091  }
092
093  private static void addEntryToBuilder(Map.Entry<BlockCacheKey, BucketEntry> entry,
094    BucketCacheProtos.BackingMapEntry.Builder entryBuilder,
095    BucketCacheProtos.BackingMap.Builder builder) {
096    entryBuilder.clear();
097    entryBuilder.setKey(BucketProtoUtils.toPB(entry.getKey()));
098    entryBuilder.setValue(BucketProtoUtils.toPB(entry.getValue()));
099    builder.addEntry(entryBuilder.build());
100  }
101
102  private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
103    BucketCacheProtos.BlockCacheKey.Builder builder = BucketCacheProtos.BlockCacheKey.newBuilder()
104      .setHfilename(key.getHfileName()).setOffset(key.getOffset())
105      .setPrimaryReplicaBlock(key.isPrimary()).setBlockType(toPB(key.getBlockType()));
106    if (key.getCfName() != null) {
107      builder.setFamilyName(key.getCfName());
108    }
109    if (key.getRegionName() != null) {
110      builder.setRegionName(key.getRegionName());
111    }
112    return builder.build();
113  }
114
115  private static BucketCacheProtos.BlockType toPB(BlockType blockType) {
116    switch (blockType) {
117      case DATA:
118        return BucketCacheProtos.BlockType.data;
119      case META:
120        return BucketCacheProtos.BlockType.meta;
121      case TRAILER:
122        return BucketCacheProtos.BlockType.trailer;
123      case INDEX_V1:
124        return BucketCacheProtos.BlockType.index_v1;
125      case FILE_INFO:
126        return BucketCacheProtos.BlockType.file_info;
127      case LEAF_INDEX:
128        return BucketCacheProtos.BlockType.leaf_index;
129      case ROOT_INDEX:
130        return BucketCacheProtos.BlockType.root_index;
131      case BLOOM_CHUNK:
132        return BucketCacheProtos.BlockType.bloom_chunk;
133      case ENCODED_DATA:
134        return BucketCacheProtos.BlockType.encoded_data;
135      case GENERAL_BLOOM_META:
136        return BucketCacheProtos.BlockType.general_bloom_meta;
137      case INTERMEDIATE_INDEX:
138        return BucketCacheProtos.BlockType.intermediate_index;
139      case DELETE_FAMILY_BLOOM_META:
140        return BucketCacheProtos.BlockType.delete_family_bloom_meta;
141      default:
142        throw new Error("Unrecognized BlockType.");
143    }
144  }
145
146  private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) {
147    return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset())
148      .setCachedTime(entry.getCachedTime()).setLength(entry.getLength())
149      .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader())
150      .setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter())
151      .setPriority(toPB(entry.getPriority())).build();
152  }
153
154  private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
155    switch (p) {
156      case MULTI:
157        return BucketCacheProtos.BlockPriority.multi;
158      case MEMORY:
159        return BucketCacheProtos.BlockPriority.memory;
160      case SINGLE:
161        return BucketCacheProtos.BlockPriority.single;
162      default:
163        throw new Error("Unrecognized BlockPriority.");
164    }
165  }
166
167  static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> fromPB(
168    Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap,
169    Function<BucketEntry, Recycler> createRecycler) throws IOException {
170    ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>();
171    NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator
172      .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset));
173    for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) {
174      BucketCacheProtos.BlockCacheKey protoKey = entry.getKey();
175      BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getFamilyName(),
176        protoKey.getRegionName(), protoKey.getOffset(), protoKey.getPrimaryReplicaBlock(),
177        fromPb(protoKey.getBlockType()), protoKey.getArchived());
178      BucketCacheProtos.BucketEntry protoValue = entry.getValue();
179      // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator
180      // which created by RpcServer elegantly.
181      BucketEntry value = new BucketEntry(protoValue.getOffset(), protoValue.getLength(),
182        protoValue.getDiskSizeWithHeader(), protoValue.getAccessCounter(),
183        protoValue.getCachedTime(),
184        protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler,
185        ByteBuffAllocator.HEAP);
186      // This is the deserializer that we stored
187      int oldIndex = protoValue.getDeserialiserIndex();
188      String deserializerClass = deserializers.get(oldIndex);
189      if (deserializerClass == null) {
190        throw new IOException("Found deserializer index without matching entry.");
191      }
192      // Convert it to the identifier for the deserializer that we have in this runtime
193      if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) {
194        int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserializerIdentifier();
195        value.deserializerIndex = (byte) actualIndex;
196      } else {
197        // We could make this more plugable, but right now HFileBlock is the only implementation
198        // of Cacheable outside of tests, so this might not ever matter.
199        throw new IOException("Unknown deserializer class found: " + deserializerClass);
200      }
201      result.put(key, value);
202      resultSet.add(key);
203    }
204    return new Pair<>(result, resultSet);
205  }
206
207  private static BlockType fromPb(BucketCacheProtos.BlockType blockType) {
208    switch (blockType) {
209      case data:
210        return BlockType.DATA;
211      case meta:
212        return BlockType.META;
213      case trailer:
214        return BlockType.TRAILER;
215      case index_v1:
216        return BlockType.INDEX_V1;
217      case file_info:
218        return BlockType.FILE_INFO;
219      case leaf_index:
220        return BlockType.LEAF_INDEX;
221      case root_index:
222        return BlockType.ROOT_INDEX;
223      case bloom_chunk:
224        return BlockType.BLOOM_CHUNK;
225      case encoded_data:
226        return BlockType.ENCODED_DATA;
227      case general_bloom_meta:
228        return BlockType.GENERAL_BLOOM_META;
229      case intermediate_index:
230        return BlockType.INTERMEDIATE_INDEX;
231      case delete_family_bloom_meta:
232        return BlockType.DELETE_FAMILY_BLOOM_META;
233      default:
234        throw new Error("Unrecognized BlockType.");
235    }
236  }
237
238  static Map<String, BucketCacheProtos.RegionFileSizeMap>
239    toCachedPB(Map<String, Pair<String, Long>> prefetchedHfileNames) {
240    Map<String, BucketCacheProtos.RegionFileSizeMap> tmpMap = new HashMap<>();
241    prefetchedHfileNames.forEach((hfileName, regionPrefetchMap) -> {
242      BucketCacheProtos.RegionFileSizeMap tmpRegionFileSize =
243        BucketCacheProtos.RegionFileSizeMap.newBuilder().setRegionName(regionPrefetchMap.getFirst())
244          .setRegionCachedSize(regionPrefetchMap.getSecond()).build();
245      tmpMap.put(hfileName, tmpRegionFileSize);
246    });
247    return tmpMap;
248  }
249
250  static Map<String, Pair<String, Long>>
251    fromPB(Map<String, BucketCacheProtos.RegionFileSizeMap> prefetchHFileNames) {
252    Map<String, Pair<String, Long>> hfileMap = new HashMap<>();
253    prefetchHFileNames.forEach((hfileName, regionPrefetchMap) -> {
254      hfileMap.put(hfileName,
255        new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionCachedSize()));
256    });
257    return hfileMap;
258  }
259}