001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.lang.reflect.Constructor; 023import java.lang.reflect.Modifier; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.util.NonceKey; 028import org.apache.hadoop.hbase.util.RetryCounter; 029import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit; 030import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig; 031import org.apache.yetus.audience.InterfaceAudience; 032 033import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 034import org.apache.hbase.thirdparty.com.google.protobuf.Any; 035import org.apache.hbase.thirdparty.com.google.protobuf.Internal; 036import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 037import org.apache.hbase.thirdparty.com.google.protobuf.Message; 038import org.apache.hbase.thirdparty.com.google.protobuf.Parser; 039import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 043 044/** 045 * Helper to convert to/from ProcedureProtos 046 */ 047@InterfaceAudience.Private 048public final class ProcedureUtil { 049 private ProcedureUtil() { } 050 051 // ========================================================================== 052 // Reflection helpers to create/validate a Procedure object 053 // ========================================================================== 054 private static Procedure<?> newProcedure(String className) throws BadProcedureException { 055 try { 056 Class<?> clazz = Class.forName(className); 057 if (!Modifier.isPublic(clazz.getModifiers())) { 058 throw new Exception("the " + clazz + " class is not public"); 059 } 060 061 @SuppressWarnings("rawtypes") 062 Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor(); 063 assert ctor != null : "no constructor found"; 064 if (!Modifier.isPublic(ctor.getModifiers())) { 065 throw new Exception("the " + clazz + " constructor is not public"); 066 } 067 return ctor.newInstance(); 068 } catch (Exception e) { 069 throw new BadProcedureException( 070 "The procedure class " + className + " must be accessible and have an empty constructor", 071 e); 072 } 073 } 074 075 static void validateClass(Procedure<?> proc) throws BadProcedureException { 076 try { 077 Class<?> clazz = proc.getClass(); 078 if (!Modifier.isPublic(clazz.getModifiers())) { 079 throw new Exception("the " + clazz + " class is not public"); 080 } 081 082 Constructor<?> ctor = clazz.getConstructor(); 083 assert ctor != null; 084 if (!Modifier.isPublic(ctor.getModifiers())) { 085 throw new Exception("the " + clazz + " constructor is not public"); 086 } 087 } catch (Exception e) { 088 throw new BadProcedureException("The procedure class " + proc.getClass().getName() + 089 " must be accessible and have an empty constructor", e); 090 } 091 } 092 093 // ========================================================================== 094 // convert to and from Procedure object 095 // ========================================================================== 096 097 /** 098 * A serializer for our Procedures. Instead of the previous serializer, it 099 * uses the stateMessage list to store the internal state of the Procedures. 100 */ 101 private static class StateSerializer implements ProcedureStateSerializer { 102 private final ProcedureProtos.Procedure.Builder builder; 103 private int deserializeIndex; 104 105 public StateSerializer(ProcedureProtos.Procedure.Builder builder) { 106 this.builder = builder; 107 } 108 109 @Override 110 public void serialize(Message message) throws IOException { 111 Any packedMessage = Any.pack(message); 112 builder.addStateMessage(packedMessage); 113 } 114 115 @Override 116 public <M extends Message> M deserialize(Class<M> clazz) 117 throws IOException { 118 if (deserializeIndex >= builder.getStateMessageCount()) { 119 throw new IOException("Invalid state message index: " + deserializeIndex); 120 } 121 122 try { 123 Any packedMessage = builder.getStateMessage(deserializeIndex++); 124 return packedMessage.unpack(clazz); 125 } catch (InvalidProtocolBufferException e) { 126 throw e.unwrapIOException(); 127 } 128 } 129 } 130 131 /** 132 * A serializer (deserializer) for those Procedures which were serialized 133 * before this patch. It deserializes the old, binary stateData field. 134 */ 135 private static class CompatStateSerializer implements ProcedureStateSerializer { 136 private InputStream inputStream; 137 138 public CompatStateSerializer(InputStream inputStream) { 139 this.inputStream = inputStream; 140 } 141 142 @Override 143 public void serialize(Message message) throws IOException { 144 throw new UnsupportedOperationException(); 145 } 146 147 @SuppressWarnings("unchecked") 148 @Override 149 public <M extends Message> M deserialize(Class<M> clazz) 150 throws IOException { 151 Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType(); 152 try { 153 return parser.parseDelimitedFrom(inputStream); 154 } catch (InvalidProtocolBufferException e) { 155 throw e.unwrapIOException(); 156 } 157 } 158 } 159 160 /** 161 * Helper to convert the procedure to protobuf. 162 * <p/> 163 * Used by ProcedureStore implementations. 164 */ 165 public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc) 166 throws IOException { 167 Preconditions.checkArgument(proc != null); 168 validateClass(proc); 169 170 final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder() 171 .setClassName(proc.getClass().getName()) 172 .setProcId(proc.getProcId()) 173 .setState(proc.getState()) 174 .setSubmittedTime(proc.getSubmittedTime()) 175 .setLastUpdate(proc.getLastUpdate()); 176 177 if (proc.hasParent()) { 178 builder.setParentId(proc.getParentProcId()); 179 } 180 181 if (proc.hasTimeout()) { 182 builder.setTimeout(proc.getTimeout()); 183 } 184 185 if (proc.hasOwner()) { 186 builder.setOwner(proc.getOwner()); 187 } 188 189 final int[] stackIds = proc.getStackIndexes(); 190 if (stackIds != null) { 191 for (int i = 0; i < stackIds.length; ++i) { 192 builder.addStackId(stackIds[i]); 193 } 194 } 195 196 if (proc.hasException()) { 197 RemoteProcedureException exception = proc.getException(); 198 builder.setException( 199 RemoteProcedureException.toProto(exception.getSource(), exception.getCause())); 200 } 201 202 final byte[] result = proc.getResult(); 203 if (result != null) { 204 builder.setResult(UnsafeByteOperations.unsafeWrap(result)); 205 } 206 207 ProcedureStateSerializer serializer = new StateSerializer(builder); 208 proc.serializeStateData(serializer); 209 210 if (proc.getNonceKey() != null) { 211 builder.setNonceGroup(proc.getNonceKey().getNonceGroup()); 212 builder.setNonce(proc.getNonceKey().getNonce()); 213 } 214 215 if (proc.hasLock()) { 216 builder.setLocked(true); 217 } 218 219 if (proc.isBypass()) { 220 builder.setBypass(true); 221 } 222 return builder.build(); 223 } 224 225 /** 226 * Helper to convert the protobuf procedure. 227 * <p/> 228 * Used by ProcedureStore implementations. 229 * <p/> 230 * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className, 231 * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of 232 * it by storing the data only on insert(). 233 */ 234 public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto) 235 throws IOException { 236 // Procedure from class name 237 Procedure<?> proc = newProcedure(proto.getClassName()); 238 239 // set fields 240 proc.setProcId(proto.getProcId()); 241 proc.setState(proto.getState()); 242 proc.setSubmittedTime(proto.getSubmittedTime()); 243 proc.setLastUpdate(proto.getLastUpdate()); 244 245 if (proto.hasParentId()) { 246 proc.setParentProcId(proto.getParentId()); 247 } 248 249 if (proto.hasOwner()) { 250 proc.setOwner(proto.getOwner()); 251 } 252 253 if (proto.hasTimeout()) { 254 proc.setTimeout(proto.getTimeout()); 255 } 256 257 if (proto.getStackIdCount() > 0) { 258 proc.setStackIndexes(proto.getStackIdList()); 259 } 260 261 if (proto.hasException()) { 262 assert proc.getState() == ProcedureProtos.ProcedureState.FAILED || 263 proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK : 264 "The procedure must be failed (waiting to rollback) or rolledback"; 265 proc.setFailure(RemoteProcedureException.fromProto(proto.getException())); 266 } 267 268 if (proto.hasResult()) { 269 proc.setResult(proto.getResult().toByteArray()); 270 } 271 272 if (proto.getNonce() != HConstants.NO_NONCE) { 273 proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce())); 274 } 275 276 if (proto.getLocked()) { 277 proc.lockedWhenLoading(); 278 } 279 280 if (proto.getBypass()) { 281 proc.bypass(null); 282 } 283 284 ProcedureStateSerializer serializer = null; 285 286 if (proto.getStateMessageCount() > 0) { 287 serializer = new StateSerializer(proto.toBuilder()); 288 } else if (proto.hasStateData()) { 289 InputStream inputStream = proto.getStateData().newInput(); 290 serializer = new CompatStateSerializer(inputStream); 291 } 292 293 if (serializer != null) { 294 proc.deserializeStateData(serializer); 295 } 296 297 return proc; 298 } 299 300 // ========================================================================== 301 // convert from LockedResource object 302 // ========================================================================== 303 304 public static LockServiceProtos.LockedResourceType convertToProtoResourceType( 305 LockedResourceType resourceType) { 306 return LockServiceProtos.LockedResourceType.valueOf(resourceType.name()); 307 } 308 309 public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) { 310 return LockServiceProtos.LockType.valueOf(lockType.name()); 311 } 312 313 public static LockServiceProtos.LockedResource convertToProtoLockedResource( 314 LockedResource lockedResource) throws IOException { 315 LockServiceProtos.LockedResource.Builder builder = 316 LockServiceProtos.LockedResource.newBuilder(); 317 318 builder 319 .setResourceType(convertToProtoResourceType(lockedResource.getResourceType())) 320 .setResourceName(lockedResource.getResourceName()) 321 .setLockType(convertToProtoLockType(lockedResource.getLockType())); 322 323 Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure(); 324 325 if (exclusiveLockOwnerProcedure != null) { 326 ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto = 327 convertToProtoProcedure(exclusiveLockOwnerProcedure); 328 builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto); 329 } 330 331 builder.setSharedLockCount(lockedResource.getSharedLockCount()); 332 333 for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) { 334 ProcedureProtos.Procedure waitingProcedureProto = 335 convertToProtoProcedure(waitingProcedure); 336 builder.addWaitingProcedures(waitingProcedureProto); 337 } 338 339 return builder.build(); 340 } 341 342 public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 343 "hbase.procedure.retry.sleep.interval.ms"; 344 345 // default to 1 second 346 public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000; 347 348 public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS = 349 "hbase.procedure.retry.max.sleep.time.ms"; 350 351 // default to 10 minutes 352 public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS = 353 TimeUnit.MINUTES.toMillis(10); 354 355 /** 356 * Get a retry counter for getting the backoff time. We will use the 357 * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time 358 * is 10 minutes by default. 359 * <p/> 360 * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and 361 * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not 362 * timeout. 363 */ 364 public static RetryCounter createRetryCounter(Configuration conf) { 365 long sleepIntervalMs = 366 conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS); 367 long maxSleepTimeMs = 368 conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS); 369 RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs) 370 .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()); 371 return new RetryCounter(retryConfig); 372 } 373}