001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.FileOutputStream; 021import java.io.IOException; 022import java.util.Comparator; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.NavigableSet; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentSkipListSet; 028import java.util.function.Function; 029import org.apache.hadoop.hbase.io.ByteBuffAllocator; 030import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 031import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 032import org.apache.hadoop.hbase.io.hfile.BlockPriority; 033import org.apache.hadoop.hbase.io.hfile.BlockType; 034import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 035import org.apache.hadoop.hbase.io.hfile.HFileBlock; 036import org.apache.hadoop.hbase.util.Pair; 037import org.apache.yetus.audience.InterfaceAudience; 038 039import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 042 043@InterfaceAudience.Private 044final class BucketProtoUtils { 045 046 final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' }; 047 048 private BucketProtoUtils() { 049 050 } 051 052 static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache, 053 BucketCacheProtos.BackingMap.Builder backingMapBuilder) { 054 return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize()) 055 .setIoClass(cache.ioEngine.getClass().getName()) 056 .setMapClass(cache.backingMap.getClass().getName()) 057 .putAllDeserializers(CacheableDeserializerIdManager.save()) 058 .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)) 059 .setBackingMap(backingMapBuilder.build()) 060 .setChecksum(ByteString 061 .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm()))) 062 .build(); 063 } 064 065 public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize) 066 throws IOException { 067 // Write the new version of magic number. 068 fos.write(PB_MAGIC_V2); 069 070 BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder(); 071 BucketCacheProtos.BackingMapEntry.Builder entryBuilder = 072 BucketCacheProtos.BackingMapEntry.newBuilder(); 073 074 // Persist the metadata first. 075 toPB(cache, builder).writeDelimitedTo(fos); 076 077 int blockCount = 0; 078 // Persist backing map entries in chunks of size 'chunkSize'. 079 for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) { 080 blockCount++; 081 addEntryToBuilder(entry, entryBuilder, builder); 082 if (blockCount % chunkSize == 0) { 083 builder.build().writeDelimitedTo(fos); 084 builder.clear(); 085 } 086 } 087 // Persist the last chunk. 088 if (builder.getEntryList().size() > 0) { 089 builder.build().writeDelimitedTo(fos); 090 } 091 } 092 093 private static void addEntryToBuilder(Map.Entry<BlockCacheKey, BucketEntry> entry, 094 BucketCacheProtos.BackingMapEntry.Builder entryBuilder, 095 BucketCacheProtos.BackingMap.Builder builder) { 096 entryBuilder.clear(); 097 entryBuilder.setKey(BucketProtoUtils.toPB(entry.getKey())); 098 entryBuilder.setValue(BucketProtoUtils.toPB(entry.getValue())); 099 builder.addEntry(entryBuilder.build()); 100 } 101 102 private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) { 103 return BucketCacheProtos.BlockCacheKey.newBuilder().setHfilename(key.getHfileName()) 104 .setOffset(key.getOffset()).setPrimaryReplicaBlock(key.isPrimary()) 105 .setBlockType(toPB(key.getBlockType())).build(); 106 } 107 108 private static BucketCacheProtos.BlockType toPB(BlockType blockType) { 109 switch (blockType) { 110 case DATA: 111 return BucketCacheProtos.BlockType.data; 112 case META: 113 return BucketCacheProtos.BlockType.meta; 114 case TRAILER: 115 return BucketCacheProtos.BlockType.trailer; 116 case INDEX_V1: 117 return BucketCacheProtos.BlockType.index_v1; 118 case FILE_INFO: 119 return BucketCacheProtos.BlockType.file_info; 120 case LEAF_INDEX: 121 return BucketCacheProtos.BlockType.leaf_index; 122 case ROOT_INDEX: 123 return BucketCacheProtos.BlockType.root_index; 124 case BLOOM_CHUNK: 125 return BucketCacheProtos.BlockType.bloom_chunk; 126 case ENCODED_DATA: 127 return BucketCacheProtos.BlockType.encoded_data; 128 case GENERAL_BLOOM_META: 129 return BucketCacheProtos.BlockType.general_bloom_meta; 130 case INTERMEDIATE_INDEX: 131 return BucketCacheProtos.BlockType.intermediate_index; 132 case DELETE_FAMILY_BLOOM_META: 133 return BucketCacheProtos.BlockType.delete_family_bloom_meta; 134 default: 135 throw new Error("Unrecognized BlockType."); 136 } 137 } 138 139 private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { 140 return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset()) 141 .setCachedTime(entry.getCachedTime()).setLength(entry.getLength()) 142 .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader()) 143 .setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter()) 144 .setPriority(toPB(entry.getPriority())).build(); 145 } 146 147 private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) { 148 switch (p) { 149 case MULTI: 150 return BucketCacheProtos.BlockPriority.multi; 151 case MEMORY: 152 return BucketCacheProtos.BlockPriority.memory; 153 case SINGLE: 154 return BucketCacheProtos.BlockPriority.single; 155 default: 156 throw new Error("Unrecognized BlockPriority."); 157 } 158 } 159 160 static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> fromPB( 161 Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap, 162 Function<BucketEntry, Recycler> createRecycler) throws IOException { 163 ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>(); 164 NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator 165 .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset)); 166 for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) { 167 BucketCacheProtos.BlockCacheKey protoKey = entry.getKey(); 168 BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(), 169 protoKey.getPrimaryReplicaBlock(), fromPb(protoKey.getBlockType())); 170 BucketCacheProtos.BucketEntry protoValue = entry.getValue(); 171 // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator 172 // which created by RpcServer elegantly. 173 BucketEntry value = new BucketEntry(protoValue.getOffset(), protoValue.getLength(), 174 protoValue.getDiskSizeWithHeader(), protoValue.getAccessCounter(), 175 protoValue.getCachedTime(), 176 protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler, 177 ByteBuffAllocator.HEAP); 178 // This is the deserializer that we stored 179 int oldIndex = protoValue.getDeserialiserIndex(); 180 String deserializerClass = deserializers.get(oldIndex); 181 if (deserializerClass == null) { 182 throw new IOException("Found deserializer index without matching entry."); 183 } 184 // Convert it to the identifier for the deserializer that we have in this runtime 185 if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) { 186 int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserializerIdentifier(); 187 value.deserializerIndex = (byte) actualIndex; 188 } else { 189 // We could make this more plugable, but right now HFileBlock is the only implementation 190 // of Cacheable outside of tests, so this might not ever matter. 191 throw new IOException("Unknown deserializer class found: " + deserializerClass); 192 } 193 result.put(key, value); 194 resultSet.add(key); 195 } 196 return new Pair<>(result, resultSet); 197 } 198 199 private static BlockType fromPb(BucketCacheProtos.BlockType blockType) { 200 switch (blockType) { 201 case data: 202 return BlockType.DATA; 203 case meta: 204 return BlockType.META; 205 case trailer: 206 return BlockType.TRAILER; 207 case index_v1: 208 return BlockType.INDEX_V1; 209 case file_info: 210 return BlockType.FILE_INFO; 211 case leaf_index: 212 return BlockType.LEAF_INDEX; 213 case root_index: 214 return BlockType.ROOT_INDEX; 215 case bloom_chunk: 216 return BlockType.BLOOM_CHUNK; 217 case encoded_data: 218 return BlockType.ENCODED_DATA; 219 case general_bloom_meta: 220 return BlockType.GENERAL_BLOOM_META; 221 case intermediate_index: 222 return BlockType.INTERMEDIATE_INDEX; 223 case delete_family_bloom_meta: 224 return BlockType.DELETE_FAMILY_BLOOM_META; 225 default: 226 throw new Error("Unrecognized BlockType."); 227 } 228 } 229 230 static Map<String, BucketCacheProtos.RegionFileSizeMap> 231 toCachedPB(Map<String, Pair<String, Long>> prefetchedHfileNames) { 232 Map<String, BucketCacheProtos.RegionFileSizeMap> tmpMap = new HashMap<>(); 233 prefetchedHfileNames.forEach((hfileName, regionPrefetchMap) -> { 234 BucketCacheProtos.RegionFileSizeMap tmpRegionFileSize = 235 BucketCacheProtos.RegionFileSizeMap.newBuilder().setRegionName(regionPrefetchMap.getFirst()) 236 .setRegionCachedSize(regionPrefetchMap.getSecond()).build(); 237 tmpMap.put(hfileName, tmpRegionFileSize); 238 }); 239 return tmpMap; 240 } 241 242 static Map<String, Pair<String, Long>> 243 fromPB(Map<String, BucketCacheProtos.RegionFileSizeMap> prefetchHFileNames) { 244 Map<String, Pair<String, Long>> hfileMap = new HashMap<>(); 245 prefetchHFileNames.forEach((hfileName, regionPrefetchMap) -> { 246 hfileMap.put(hfileName, 247 new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionCachedSize())); 248 }); 249 return hfileMap; 250 } 251}