001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.FileOutputStream; 021import java.io.IOException; 022import java.util.Comparator; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.NavigableSet; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentSkipListSet; 028import java.util.function.Function; 029import org.apache.hadoop.hbase.io.ByteBuffAllocator; 030import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 031import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 032import org.apache.hadoop.hbase.io.hfile.BlockPriority; 033import org.apache.hadoop.hbase.io.hfile.BlockType; 034import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 035import org.apache.hadoop.hbase.io.hfile.HFileBlock; 036import org.apache.hadoop.hbase.util.Pair; 037import org.apache.yetus.audience.InterfaceAudience; 038 039import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 042 043@InterfaceAudience.Private 044final class BucketProtoUtils { 045 046 final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' }; 047 048 private BucketProtoUtils() { 049 050 } 051 052 static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache, 053 BucketCacheProtos.BackingMap.Builder backingMapBuilder) { 054 return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize()) 055 .setIoClass(cache.ioEngine.getClass().getName()) 056 .setMapClass(cache.backingMap.getClass().getName()) 057 .putAllDeserializers(CacheableDeserializerIdManager.save()) 058 .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)) 059 .setBackingMap(backingMapBuilder.build()) 060 .setChecksum(ByteString 061 .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm()))) 062 .build(); 063 } 064 065 public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize) 066 throws IOException { 067 // Write the new version of magic number. 068 fos.write(PB_MAGIC_V2); 069 070 BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder(); 071 BucketCacheProtos.BackingMapEntry.Builder entryBuilder = 072 BucketCacheProtos.BackingMapEntry.newBuilder(); 073 074 // Persist the metadata first. 075 toPB(cache, builder).writeDelimitedTo(fos); 076 077 int blockCount = 0; 078 // Persist backing map entries in chunks of size 'chunkSize'. 079 for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) { 080 blockCount++; 081 addEntryToBuilder(entry, entryBuilder, builder); 082 if (blockCount % chunkSize == 0) { 083 builder.build().writeDelimitedTo(fos); 084 builder.clear(); 085 } 086 } 087 // Persist the last chunk. 088 if (builder.getEntryList().size() > 0) { 089 builder.build().writeDelimitedTo(fos); 090 } 091 } 092 093 private static void addEntryToBuilder(Map.Entry<BlockCacheKey, BucketEntry> entry, 094 BucketCacheProtos.BackingMapEntry.Builder entryBuilder, 095 BucketCacheProtos.BackingMap.Builder builder) { 096 entryBuilder.clear(); 097 entryBuilder.setKey(BucketProtoUtils.toPB(entry.getKey())); 098 entryBuilder.setValue(BucketProtoUtils.toPB(entry.getValue())); 099 builder.addEntry(entryBuilder.build()); 100 } 101 102 private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) { 103 return BucketCacheProtos.BlockCacheKey.newBuilder().setHfilename(key.getHfileName()) 104 .setOffset(key.getOffset()).setPrimaryReplicaBlock(key.isPrimary()) 105 .setBlockType(toPB(key.getBlockType())).build(); 106 } 107 108 private static BucketCacheProtos.BlockType toPB(BlockType blockType) { 109 switch (blockType) { 110 case DATA: 111 return BucketCacheProtos.BlockType.data; 112 case META: 113 return BucketCacheProtos.BlockType.meta; 114 case TRAILER: 115 return BucketCacheProtos.BlockType.trailer; 116 case INDEX_V1: 117 return BucketCacheProtos.BlockType.index_v1; 118 case FILE_INFO: 119 return BucketCacheProtos.BlockType.file_info; 120 case LEAF_INDEX: 121 return BucketCacheProtos.BlockType.leaf_index; 122 case ROOT_INDEX: 123 return BucketCacheProtos.BlockType.root_index; 124 case BLOOM_CHUNK: 125 return BucketCacheProtos.BlockType.bloom_chunk; 126 case ENCODED_DATA: 127 return BucketCacheProtos.BlockType.encoded_data; 128 case GENERAL_BLOOM_META: 129 return BucketCacheProtos.BlockType.general_bloom_meta; 130 case INTERMEDIATE_INDEX: 131 return BucketCacheProtos.BlockType.intermediate_index; 132 case DELETE_FAMILY_BLOOM_META: 133 return BucketCacheProtos.BlockType.delete_family_bloom_meta; 134 default: 135 throw new Error("Unrecognized BlockType."); 136 } 137 } 138 139 private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { 140 return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset()) 141 .setCachedTime(entry.getCachedTime()).setLength(entry.getLength()) 142 .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader()) 143 .setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter()) 144 .setPriority(toPB(entry.getPriority())).build(); 145 } 146 147 private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) { 148 switch (p) { 149 case MULTI: 150 return BucketCacheProtos.BlockPriority.multi; 151 case MEMORY: 152 return BucketCacheProtos.BlockPriority.memory; 153 case SINGLE: 154 return BucketCacheProtos.BlockPriority.single; 155 default: 156 throw new Error("Unrecognized BlockPriority."); 157 } 158 } 159 160 static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> fromPB( 161 Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap, 162 Function<BucketEntry, Recycler> createRecycler) throws IOException { 163 ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>(); 164 NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator 165 .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset)); 166 for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) { 167 BucketCacheProtos.BlockCacheKey protoKey = entry.getKey(); 168 BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getFamilyName(), 169 protoKey.getRegionName(), protoKey.getOffset(), protoKey.getPrimaryReplicaBlock(), 170 fromPb(protoKey.getBlockType()), protoKey.getArchived()); 171 BucketCacheProtos.BucketEntry protoValue = entry.getValue(); 172 // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator 173 // which created by RpcServer elegantly. 174 BucketEntry value = new BucketEntry(protoValue.getOffset(), protoValue.getLength(), 175 protoValue.getDiskSizeWithHeader(), protoValue.getAccessCounter(), 176 protoValue.getCachedTime(), 177 protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler, 178 ByteBuffAllocator.HEAP); 179 // This is the deserializer that we stored 180 int oldIndex = protoValue.getDeserialiserIndex(); 181 String deserializerClass = deserializers.get(oldIndex); 182 if (deserializerClass == null) { 183 throw new IOException("Found deserializer index without matching entry."); 184 } 185 // Convert it to the identifier for the deserializer that we have in this runtime 186 if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) { 187 int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserializerIdentifier(); 188 value.deserializerIndex = (byte) actualIndex; 189 } else { 190 // We could make this more plugable, but right now HFileBlock is the only implementation 191 // of Cacheable outside of tests, so this might not ever matter. 192 throw new IOException("Unknown deserializer class found: " + deserializerClass); 193 } 194 result.put(key, value); 195 resultSet.add(key); 196 } 197 return new Pair<>(result, resultSet); 198 } 199 200 private static BlockType fromPb(BucketCacheProtos.BlockType blockType) { 201 switch (blockType) { 202 case data: 203 return BlockType.DATA; 204 case meta: 205 return BlockType.META; 206 case trailer: 207 return BlockType.TRAILER; 208 case index_v1: 209 return BlockType.INDEX_V1; 210 case file_info: 211 return BlockType.FILE_INFO; 212 case leaf_index: 213 return BlockType.LEAF_INDEX; 214 case root_index: 215 return BlockType.ROOT_INDEX; 216 case bloom_chunk: 217 return BlockType.BLOOM_CHUNK; 218 case encoded_data: 219 return BlockType.ENCODED_DATA; 220 case general_bloom_meta: 221 return BlockType.GENERAL_BLOOM_META; 222 case intermediate_index: 223 return BlockType.INTERMEDIATE_INDEX; 224 case delete_family_bloom_meta: 225 return BlockType.DELETE_FAMILY_BLOOM_META; 226 default: 227 throw new Error("Unrecognized BlockType."); 228 } 229 } 230 231 static Map<String, BucketCacheProtos.RegionFileSizeMap> 232 toCachedPB(Map<String, Pair<String, Long>> prefetchedHfileNames) { 233 Map<String, BucketCacheProtos.RegionFileSizeMap> tmpMap = new HashMap<>(); 234 prefetchedHfileNames.forEach((hfileName, regionPrefetchMap) -> { 235 BucketCacheProtos.RegionFileSizeMap tmpRegionFileSize = 236 BucketCacheProtos.RegionFileSizeMap.newBuilder().setRegionName(regionPrefetchMap.getFirst()) 237 .setRegionCachedSize(regionPrefetchMap.getSecond()).build(); 238 tmpMap.put(hfileName, tmpRegionFileSize); 239 }); 240 return tmpMap; 241 } 242 243 static Map<String, Pair<String, Long>> 244 fromPB(Map<String, BucketCacheProtos.RegionFileSizeMap> prefetchHFileNames) { 245 Map<String, Pair<String, Long>> hfileMap = new HashMap<>(); 246 prefetchHFileNames.forEach((hfileName, regionPrefetchMap) -> { 247 hfileMap.put(hfileName, 248 new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionCachedSize())); 249 }); 250 return hfileMap; 251 } 252}