001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.FileOutputStream; 021import java.io.IOException; 022import java.util.Comparator; 023import java.util.HashMap; 024import java.util.Map; 025import java.util.NavigableSet; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentSkipListSet; 028import java.util.function.Function; 029import org.apache.hadoop.hbase.io.ByteBuffAllocator; 030import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; 031import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; 032import org.apache.hadoop.hbase.io.hfile.BlockPriority; 033import org.apache.hadoop.hbase.io.hfile.BlockType; 034import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 035import org.apache.hadoop.hbase.io.hfile.HFileBlock; 036import org.apache.hadoop.hbase.util.Pair; 037import org.apache.yetus.audience.InterfaceAudience; 038 039import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; 042 043@InterfaceAudience.Private 044final class BucketProtoUtils { 045 046 final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' }; 047 048 private BucketProtoUtils() { 049 050 } 051 052 static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache, 053 BucketCacheProtos.BackingMap.Builder backingMapBuilder) { 054 return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize()) 055 .setIoClass(cache.ioEngine.getClass().getName()) 056 .setMapClass(cache.backingMap.getClass().getName()) 057 .putAllDeserializers(CacheableDeserializerIdManager.save()) 058 .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)) 059 .setBackingMap(backingMapBuilder.build()) 060 .setChecksum(ByteString 061 .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm()))) 062 .build(); 063 } 064 065 public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize) 066 throws IOException { 067 // Write the new version of magic number. 068 fos.write(PB_MAGIC_V2); 069 070 BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder(); 071 BucketCacheProtos.BackingMapEntry.Builder entryBuilder = 072 BucketCacheProtos.BackingMapEntry.newBuilder(); 073 074 // Persist the metadata first. 075 toPB(cache, builder).writeDelimitedTo(fos); 076 077 int blockCount = 0; 078 // Persist backing map entries in chunks of size 'chunkSize'. 079 for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) { 080 blockCount++; 081 addEntryToBuilder(entry, entryBuilder, builder); 082 if (blockCount % chunkSize == 0) { 083 builder.build().writeDelimitedTo(fos); 084 builder.clear(); 085 } 086 } 087 // Persist the last chunk. 088 if (builder.getEntryList().size() > 0) { 089 builder.build().writeDelimitedTo(fos); 090 } 091 } 092 093 private static void addEntryToBuilder(Map.Entry<BlockCacheKey, BucketEntry> entry, 094 BucketCacheProtos.BackingMapEntry.Builder entryBuilder, 095 BucketCacheProtos.BackingMap.Builder builder) { 096 entryBuilder.clear(); 097 entryBuilder.setKey(BucketProtoUtils.toPB(entry.getKey())); 098 entryBuilder.setValue(BucketProtoUtils.toPB(entry.getValue())); 099 builder.addEntry(entryBuilder.build()); 100 } 101 102 private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) { 103 BucketCacheProtos.BlockCacheKey.Builder builder = BucketCacheProtos.BlockCacheKey.newBuilder() 104 .setHfilename(key.getHfileName()).setOffset(key.getOffset()) 105 .setPrimaryReplicaBlock(key.isPrimary()).setBlockType(toPB(key.getBlockType())); 106 if (key.getCfName() != null) { 107 builder.setFamilyName(key.getCfName()); 108 } 109 if (key.getRegionName() != null) { 110 builder.setRegionName(key.getRegionName()); 111 } 112 return builder.build(); 113 } 114 115 private static BucketCacheProtos.BlockType toPB(BlockType blockType) { 116 switch (blockType) { 117 case DATA: 118 return BucketCacheProtos.BlockType.data; 119 case META: 120 return BucketCacheProtos.BlockType.meta; 121 case TRAILER: 122 return BucketCacheProtos.BlockType.trailer; 123 case INDEX_V1: 124 return BucketCacheProtos.BlockType.index_v1; 125 case FILE_INFO: 126 return BucketCacheProtos.BlockType.file_info; 127 case LEAF_INDEX: 128 return BucketCacheProtos.BlockType.leaf_index; 129 case ROOT_INDEX: 130 return BucketCacheProtos.BlockType.root_index; 131 case BLOOM_CHUNK: 132 return BucketCacheProtos.BlockType.bloom_chunk; 133 case ENCODED_DATA: 134 return BucketCacheProtos.BlockType.encoded_data; 135 case GENERAL_BLOOM_META: 136 return BucketCacheProtos.BlockType.general_bloom_meta; 137 case INTERMEDIATE_INDEX: 138 return BucketCacheProtos.BlockType.intermediate_index; 139 case DELETE_FAMILY_BLOOM_META: 140 return BucketCacheProtos.BlockType.delete_family_bloom_meta; 141 default: 142 throw new Error("Unrecognized BlockType."); 143 } 144 } 145 146 private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) { 147 return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset()) 148 .setCachedTime(entry.getCachedTime()).setLength(entry.getLength()) 149 .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader()) 150 .setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter()) 151 .setPriority(toPB(entry.getPriority())).build(); 152 } 153 154 private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) { 155 switch (p) { 156 case MULTI: 157 return BucketCacheProtos.BlockPriority.multi; 158 case MEMORY: 159 return BucketCacheProtos.BlockPriority.memory; 160 case SINGLE: 161 return BucketCacheProtos.BlockPriority.single; 162 default: 163 throw new Error("Unrecognized BlockPriority."); 164 } 165 } 166 167 static Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> fromPB( 168 Map<Integer, String> deserializers, BucketCacheProtos.BackingMap backingMap, 169 Function<BucketEntry, Recycler> createRecycler) throws IOException { 170 ConcurrentHashMap<BlockCacheKey, BucketEntry> result = new ConcurrentHashMap<>(); 171 NavigableSet<BlockCacheKey> resultSet = new ConcurrentSkipListSet<>(Comparator 172 .comparing(BlockCacheKey::getHfileName).thenComparingLong(BlockCacheKey::getOffset)); 173 for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) { 174 BucketCacheProtos.BlockCacheKey protoKey = entry.getKey(); 175 BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getFamilyName(), 176 protoKey.getRegionName(), protoKey.getOffset(), protoKey.getPrimaryReplicaBlock(), 177 fromPb(protoKey.getBlockType()), protoKey.getArchived()); 178 BucketCacheProtos.BucketEntry protoValue = entry.getValue(); 179 // TODO:We use ByteBuffAllocator.HEAP here, because we could not get the ByteBuffAllocator 180 // which created by RpcServer elegantly. 181 BucketEntry value = new BucketEntry(protoValue.getOffset(), protoValue.getLength(), 182 protoValue.getDiskSizeWithHeader(), protoValue.getAccessCounter(), 183 protoValue.getCachedTime(), 184 protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory, createRecycler, 185 ByteBuffAllocator.HEAP); 186 // This is the deserializer that we stored 187 int oldIndex = protoValue.getDeserialiserIndex(); 188 String deserializerClass = deserializers.get(oldIndex); 189 if (deserializerClass == null) { 190 throw new IOException("Found deserializer index without matching entry."); 191 } 192 // Convert it to the identifier for the deserializer that we have in this runtime 193 if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) { 194 int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserializerIdentifier(); 195 value.deserializerIndex = (byte) actualIndex; 196 } else { 197 // We could make this more plugable, but right now HFileBlock is the only implementation 198 // of Cacheable outside of tests, so this might not ever matter. 199 throw new IOException("Unknown deserializer class found: " + deserializerClass); 200 } 201 result.put(key, value); 202 resultSet.add(key); 203 } 204 return new Pair<>(result, resultSet); 205 } 206 207 private static BlockType fromPb(BucketCacheProtos.BlockType blockType) { 208 switch (blockType) { 209 case data: 210 return BlockType.DATA; 211 case meta: 212 return BlockType.META; 213 case trailer: 214 return BlockType.TRAILER; 215 case index_v1: 216 return BlockType.INDEX_V1; 217 case file_info: 218 return BlockType.FILE_INFO; 219 case leaf_index: 220 return BlockType.LEAF_INDEX; 221 case root_index: 222 return BlockType.ROOT_INDEX; 223 case bloom_chunk: 224 return BlockType.BLOOM_CHUNK; 225 case encoded_data: 226 return BlockType.ENCODED_DATA; 227 case general_bloom_meta: 228 return BlockType.GENERAL_BLOOM_META; 229 case intermediate_index: 230 return BlockType.INTERMEDIATE_INDEX; 231 case delete_family_bloom_meta: 232 return BlockType.DELETE_FAMILY_BLOOM_META; 233 default: 234 throw new Error("Unrecognized BlockType."); 235 } 236 } 237 238 static Map<String, BucketCacheProtos.RegionFileSizeMap> 239 toCachedPB(Map<String, Pair<String, Long>> prefetchedHfileNames) { 240 Map<String, BucketCacheProtos.RegionFileSizeMap> tmpMap = new HashMap<>(); 241 prefetchedHfileNames.forEach((hfileName, regionPrefetchMap) -> { 242 BucketCacheProtos.RegionFileSizeMap tmpRegionFileSize = 243 BucketCacheProtos.RegionFileSizeMap.newBuilder().setRegionName(regionPrefetchMap.getFirst()) 244 .setRegionCachedSize(regionPrefetchMap.getSecond()).build(); 245 tmpMap.put(hfileName, tmpRegionFileSize); 246 }); 247 return tmpMap; 248 } 249 250 static Map<String, Pair<String, Long>> 251 fromPB(Map<String, BucketCacheProtos.RegionFileSizeMap> prefetchHFileNames) { 252 Map<String, Pair<String, Long>> hfileMap = new HashMap<>(); 253 prefetchHFileNames.forEach((hfileName, regionPrefetchMap) -> { 254 hfileMap.put(hfileName, 255 new Pair<>(regionPrefetchMap.getRegionName(), regionPrefetchMap.getRegionCachedSize())); 256 }); 257 return hfileMap; 258 } 259}