001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.lang.reflect.Constructor;
023import java.lang.reflect.Modifier;
024import java.util.concurrent.TimeUnit;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HConstants;
027import org.apache.hadoop.hbase.util.NonceKey;
028import org.apache.hadoop.hbase.util.RetryCounter;
029import org.apache.hadoop.hbase.util.RetryCounter.ExponentialBackoffPolicyWithLimit;
030import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
031import org.apache.yetus.audience.InterfaceAudience;
032
033import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
034import org.apache.hbase.thirdparty.com.google.protobuf.Any;
035import org.apache.hbase.thirdparty.com.google.protobuf.Internal;
036import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
037import org.apache.hbase.thirdparty.com.google.protobuf.Message;
038import org.apache.hbase.thirdparty.com.google.protobuf.Parser;
039import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
040
041import org.apache.hadoop.hbase.shaded.protobuf.generated.LockServiceProtos;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
043
044/**
045 * Helper to convert to/from ProcedureProtos
046 */
047@InterfaceAudience.Private
048public final class ProcedureUtil {
049  private ProcedureUtil() {
050  }
051
052  // ==========================================================================
053  // Reflection helpers to create/validate a Procedure object
054  // ==========================================================================
055  private static Procedure<?> newProcedure(String className) throws BadProcedureException {
056    try {
057      Class<?> clazz = Class.forName(className);
058      if (!Modifier.isPublic(clazz.getModifiers())) {
059        throw new Exception("the " + clazz + " class is not public");
060      }
061
062      @SuppressWarnings("rawtypes")
063      Constructor<? extends Procedure> ctor = clazz.asSubclass(Procedure.class).getConstructor();
064      assert ctor != null : "no constructor found";
065      if (!Modifier.isPublic(ctor.getModifiers())) {
066        throw new Exception("the " + clazz + " constructor is not public");
067      }
068      return ctor.newInstance();
069    } catch (Exception e) {
070      throw new BadProcedureException(
071        "The procedure class " + className + " must be accessible and have an empty constructor",
072        e);
073    }
074  }
075
076  static void validateClass(Procedure<?> proc) throws BadProcedureException {
077    try {
078      Class<?> clazz = proc.getClass();
079      if (!Modifier.isPublic(clazz.getModifiers())) {
080        throw new Exception("the " + clazz + " class is not public");
081      }
082
083      Constructor<?> ctor = clazz.getConstructor();
084      assert ctor != null;
085      if (!Modifier.isPublic(ctor.getModifiers())) {
086        throw new Exception("the " + clazz + " constructor is not public");
087      }
088    } catch (Exception e) {
089      throw new BadProcedureException("The procedure class " + proc.getClass().getName()
090        + " must be accessible and have an empty constructor", e);
091    }
092  }
093
094  // ==========================================================================
095  // convert to and from Procedure object
096  // ==========================================================================
097
098  /**
099   * A serializer for our Procedures. Instead of the previous serializer, it uses the stateMessage
100   * list to store the internal state of the Procedures.
101   */
102  private static class StateSerializer implements ProcedureStateSerializer {
103    private final ProcedureProtos.Procedure.Builder builder;
104    private int deserializeIndex;
105
106    public StateSerializer(ProcedureProtos.Procedure.Builder builder) {
107      this.builder = builder;
108    }
109
110    @Override
111    public void serialize(Message message) throws IOException {
112      Any packedMessage = Any.pack(message);
113      builder.addStateMessage(packedMessage);
114    }
115
116    @Override
117    public <M extends Message> M deserialize(Class<M> clazz) throws IOException {
118      if (deserializeIndex >= builder.getStateMessageCount()) {
119        throw new IOException("Invalid state message index: " + deserializeIndex);
120      }
121
122      try {
123        Any packedMessage = builder.getStateMessage(deserializeIndex++);
124        return packedMessage.unpack(clazz);
125      } catch (InvalidProtocolBufferException e) {
126        throw e.unwrapIOException();
127      }
128    }
129  }
130
131  /**
132   * A serializer (deserializer) for those Procedures which were serialized before this patch. It
133   * deserializes the old, binary stateData field.
134   */
135  private static class CompatStateSerializer implements ProcedureStateSerializer {
136    private InputStream inputStream;
137
138    public CompatStateSerializer(InputStream inputStream) {
139      this.inputStream = inputStream;
140    }
141
142    @Override
143    public void serialize(Message message) throws IOException {
144      throw new UnsupportedOperationException();
145    }
146
147    @SuppressWarnings("unchecked")
148    @Override
149    public <M extends Message> M deserialize(Class<M> clazz) throws IOException {
150      Parser<M> parser = (Parser<M>) Internal.getDefaultInstance(clazz).getParserForType();
151      try {
152        return parser.parseDelimitedFrom(inputStream);
153      } catch (InvalidProtocolBufferException e) {
154        throw e.unwrapIOException();
155      }
156    }
157  }
158
159  /**
160   * Helper to convert the procedure to protobuf.
161   * <p/>
162   * Used by ProcedureStore implementations.
163   */
164  public static ProcedureProtos.Procedure convertToProtoProcedure(Procedure<?> proc)
165    throws IOException {
166    Preconditions.checkArgument(proc != null);
167    validateClass(proc);
168
169    final ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder()
170      .setClassName(proc.getClass().getName()).setProcId(proc.getProcId()).setState(proc.getState())
171      .setSubmittedTime(proc.getSubmittedTime()).setLastUpdate(proc.getLastUpdate());
172
173    if (proc.hasParent()) {
174      builder.setParentId(proc.getParentProcId());
175    }
176
177    if (proc.hasTimeout()) {
178      builder.setTimeout(proc.getTimeout());
179    }
180
181    if (proc.hasOwner()) {
182      builder.setOwner(proc.getOwner());
183    }
184
185    final int[] stackIds = proc.getStackIndexes();
186    if (stackIds != null) {
187      for (int i = 0; i < stackIds.length; ++i) {
188        builder.addStackId(stackIds[i]);
189      }
190    }
191
192    if (proc.hasException()) {
193      RemoteProcedureException exception = proc.getException();
194      builder.setException(
195        RemoteProcedureException.toProto(exception.getSource(), exception.getCause()));
196    }
197
198    final byte[] result = proc.getResult();
199    if (result != null) {
200      builder.setResult(UnsafeByteOperations.unsafeWrap(result));
201    }
202
203    ProcedureStateSerializer serializer = new StateSerializer(builder);
204    proc.serializeStateData(serializer);
205
206    if (proc.getNonceKey() != null) {
207      builder.setNonceGroup(proc.getNonceKey().getNonceGroup());
208      builder.setNonce(proc.getNonceKey().getNonce());
209    }
210
211    if (proc.hasLock()) {
212      builder.setLocked(true);
213    }
214
215    if (proc.isBypass()) {
216      builder.setBypass(true);
217    }
218    return builder.build();
219  }
220
221  /**
222   * Helper to convert the protobuf procedure.
223   * <p/>
224   * Used by ProcedureStore implementations.
225   * <p/>
226   * TODO: OPTIMIZATION: some of the field never change during the execution (e.g. className,
227   * procId, parentId, ...). We can split in 'data' and 'state', and the store may take advantage of
228   * it by storing the data only on insert().
229   */
230  public static Procedure<?> convertToProcedure(ProcedureProtos.Procedure proto)
231    throws IOException {
232    // Procedure from class name
233    Procedure<?> proc = newProcedure(proto.getClassName());
234
235    // set fields
236    proc.setProcId(proto.getProcId());
237    proc.setState(proto.getState());
238    proc.setSubmittedTime(proto.getSubmittedTime());
239    proc.setLastUpdate(proto.getLastUpdate());
240
241    if (proto.hasParentId()) {
242      proc.setParentProcId(proto.getParentId());
243    }
244
245    if (proto.hasOwner()) {
246      proc.setOwner(proto.getOwner());
247    }
248
249    if (proto.hasTimeout()) {
250      proc.setTimeout(proto.getTimeout());
251    }
252
253    if (proto.getStackIdCount() > 0) {
254      proc.setStackIndexes(proto.getStackIdList());
255    }
256
257    if (proto.hasException()) {
258      assert proc.getState() == ProcedureProtos.ProcedureState.FAILED
259        || proc.getState() == ProcedureProtos.ProcedureState.ROLLEDBACK
260        : "The procedure must be failed (waiting to rollback) or rolledback";
261      proc.setFailure(RemoteProcedureException.fromProto(proto.getException()));
262    }
263
264    if (proto.hasResult()) {
265      proc.setResult(proto.getResult().toByteArray());
266    }
267
268    if (proto.getNonce() != HConstants.NO_NONCE) {
269      proc.setNonceKey(new NonceKey(proto.getNonceGroup(), proto.getNonce()));
270    }
271
272    if (proto.getLocked()) {
273      proc.lockedWhenLoading();
274    }
275
276    if (proto.getBypass()) {
277      proc.bypass(null);
278    }
279
280    ProcedureStateSerializer serializer = null;
281
282    if (proto.getStateMessageCount() > 0) {
283      serializer = new StateSerializer(proto.toBuilder());
284    } else if (proto.hasStateData()) {
285      InputStream inputStream = proto.getStateData().newInput();
286      serializer = new CompatStateSerializer(inputStream);
287    }
288
289    if (serializer != null) {
290      proc.deserializeStateData(serializer);
291    }
292
293    return proc;
294  }
295
296  // ==========================================================================
297  // convert from LockedResource object
298  // ==========================================================================
299
300  public static LockServiceProtos.LockedResourceType
301    convertToProtoResourceType(LockedResourceType resourceType) {
302    return LockServiceProtos.LockedResourceType.valueOf(resourceType.name());
303  }
304
305  public static LockServiceProtos.LockType convertToProtoLockType(LockType lockType) {
306    return LockServiceProtos.LockType.valueOf(lockType.name());
307  }
308
309  public static LockServiceProtos.LockedResource
310    convertToProtoLockedResource(LockedResource lockedResource) throws IOException {
311    LockServiceProtos.LockedResource.Builder builder =
312      LockServiceProtos.LockedResource.newBuilder();
313
314    builder.setResourceType(convertToProtoResourceType(lockedResource.getResourceType()))
315      .setResourceName(lockedResource.getResourceName())
316      .setLockType(convertToProtoLockType(lockedResource.getLockType()));
317
318    Procedure<?> exclusiveLockOwnerProcedure = lockedResource.getExclusiveLockOwnerProcedure();
319
320    if (exclusiveLockOwnerProcedure != null) {
321      ProcedureProtos.Procedure exclusiveLockOwnerProcedureProto =
322        convertToProtoProcedure(exclusiveLockOwnerProcedure);
323      builder.setExclusiveLockOwnerProcedure(exclusiveLockOwnerProcedureProto);
324    }
325
326    builder.setSharedLockCount(lockedResource.getSharedLockCount());
327
328    for (Procedure<?> waitingProcedure : lockedResource.getWaitingProcedures()) {
329      ProcedureProtos.Procedure waitingProcedureProto = convertToProtoProcedure(waitingProcedure);
330      builder.addWaitingProcedures(waitingProcedureProto);
331    }
332
333    return builder.build();
334  }
335
336  public static final String PROCEDURE_RETRY_SLEEP_INTERVAL_MS =
337    "hbase.procedure.retry.sleep.interval.ms";
338
339  // default to 1 second
340  public static final long DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS = 1000;
341
342  public static final String PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
343    "hbase.procedure.retry.max.sleep.time.ms";
344
345  // default to 10 minutes
346  public static final long DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS =
347    TimeUnit.MINUTES.toMillis(10);
348
349  /**
350   * Get a retry counter for getting the backoff time. We will use the
351   * {@link ExponentialBackoffPolicyWithLimit} policy, and the base unit is 1 second, max sleep time
352   * is 10 minutes by default.
353   * <p/>
354   * For UTs, you can set the {@link #PROCEDURE_RETRY_SLEEP_INTERVAL_MS} and
355   * {@link #PROCEDURE_RETRY_MAX_SLEEP_TIME_MS} to make more frequent retry so your UT will not
356   * timeout.
357   */
358  public static RetryCounter createRetryCounter(Configuration conf) {
359    long sleepIntervalMs =
360      conf.getLong(PROCEDURE_RETRY_SLEEP_INTERVAL_MS, DEFAULT_PROCEDURE_RETRY_SLEEP_INTERVAL_MS);
361    long maxSleepTimeMs =
362      conf.getLong(PROCEDURE_RETRY_MAX_SLEEP_TIME_MS, DEFAULT_PROCEDURE_RETRY_MAX_SLEEP_TIME_MS);
363    RetryConfig retryConfig = new RetryConfig().setSleepInterval(sleepIntervalMs)
364      .setMaxSleepTime(maxSleepTimeMs).setBackoffPolicy(new ExponentialBackoffPolicyWithLimit());
365    return new RetryCounter(retryConfig);
366  }
367
368  public static boolean isFinished(ProcedureProtos.Procedure proc) {
369    if (!proc.hasParentId()) {
370      switch (proc.getState()) {
371        case ROLLEDBACK:
372        case SUCCESS:
373          return true;
374        default:
375          break;
376      }
377    }
378    return false;
379  }
380}