001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.io.InputStream; 022import java.lang.reflect.Constructor; 023import java.lang.reflect.Modifier; 024import java.util.concurrent.TimeUnit; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.util.NonceKey; 028import org.apache.hadoop.hbase.util.RetryCounter; 029import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit; 030import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig; 031import org.apache.yetus.audience.InterfaceAudience; 032 033import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 034import org.apache.hbase.thirdparty.com.google.protobuf.Any; 035import org.apache.hbase.thirdparty.com.google.protobuf.Internal; 036import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; 037import org.apache.hbase.thirdparty.com.google.protobuf.Message; 038import org.apache.hbase.thirdparty.com.google.protobuf.Parser; 039import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; 040 041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 043 044/** 045 * Helper to convert to/from ProcedureProtos 046 */ 047@InterfaceAudience.Private 048public final class ProcedureUtil { 049 050 private ProcedureUtil() { 051 } 052 053 // ========================================================================== 054 // Reflection helpers to create/validate a Procedure object 055 // ========================================================================== 056 private static Procedure<?> newProcedure(String className) throws BadProcedureException { 057 try { 058 Class<?> clazz = Class.forName(className); 059 if (!Modifier.isPublic(clazz.getModifiers())) { 060 throw new Exception("the " + clazz + " class is not public"); 061 } 062 063 @SuppressWarnings("rawtypes") 064 Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor(); 065 assert ctor != null : "no constructor found"; 066 if (!Modifier.isPublic(ctor.getModifiers())) { 067 throw new Exception("the " + clazz + " constructor is not public"); 068 } 069 return ctor.newInstance(); 070 } catch (Exception e) { 071 throw new BadProcedureException( 072 "The procedure class " + className + " must be accessible and have an empty constructor", 073 e); 074 } 075 } 076 077 static void validateClass(Procedure<?> proc) throws BadProcedureException { 078 try { 079 Class<?> clazz = proc.getClass(); 080 if (!Modifier.isPublic(clazz.getModifiers())) { 081 throw new Exception("the " + clazz + " class is not public"); 082 } 083 084 Constructor<?> ctor = clazz.getConstructor(); 085 assert ctor != null; 086 if (!Modifier.isPublic(ctor.getModifiers())) { 087 throw new Exception("the " + clazz + " constructor is not public"); 088 } 089 } catch (Exception e) { 090 throw new BadProcedureException("The procedure class " + proc.getClass().getName() 091 + " must be accessible and have an empty constructor", e); 092 } 093 } 094 095 // ========================================================================== 096 // convert to and from Procedure object 097 // ========================================================================== 098 099 /** 100 * A serializer for our Procedures. Instead of the previous serializer, it uses the stateMessage 101 * list to store the internal state of the Procedures. 102 */ 103 private static class StateSerializer implements ProcedureStateSerializer { 104 private final ProcedureProtos.Procedure.Builder builder; 105 private int deserializeIndex; 106 107 public StateSerializer(ProcedureProtos.Procedure.Builder builder) { 108 this.builder = builder; 109 } 110 111 @Override 112 public void serialize(Message message) throws IOException { 113 Any packedMessage = Any.pack(message); 114 builder.addStateMessage(packedMessage); 115 } 116 117 @Override 118 public <M extends Message> M deserialize(Class<M> clazz) throws IOException { 119 if (deserializeIndex >= builder.getStateMessageCount()) { 120 throw new IOException("Invalid state message index: " + deserializeIndex); 121 } 122 123 try { 124 Any packedMessage = builder.getStateMessage(deserializeIndex++); 125 return packedMessage.unpack(clazz); 126 } catch (InvalidProtocolBufferException e) { 127 throw e.unwrapIOException(); 128 } 129 } 130 } 131 132 /** 133 * A serializer (deserializer) for those Procedures which were serialized before this patch. It 134 * deserializes the old, binary stateData field. 135 */ 136 private static class CompatStateSerializer implements ProcedureStateSerializer { 137 private InputStream inputStream; 138 139 public CompatStateSerializer(InputStream inputStream) { 140 this.inputStream = inputStream; 141 } 142 143 @Override 144 public void serialize(Message message) throws IOException { 145 throw new UnsupportedOperationException(); 146 } 147 148 @SuppressWarnings("unchecked") 149 @Override 150 public <M extends Message> M deserialize(Class<M> clazz) throws IOException { 151 Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType(); 152 try { 153 return parser.parseDelimitedFrom(inputStream); 154 } catch (InvalidProtocolBufferException e) { 155 throw e.unwrapIOException(); 156 } 157 } 158 } 159 160 /** 161 * Helper to convert the procedure to protobuf. 162 * <p/> 163 * Used by ProcedureStore implementations. 164 */ 165 public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc) 166 throws IOException { 167 Preconditions.checkArgument(proc != null); 168 validateClass(proc); 169 170 final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder() 171 .setClassName(proc.getClass().getName()).setProcId(proc.getProcId()).setState(proc.getState()) 172 .setSubmittedTime(proc.getSubmittedTime()).setLastUpdate(proc.getLastUpdate()); 173 174 if (proc.hasParent()) { 175 builder.setParentId(proc.getParentProcId()); 176 } 177 178 if (proc.hasTimeout()) { 179 builder.setTimeout(proc.getTimeout()); 180 } 181 182 if (proc.hasOwner()) { 183 builder.setOwner(proc.getOwner()); 184 } 185 186 final int[] stackIds = proc.getStackIndexes(); 187 if (stackIds != null) { 188 for (int i = 0; i < stackIds.length; ++i) { 189 builder.addStackId(stackIds[i]); 190 } 191 } 192 builder.setExecuted(proc.wasExecuted()); 193 194 if (proc.hasException()) { 195 RemoteProcedureException exception = proc.getException(); 196 builder.setException( 197 RemoteProcedureException.toProto(exception.getSource(), exception.getCause())); 198 } 199 200 final byte[] result = proc.getResult(); 201 if (result != null) { 202 builder.setResult(UnsafeByteOperations.unsafeWrap(result)); 203 } 204 205 ProcedureStateSerializer serializer = new StateSerializer(builder); 206 proc.serializeStateData(serializer); 207 208 if (proc.getNonceKey() != null) { 209 builder.setNonceGroup(proc.getNonceKey().getNonceGroup()); 210 builder.setNonce(proc.getNonceKey().getNonce()); 211 } 212 213 if (proc.hasLock()) { 214 builder.setLocked(true); 215 } 216 217 if (proc.isBypass()) { 218 builder.setBypass(true); 219 } 220 return builder.build(); 221 } 222 223 /** 224 * Helper to convert the protobuf procedure. 225 * <p/> 226 * Used by ProcedureStore implementations. 227 * <p/> 228 * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className, 229 * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of 230 * it by storing the data only on insert(). 231 */ 232 public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto) 233 throws IOException { 234 // Procedure from class name 235 Procedure<?> proc = newProcedure(proto.getClassName()); 236 237 // set fields 238 proc.setProcId(proto.getProcId()); 239 proc.setState(proto.getState()); 240 proc.setSubmittedTime(proto.getSubmittedTime()); 241 proc.setLastUpdate(proto.getLastUpdate()); 242 243 if (proto.hasParentId()) { 244 proc.setParentProcId(proto.getParentId()); 245 } 246 247 if (proto.hasOwner()) { 248 proc.setOwner(proto.getOwner()); 249 } 250 251 if (proto.hasTimeout()) { 252 proc.setTimeout(proto.getTimeout()); 253 } 254 255 if (proto.getStackIdCount() > 0) { 256 proc.setStackIndexes(proto.getStackIdList()); 257 } 258 if (proto.getExecuted()) { 259 proc.setExecuted(); 260 } 261 262 if (proto.hasException()) { 263 assert proc.getState() == ProcedureProtos.ProcedureState.FAILED 264 || proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK 265 : "The procedure must be failed (waiting to rollback) or rolledback"; 266 proc.setFailure(RemoteProcedureException.fromProto(proto.getException())); 267 } 268 269 if (proto.hasResult()) { 270 proc.setResult(proto.getResult().toByteArray()); 271 } 272 273 if (proto.getNonce() != HConstants.NO_NONCE) { 274 proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce())); 275 } 276 277 if (proto.getLocked()) { 278 proc.lockedWhenLoading(); 279 } 280 281 if (proto.getBypass()) { 282 proc.bypass(null); 283 } 284 285 ProcedureStateSerializer serializer = null; 286 287 if (proto.getStateMessageCount() > 0) { 288 serializer = new StateSerializer(proto.toBuilder()); 289 } else if (proto.hasStateData()) { 290 InputStream inputStream = proto.getStateData().newInput(); 291 serializer = new CompatStateSerializer(inputStream); 292 } 293 294 if (serializer != null) { 295 proc.deserializeStateData(serializer); 296 } 297 298 return proc; 299 } 300 301 // ========================================================================== 302 // convert from LockedResource object 303 // ========================================================================== 304 305 public static LockServiceProtos.LockedResourceType 306 convertToProtoResourceType(LockedResourceType resourceType) { 307 return LockServiceProtos.LockedResourceType.valueOf(resourceType.name()); 308 } 309 310 public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) { 311 return LockServiceProtos.LockType.valueOf(lockType.name()); 312 } 313 314 public static LockServiceProtos.LockedResource 315 convertToProtoLockedResource(LockedResource lockedResource) throws IOException { 316 LockServiceProtos.LockedResource.Builder builder = 317 LockServiceProtos.LockedResource.newBuilder(); 318 319 builder.setResourceType(convertToProtoResourceType(lockedResource.getResourceType())) 320 .setResourceName(lockedResource.getResourceName()) 321 .setLockType(convertToProtoLockType(lockedResource.getLockType())); 322 323 Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure(); 324 325 if (exclusiveLockOwnerProcedure != null) { 326 ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto = 327 convertToProtoProcedure(exclusiveLockOwnerProcedure); 328 builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto); 329 } 330 331 builder.setSharedLockCount(lockedResource.getSharedLockCount()); 332 333 for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) { 334 ProcedureProtos.Procedure waitingProcedureProto = convertToProtoProcedure(waitingProcedure); 335 builder.addWaitingProcedures(waitingProcedureProto); 336 } 337 338 return builder.build(); 339 } 340 341 public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 342 "hbase.procedure.retry.sleep.interval.ms"; 343 344 // default to 1 second 345 public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000; 346 347 public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS = 348 "hbase.procedure.retry.max.sleep.time.ms"; 349 350 // default to 10 minutes 351 public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS = 352 TimeUnit.MINUTES.toMillis(10); 353 354 /** 355 * Get a retry counter for getting the backoff time. We will use the 356 * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time 357 * is 10 minutes by default. 358 * <p/> 359 * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and 360 * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not 361 * timeout. 362 */ 363 public static RetryCounter createRetryCounter(Configuration conf) { 364 long sleepIntervalMs = 365 conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS); 366 long maxSleepTimeMs = 367 conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS); 368 RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs) 369 .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit()); 370 return new RetryCounter(retryConfig); 371 } 372 373 public static boolean isFinished(ProcedureProtos.Procedure proc) { 374 if (!proc.hasParentId()) { 375 switch (proc.getState()) { 376 case ROLLEDBACK: 377 case SUCCESS: 378 return true; 379 default: 380 break; 381 } 382 } 383 return false; 384 } 385}