001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.lang.reflect.Constructor; 023import java.lang.reflect.Modifier; 024import java.util.concurrent.ThreadLocalRandom; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.yetus.audience.InterfaceAudience; 027import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 028import org.apache.hbase.thirdparty.com.google.protobuf.Any; 029import org.apache.hbase.thirdparty.com.google.protobuf.Internal; 030import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 031import org.apache.hbase.thirdparty.com.google.protobuf.Message; 032import org.apache.hbase.thirdparty.com.google.protobuf.Parser; 033import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 036import org.apache.hadoop.hbase.util.NonceKey; 037 038/** 039 * Helper to convert to/from ProcedureProtos 040 */ 041@InterfaceAudience.Private 042public final class ProcedureUtil { 043 private ProcedureUtil() { } 044 045 // ========================================================================== 046 // Reflection helpers to create/validate a Procedure object 047 // ========================================================================== 048 private static Procedure<?> newProcedure(String className) throws BadProcedureException { 049 try { 050 Class<?> clazz = Class.forName(className); 051 if (!Modifier.isPublic(clazz.getModifiers())) { 052 throw new Exception("the " + clazz + " class is not public"); 053 } 054 055 @SuppressWarnings("rawtypes") 056 Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor(); 057 assert ctor != null : "no constructor found"; 058 if (!Modifier.isPublic(ctor.getModifiers())) { 059 throw new Exception("the " + clazz + " constructor is not public"); 060 } 061 return ctor.newInstance(); 062 } catch (Exception e) { 063 throw new BadProcedureException( 064 "The procedure class " + className + " must be accessible and have an empty constructor", 065 e); 066 } 067 } 068 069 static void validateClass(Procedure<?> proc) throws BadProcedureException { 070 try { 071 Class<?> clazz = proc.getClass(); 072 if (!Modifier.isPublic(clazz.getModifiers())) { 073 throw new Exception("the " + clazz + " class is not public"); 074 } 075 076 Constructor<?> ctor = clazz.getConstructor(); 077 assert ctor != null; 078 if (!Modifier.isPublic(ctor.getModifiers())) { 079 throw new Exception("the " + clazz + " constructor is not public"); 080 } 081 } catch (Exception e) { 082 throw new BadProcedureException("The procedure class " + proc.getClass().getName() + 083 " must be accessible and have an empty constructor", e); 084 } 085 } 086 087 // ========================================================================== 088 // convert to and from Procedure object 089 // ========================================================================== 090 091 /** 092 * A serializer for our Procedures. Instead of the previous serializer, it 093 * uses the stateMessage list to store the internal state of the Procedures. 094 */ 095 private static class StateSerializer implements ProcedureStateSerializer { 096 private final ProcedureProtos.Procedure.Builder builder; 097 private int deserializeIndex; 098 099 public StateSerializer(ProcedureProtos.Procedure.Builder builder) { 100 this.builder = builder; 101 } 102 103 @Override 104 public void serialize(Message message) throws IOException { 105 Any packedMessage = Any.pack(message); 106 builder.addStateMessage(packedMessage); 107 } 108 109 @Override 110 public <M extends Message> M deserialize(Class<M> clazz) 111 throws IOException { 112 if (deserializeIndex >= builder.getStateMessageCount()) { 113 throw new IOException("Invalid state message index: " + deserializeIndex); 114 } 115 116 try { 117 Any packedMessage = builder.getStateMessage(deserializeIndex++); 118 return packedMessage.unpack(clazz); 119 } catch (InvalidProtocolBufferException e) { 120 throw e.unwrapIOException(); 121 } 122 } 123 } 124 125 /** 126 * A serializer (deserializer) for those Procedures which were serialized 127 * before this patch. It deserializes the old, binary stateData field. 128 */ 129 private static class CompatStateSerializer implements ProcedureStateSerializer { 130 private InputStream inputStream; 131 132 public CompatStateSerializer(InputStream inputStream) { 133 this.inputStream = inputStream; 134 } 135 136 @Override 137 public void serialize(Message message) throws IOException { 138 throw new UnsupportedOperationException(); 139 } 140 141 @SuppressWarnings("unchecked") 142 @Override 143 public <M extends Message> M deserialize(Class<M> clazz) 144 throws IOException { 145 Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType(); 146 try { 147 return parser.parseDelimitedFrom(inputStream); 148 } catch (InvalidProtocolBufferException e) { 149 throw e.unwrapIOException(); 150 } 151 } 152 } 153 154 /** 155 * Helper to convert the procedure to protobuf. 156 * <p/> 157 * Used by ProcedureStore implementations. 158 */ 159 public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc) 160 throws IOException { 161 Preconditions.checkArgument(proc != null); 162 validateClass(proc); 163 164 final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder() 165 .setClassName(proc.getClass().getName()) 166 .setProcId(proc.getProcId()) 167 .setState(proc.getState()) 168 .setSubmittedTime(proc.getSubmittedTime()) 169 .setLastUpdate(proc.getLastUpdate()); 170 171 if (proc.hasParent()) { 172 builder.setParentId(proc.getParentProcId()); 173 } 174 175 if (proc.hasTimeout()) { 176 builder.setTimeout(proc.getTimeout()); 177 } 178 179 if (proc.hasOwner()) { 180 builder.setOwner(proc.getOwner()); 181 } 182 183 final int[] stackIds = proc.getStackIndexes(); 184 if (stackIds != null) { 185 for (int i = 0; i < stackIds.length; ++i) { 186 builder.addStackId(stackIds[i]); 187 } 188 } 189 190 if (proc.hasException()) { 191 RemoteProcedureException exception = proc.getException(); 192 builder.setException( 193 RemoteProcedureException.toProto(exception.getSource(), exception.getCause())); 194 } 195 196 final byte[] result = proc.getResult(); 197 if (result != null) { 198 builder.setResult(UnsafeByteOperations.unsafeWrap(result)); 199 } 200 201 ProcedureStateSerializer serializer = new StateSerializer(builder); 202 proc.serializeStateData(serializer); 203 204 if (proc.getNonceKey() != null) { 205 builder.setNonceGroup(proc.getNonceKey().getNonceGroup()); 206 builder.setNonce(proc.getNonceKey().getNonce()); 207 } 208 209 if (proc.hasLock()) { 210 builder.setLocked(true); 211 } 212 213 if (proc.isBypass()) { 214 builder.setBypass(true); 215 } 216 return builder.build(); 217 } 218 219 /** 220 * Helper to convert the protobuf procedure. 221 * <p/> 222 * Used by ProcedureStore implementations. 223 * <p/> 224 * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className, 225 * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of 226 * it by storing the data only on insert(). 227 */ 228 public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto) 229 throws IOException { 230 // Procedure from class name 231 Procedure<?> proc = newProcedure(proto.getClassName()); 232 233 // set fields 234 proc.setProcId(proto.getProcId()); 235 proc.setState(proto.getState()); 236 proc.setSubmittedTime(proto.getSubmittedTime()); 237 proc.setLastUpdate(proto.getLastUpdate()); 238 239 if (proto.hasParentId()) { 240 proc.setParentProcId(proto.getParentId()); 241 } 242 243 if (proto.hasOwner()) { 244 proc.setOwner(proto.getOwner()); 245 } 246 247 if (proto.hasTimeout()) { 248 proc.setTimeout(proto.getTimeout()); 249 } 250 251 if (proto.getStackIdCount() > 0) { 252 proc.setStackIndexes(proto.getStackIdList()); 253 } 254 255 if (proto.hasException()) { 256 assert proc.getState() == ProcedureProtos.ProcedureState.FAILED || 257 proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK : 258 "The procedure must be failed (waiting to rollback) or rolledback"; 259 proc.setFailure(RemoteProcedureException.fromProto(proto.getException())); 260 } 261 262 if (proto.hasResult()) { 263 proc.setResult(proto.getResult().toByteArray()); 264 } 265 266 if (proto.getNonce() != HConstants.NO_NONCE) { 267 proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce())); 268 } 269 270 if (proto.getLocked()) { 271 proc.lockedWhenLoading(); 272 } 273 274 if (proto.getBypass()) { 275 proc.bypass(null); 276 } 277 278 ProcedureStateSerializer serializer = null; 279 280 if (proto.getStateMessageCount() > 0) { 281 serializer = new StateSerializer(proto.toBuilder()); 282 } else if (proto.hasStateData()) { 283 InputStream inputStream = proto.getStateData().newInput(); 284 serializer = new CompatStateSerializer(inputStream); 285 } 286 287 if (serializer != null) { 288 proc.deserializeStateData(serializer); 289 } 290 291 return proc; 292 } 293 294 // ========================================================================== 295 // convert from LockedResource object 296 // ========================================================================== 297 298 public static LockServiceProtos.LockedResourceType convertToProtoResourceType( 299 LockedResourceType resourceType) { 300 return LockServiceProtos.LockedResourceType.valueOf(resourceType.name()); 301 } 302 303 public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) { 304 return LockServiceProtos.LockType.valueOf(lockType.name()); 305 } 306 307 public static LockServiceProtos.LockedResource convertToProtoLockedResource( 308 LockedResource lockedResource) throws IOException { 309 LockServiceProtos.LockedResource.Builder builder = 310 LockServiceProtos.LockedResource.newBuilder(); 311 312 builder 313 .setResourceType(convertToProtoResourceType(lockedResource.getResourceType())) 314 .setResourceName(lockedResource.getResourceName()) 315 .setLockType(convertToProtoLockType(lockedResource.getLockType())); 316 317 Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure(); 318 319 if (exclusiveLockOwnerProcedure != null) { 320 ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto = 321 convertToProtoProcedure(exclusiveLockOwnerProcedure); 322 builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto); 323 } 324 325 builder.setSharedLockCount(lockedResource.getSharedLockCount()); 326 327 for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) { 328 ProcedureProtos.Procedure waitingProcedureProto = 329 convertToProtoProcedure(waitingProcedure); 330 builder.addWaitingProcedures(waitingProcedureProto); 331 } 332 333 return builder.build(); 334 } 335 336 /** 337 * Get an exponential backoff time, in milliseconds. The base unit is 1 second, and the max 338 * backoff time is 10 minutes. This is the general backoff policy for most procedure 339 * implementation. 340 */ 341 public static long getBackoffTimeMs(int attempts) { 342 long maxBackoffTime = 10L * 60 * 1000; // Ten minutes, hard coded for now. 343 // avoid overflow 344 if (attempts >= 30) { 345 return maxBackoffTime; 346 } 347 long backoffTimeMs = Math.min((long) (1000 * Math.pow(2, attempts)), maxBackoffTime); 348 // 1% possible jitter 349 long jitter = (long) (backoffTimeMs * ThreadLocalRandom.current().nextFloat() * 0.01f); 350 return backoffTimeMs + jitter; 351 } 352}