[jira] [Commented] (APEXCORE-714) Reusable instance operator recovery



JIRA jira@apache.org

    [ https://issues.apache.org/jira/browse/APEXCORE-714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518201#comment-16518201 ]

ASF GitHub Bot commented on APEXCORE-714:
-----------------------------------------

vrozov closed pull request #522: APEXCORE-714 Adding a new recovery mode where the operator instance before a failure event can be reused
URL: https://github.com/apache/apex-core/pull/522
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java
index 9fe0c46176..0da930d533 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -27,6 +27,7 @@
 
 import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.api.Operator.RecoveryMode;
 import com.datatorrent.api.StringCodec.Class2String;
 import com.datatorrent.api.StringCodec.Collection2String;
 import com.datatorrent.api.StringCodec.Integer2String;
@@ -316,6 +317,14 @@
      */
     Attribute<AutoMetric.DimensionsScheme> METRICS_DIMENSIONS_SCHEME = new Attribute<>(Object2String.<AutoMetric.DimensionsScheme>getInstance());
 
+    /**
+     * Specify how to recover the operator in cases of a failure event. The default is to load from checkpoint. However,
+     * in some cases reusing the same instance of the operator from before the failure event may be desired. See
+     * {@link RecoveryMode}. The latter is only applicable in cases where the recovery is due to a failure of the
+     * upstream operator and not the operator itself.
+     */
+    Attribute<RecoveryMode> RECOVERY_MODE = new Attribute<RecoveryMode>(RecoveryMode.CHECKPOINT);
+
     /**
      * Return the operator runtime id.
      *
diff --git a/api/src/main/java/com/datatorrent/api/DAG.java b/api/src/main/java/com/datatorrent/api/DAG.java
index 93936d7616..471950b5a0 100644
--- a/api/src/main/java/com/datatorrent/api/DAG.java
+++ b/api/src/main/java/com/datatorrent/api/DAG.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.annotation.OperatorAnnotation;
 
 /**
  * DAG contains the logical declarations of operators and streams.
@@ -190,6 +191,8 @@
 
     OutputPortMeta getMeta(Operator.OutputPort<?> port);
 
+    OperatorAnnotation getOperatorAnnotation();
+
     /**
      * Return collection of stream which are connected to this operator's
      * input ports.
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index dd694d01d4..f0357e5cda 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.api;
 
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Context.PortContext;
 import com.datatorrent.api.DAG.GenericOperator;
@@ -71,6 +73,25 @@ public String toString()
 
   }
 
+  @Evolving
+  enum RecoveryMode
+  {
+    /**
+     * Recover the operator from checkpoint
+     */
+    CHECKPOINT,
+    /**
+     * Reuse the same instance of the operator from before the failure event.
+     *
+     * This applies to scenarios where the failure is in an upstream operator and not the operator itself.
+     * Reusing the same instance may not be applicable in all cases as it can lead to incorrect results because the
+     * operator state will not be consistent with the processing position in the stream. This should be used only for
+     * operators that are either invariant to reusing the same state with the stream processing position modified
+     * according to the processing mode or tolerant to it.
+     */
+    REUSE_INSTANCE
+  }
+
   /**
    * This method gets called at the beginning of each window.
    *
@@ -227,7 +248,8 @@ public String toString()
   {
     /**
      * Do the operations just before the operator starts processing tasks within the windows.
-     * e.g. establish a network connection.
+     * e.g. establish a network connection. This method is called irrespective of what {@link RecoveryMode}
+     * is being used.
      * @param context - the context in which the operator is executing.
      */
     void activate(CONTEXT context);
diff --git a/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java b/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
index 16fd370dbe..e7922b3c67 100644
--- a/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
+++ b/api/src/main/java/com/datatorrent/api/annotation/OperatorAnnotation.java
@@ -24,6 +24,8 @@
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
+import com.datatorrent.api.Operator;
+
 /**
  * Annotation to specify characteristics of an operator.
  *
@@ -49,4 +51,13 @@
    * @return whether operator can be checkpointed in middle of an application window.
    */
   boolean checkpointableWithinAppWindow() default true;
+
+  /**
+   * Element specifying the recovery mode for the operator.
+   * By default the operator state is recovered from checkpoint. The operator developer can indicate a preference for
+   * the recovery mode with this element, see {@link Operator.RecoveryMode}. The application developer can override this
+   * by setting the {@link Context.OperatorContext#RECOVERY_MODE} attribute.
+   * @return the preferred recovery mode for the operator
+   */
+  Operator.RecoveryMode recoveryMode() default Operator.RecoveryMode.CHECKPOINT;
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 88b002f2a6..5f222c53cb 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -135,6 +135,8 @@
   public long firstWindowMillis;
   public long windowWidthMillis;
 
+  protected boolean reuseOperator;
+
   public Node(OPERATOR operator, OperatorContext context)
   {
     this.operator = operator;
@@ -180,12 +182,19 @@ public Operator getOperator()
     return operator;
   }
 
+  public void setReuseOperator(boolean reuseOperator)
+  {
+    this.reuseOperator = reuseOperator;
+  }
+
   @Override
   public void setup(OperatorContext context)
   {
     shutdown = false;
-    logger.debug("Operator Context = {}", context);
-    operator.setup(context);
+    if (!reuseOperator) {
+      logger.debug("Operator Context = {}", context);
+      operator.setup(context);
+    }
 //    this is where the ports should be setup but since the
 //    portcontext is not available here, we are doing it in
 //    StramChild. In future version, we should move that code here
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 927ad6d76b..699d646ef4 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -66,6 +66,7 @@
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.StringCodec;
+import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.bufferserver.server.Server;
 import com.datatorrent.bufferserver.storage.DiskStorage;
@@ -163,6 +164,8 @@
   private RequestFactory requestFactory;
   private TokenRenewer tokenRenewer;
 
+  private Map<Integer, Node<?>> reuseOpNodes = new HashMap<>();
+
   static {
     try {
       eventloop = DefaultEventLoop.createEventLoop("ProcessWideEventLoop");
@@ -518,7 +521,7 @@ private void disconnectWindowGenerator(int nodeid, Node<?> node)
     }
   }
 
-  private synchronized void undeploy(List<Integer> nodeList)
+  private synchronized Map<Integer, Node<?>> undeploy(List<Integer> nodeList)
   {
     /**
      * make sure that all the operators which we are asked to undeploy are in this container.
@@ -565,6 +568,8 @@ private synchronized void undeploy(List<Integer> nodeList)
     for (Integer operatorId : nodeList) {
       nodes.remove(operatorId);
     }
+
+    return toUndeploy;
   }
 
   public void teardown()
@@ -800,7 +805,9 @@ public void processHeartbeatResponse(ContainerHeartbeatResponse rsp)
     if (rsp.undeployRequest != null) {
       logger.info("Undeploy request: {}", rsp.undeployRequest);
       processNodeRequests(false);
-      undeploy(rsp.undeployRequest);
+      Map<Integer, Node<?>> undeployNodes = undeploy(rsp.undeployRequest);
+      undeployNodes.entrySet().removeIf((entry) -> !isReuseOperator(entry.getValue()));
+      reuseOpNodes.putAll(undeployNodes);
     }
 
     if (rsp.shutdown != null) {
@@ -833,6 +840,19 @@ public void processHeartbeatResponse(ContainerHeartbeatResponse rsp)
     processNodeRequests(true);
   }
 
+  private boolean isReuseOperator(Node<?> node)
+  {
+    if (node.context.getAttributes().contains(OperatorContext.RECOVERY_MODE)) {
+      return node.context.getValue(OperatorContext.RECOVERY_MODE) == Operator.RecoveryMode.REUSE_INSTANCE;
+    } else {
+      if (node.operator.getClass().isAnnotationPresent(OperatorAnnotation.class)) {
+        return node.operator.getClass().getAnnotation(OperatorAnnotation.class).recoveryMode() == Operator.RecoveryMode.REUSE_INSTANCE;
+      }
+    }
+    logger.debug("Is reuse operator {} {}", node, false);
+    return false;
+  }
+
   private void stopInputNodes()
   {
     for (Entry<Integer, Node<?>> e : nodes.entrySet()) {
@@ -924,8 +944,16 @@ private void deployNodes(List<OperatorDeployInfo> nodeList) throws IOException
 
       OperatorContext ctx = new OperatorContext(ndi.id, ndi.name, ndi.contextAttributes, parentContext);
       ctx.attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, ndi.checkpoint.windowId);
-      logger.debug("Restoring operator {} to checkpoint {} stateless={}.", ndi.id, Codec.getStringWindowId(ndi.checkpoint.windowId), ctx.stateless);
-      Node<?> node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);
+      Node<?> node = reuseOpNodes.get(ndi.id);
+      if (node == null) {
+        logger.info("Restoring operator {} to checkpoint {} stateless={}.", ndi.id, Codec.getStringWindowId(ndi.checkpoint.windowId), ctx.stateless);
+        node = Node.retrieveNode(backupAgent.load(ndi.id, ctx.stateless ? Stateless.WINDOW_ID : ndi.checkpoint.windowId), ctx, ndi.type);
+      } else {
+        logger.info("Reusing previous operator instance {}", ndi.id);
+        node = Node.retrieveNode(node.operator, ctx, ndi.type);
+        node.setReuseOperator(true);
+        reuseOpNodes.remove(ndi.id);
+      }
       node.currentWindowId = ndi.checkpoint.windowId;
       node.applicationWindowCount = ndi.checkpoint.applicationWindowCount;
       node.firstWindowMillis = firstWindowMillis;
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
index 18a9a63574..74510a66a7 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java
@@ -1165,6 +1165,12 @@ public OutputPortMeta getMeta(Operator.OutputPort<?> port)
       return getPortMapping().outPortMap.get(port);
     }
 
+    @Override
+    public OperatorAnnotation getOperatorAnnotation()
+    {
+      return operatorAnnotation;
+    }
+
     @Override
     public InputPortMeta getMeta(Operator.InputPort<?> port)
     {
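
The precedence that isReuseOperator applies (an explicitly set RECOVERY_MODE attribute wins, otherwise the recoveryMode element of OperatorAnnotation, otherwise checkpoint recovery) can be illustrated with a small self-contained Java sketch. It deliberately does not use the Apex API: the enum, the annotation, the string-keyed map standing in for the operator's attribute map, and the two operator classes are hypothetical simplified copies so that the example runs on its own.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of the recovery-mode resolution in StreamingContainer.isReuseOperator. */
public class RecoveryModeResolution
{
  enum RecoveryMode { CHECKPOINT, REUSE_INSTANCE }

  // Simplified stand-in for com.datatorrent.api.annotation.OperatorAnnotation (not the real class)
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface OperatorAnnotation
  {
    RecoveryMode recoveryMode() default RecoveryMode.CHECKPOINT;
  }

  // Operator developer states a preference for instance reuse via the annotation
  @OperatorAnnotation(recoveryMode = RecoveryMode.REUSE_INSTANCE)
  static class LargeStateOperator {}

  // No annotation: falls back to checkpoint recovery
  static class PlainOperator {}

  /** An explicit attribute wins; otherwise the class annotation; otherwise checkpoint recovery. */
  static boolean isReuseOperator(Object operator, Map<String, RecoveryMode> attributes)
  {
    if (attributes.containsKey("RECOVERY_MODE")) {
      return attributes.get("RECOVERY_MODE") == RecoveryMode.REUSE_INSTANCE;
    }
    OperatorAnnotation annotation = operator.getClass().getAnnotation(OperatorAnnotation.class);
    return annotation != null && annotation.recoveryMode() == RecoveryMode.REUSE_INSTANCE;
  }

  public static void main(String[] args)
  {
    Map<String, RecoveryMode> noAttributes = new HashMap<>();
    Map<String, RecoveryMode> forceCheckpoint = new HashMap<>();
    forceCheckpoint.put("RECOVERY_MODE", RecoveryMode.CHECKPOINT);

    System.out.println(isReuseOperator(new LargeStateOperator(), noAttributes));    // true
    System.out.println(isReuseOperator(new LargeStateOperator(), forceCheckpoint)); // false
    System.out.println(isReuseOperator(new PlainOperator(), noAttributes));         // false
  }
}
```

The ordering mirrors the javadoc on the new annotation element: the operator developer expresses a preference, and the application developer can override it through the attribute.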

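The container-side bookkeeping in the diff (undeploy returns the removed nodes, only reusable ones are kept in reuseOpNodes, and deployNodes either wraps the retained operator in a new Node with setReuseOperator(true) or loads it from checkpoint) can be modeled as below. This is a hypothetical sketch, not engine code: Operator and Node are minimal stand-ins, a freshly constructed Operator stands in for backupAgent.load(...), and setup/activate only count calls. Note that, as in the diff, retained instances live in the container's own map, so if the operator's own container fails there is nothing to reuse and checkpoint recovery applies.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the reuse bookkeeping in StreamingContainer and Node. */
public class ReuseBookkeeping
{
  /** Stand-in for an operator instance; counts lifecycle calls. */
  static class Operator
  {
    int setupCalls;
    int activateCalls;
  }

  /** Stand-in for Node: setup is skipped for a reused instance, activate always runs. */
  static class Node
  {
    final Operator operator;
    final boolean reuseMode;  // outcome of an isReuseOperator-style check
    boolean reuseOperator;    // mirrors Node.setReuseOperator(true) on redeploy

    Node(Operator operator, boolean reuseMode)
    {
      this.operator = operator;
      this.reuseMode = reuseMode;
    }

    void setup()
    {
      if (!reuseOperator) {
        operator.setupCalls++;  // corresponds to operator.setup(context)
      }
    }

    void activate()
    {
      operator.activateCalls++;  // called irrespective of the recovery mode
    }
  }

  final Map<Integer, Node> nodes = new HashMap<>();
  final Map<Integer, Node> reuseOpNodes = new HashMap<>();

  /** Undeploy the given ids and retain only the nodes whose operators may be reused. */
  void undeploy(List<Integer> ids)
  {
    Map<Integer, Node> undeployed = new HashMap<>();
    for (Integer id : ids) {
      Node node = nodes.remove(id);
      if (node != null) {
        undeployed.put(id, node);
      }
    }
    undeployed.entrySet().removeIf(entry -> !entry.getValue().reuseMode);
    reuseOpNodes.putAll(undeployed);
  }

  /** Deploy an operator: reuse a retained instance if there is one, else create a fresh one. */
  Node deploy(int id, boolean reuseMode)
  {
    Node previous = reuseOpNodes.remove(id);  // get and remove in one step
    Node node;
    if (previous == null) {
      node = new Node(new Operator(), reuseMode);  // stands in for loading from checkpoint
    } else {
      node = new Node(previous.operator, reuseMode);  // same operator instance, new Node
      node.reuseOperator = true;
    }
    node.setup();
    node.activate();
    nodes.put(id, node);
    return node;
  }

  public static void main(String[] args)
  {
    ReuseBookkeeping container = new ReuseBookkeeping();
    Node reusable = container.deploy(1, true);
    Node plain = container.deploy(2, false);

    // Simulated upstream failure: both operators are undeployed, then redeployed in this container
    container.undeploy(Arrays.asList(1, 2));
    Node reused = container.deploy(1, true);
    Node restored = container.deploy(2, false);

    System.out.println(reused.operator == reusable.operator);  // true
    System.out.println(restored.operator == plain.operator);   // false
    System.out.println(reusable.operator.setupCalls + " " + reusable.operator.activateCalls);  // 1 2
  }
}
```

The reused operator is set up once but activated twice, matching the javadoc change stating that activate is called irrespective of the RecoveryMode in use.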

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[hidden email]


> Reusable instance operator recovery
> -----------------------------------
>
>                 Key: APEXCORE-714
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-714
>             Project: Apache Apex Core
>          Issue Type: Improvement
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>            Priority: Major
>
> In a failure scenario, when a container fails, it is redeployed along with all the operators in it. The operators downstream of these operators are also redeployed within their containers. The operators are restored from their checkpoints and reconnect to the appropriate point in the stream according to the processing mode. In at-least-once mode, for example, the data is replayed from the same checkpoint.
> Restoring operator state from a checkpoint can be costly, depending on the size of the state. In some use cases, depending on the operator logic, reusing the current instance after an upstream failure, rather than restoring the operator from checkpoint, still produces the same results when the data is replayed from the last fully processed window. Because the same instance is reused, the operator state stays as it was before the upstream failure; only the streams and the window are reset to the window after the last fully processed window, which preserves at-least-once processing of tuples. If the container in which the operator itself runs goes down, the operator must of course still be restored from its checkpoint. This scenario occurs in some batch use cases with operators that have large state.
> I propose adding the ability for a user to explicitly mark operators as being of this type, along with the corresponding engine functionality to recover them as described above: instead of restoring their state from checkpoint, the engine reuses the instance and restores the stream to the window after the operator's last fully processed window. For operators not marked this way, the default behavior stays as it is today and nothing changes.
> I have done some prototyping on the engine side to confirm that this is possible with the current code base without a major overhaul, in particular restoring the operator instance within the Node in the streaming container, and re-establishing the subscriber stream at a window in the buffer server that the publisher (upstream) has not yet reached, since the publisher restarts from its checkpoint. I was able to get it all working successfully.

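Why reuse suits only some operators can be shown with a toy simulation (an illustration, not Apex engine code). It assumes the upstream replays the same tuples deterministically, and that the reused instance may already have applied part of the interrupted window before the failure, which is one reading of the javadoc warning that the state may not be consistent with the processing position in the stream. The names run, failWindow and failAt are hypothetical. An idempotent aggregation such as max ends with the same result; a sum counts the partially processed tuples twice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.LongBinaryOperator;

/** Hypothetical simulation of replay under REUSE_INSTANCE; not Apex engine code. */
public class ReuseReplayDemo
{
  /**
   * Folds fn over all tuples of all windows. If failWindow >= 0, the first failAt tuples of that window are
   * applied once before a simulated upstream failure; the reused state is kept and the whole window is then
   * replayed, since it is the window after the last fully processed one.
   */
  static long run(List<long[]> windows, LongBinaryOperator fn, long initial, int failWindow, int failAt)
  {
    long state = initial;
    for (int w = 0; w < windows.size(); w++) {
      long[] tuples = windows.get(w);
      if (w == failWindow) {
        for (int i = 0; i < failAt; i++) {
          state = fn.applyAsLong(state, tuples[i]);  // partial work kept by the reused instance
        }
      }
      for (long tuple : tuples) {
        state = fn.applyAsLong(state, tuple);  // normal processing, or replay of the interrupted window
      }
    }
    return state;
  }

  public static void main(String[] args)
  {
    List<long[]> windows = Arrays.asList(new long[] {3, 1}, new long[] {4}, new long[] {2, 5});
    LongBinaryOperator max = Math::max;
    LongBinaryOperator sum = Long::sum;

    // No failure vs. failure after the first tuple of the third window (index 2)
    System.out.println(run(windows, max, Long.MIN_VALUE, -1, 0) + " " + run(windows, max, Long.MIN_VALUE, 2, 1));  // 5 5
    System.out.println(run(windows, sum, 0, -1, 0) + " " + run(windows, sum, 0, 2, 1));  // 15 17
  }
}
```

An operator whose logic behaves like max here is "invariant to reusing the same state" in the sense of the REUSE_INSTANCE javadoc; one that behaves like sum would need to be tolerant of the duplicates, or stay on CHECKPOINT recovery.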


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)